Fix logical type issues in the Parquet writer (#14322)
Closes #14315
Closes #14326

The Parquet writer currently writes time and timestamp types with a logical type whose `isAdjustedToUTC` flag is `false`. However, timestamps in libcudf tables are implicitly in UTC and do not need to be adjusted, so this PR changes `isAdjustedToUTC` to `true`.

Also added a writer option to write timestamps as local time, since this is the expected behavior on the Python side.

Also changed how the logical type is handled for UNKNOWN-type columns in `merge_row_group_metadata`: the logical type is now excluded from the merged metadata because it causes issues with type inference.
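
For illustration, here is a minimal sketch of the new option from the C++ API. The helper function, table, and output path are placeholders; only the options/builder calls come from this change.

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <string>

// Hypothetical helper: write `tbl` to `path`, keeping the default UTC
// annotation unless the caller asks for local (unadjusted) timestamps.
void write_table(cudf::table_view const& tbl, std::string const& path, bool as_utc)
{
  auto opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{path}, tbl)
                .utc_timestamps(as_utc)  // defaults to true when not set
                .build();
  cudf::io::write_parquet(opts);
}
```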

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - David Wendt (https://github.com/davidwendt)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #14322
vuule authored Oct 31, 2023
1 parent b4746d8 commit f4c95aa
Showing 7 changed files with 121 additions and 31 deletions.
57 changes: 57 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
@@ -532,6 +532,9 @@ class parquet_writer_options {
// Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
// If true then overrides any per-column setting in _metadata.
bool _write_timestamps_as_int96 = false;
// Parquet writer can write timestamps as UTC
// Defaults to true because libcudf timestamps are implicitly UTC
bool _write_timestamps_as_UTC = true;
// Column chunks file paths to be set in the raw output metadata. One per output file
std::vector<std::string> _column_chunks_file_paths;
// Maximum size of each row group (unless smaller than a single page)
@@ -652,6 +655,13 @@ class parquet_writer_options {
*/
bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; }

/**
* @brief Returns `true` if timestamps will be written as UTC
*
* @return `true` if timestamps will be written as UTC
*/
[[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; }

/**
* @brief Returns Column chunks file paths to be set in the raw output metadata.
*
@@ -789,6 +799,13 @@ class parquet_writer_options {
*/
void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; }

/**
* @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`.
*
* @param val Boolean value to enable/disable writing of timestamps as UTC.
*/
void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; }

/**
* @brief Sets column chunks file path to be set in the raw output metadata.
*
@@ -1100,6 +1117,18 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if timestamps are to be written as UTC.
*
* @param enabled Boolean value to enable/disable writing of timestamps as UTC.
* @return this for chaining
*/
parquet_writer_options_builder& utc_timestamps(bool enabled)
{
options._write_timestamps_as_UTC = enabled;
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
@@ -1171,6 +1200,8 @@ class chunked_parquet_writer_options {
// Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
// If true then overrides any per-column setting in _metadata.
bool _write_timestamps_as_int96 = false;
// Parquet writer can write timestamps as UTC. Defaults to true.
bool _write_timestamps_as_UTC = true;
// Maximum size of each row group (unless smaller than a single page)
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
@@ -1254,6 +1285,13 @@ class chunked_parquet_writer_options {
*/
bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; }

/**
* @brief Returns `true` if timestamps will be written as UTC
*
* @return `true` if timestamps will be written as UTC
*/
[[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; }

/**
* @brief Returns maximum row group size, in bytes.
*
@@ -1375,6 +1413,13 @@ class chunked_parquet_writer_options {
*/
void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; }

/**
* @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`.
*
* @param val Boolean value to enable/disable writing of timestamps as UTC.
*/
void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; }

/**
* @brief Sets the maximum row group size, in bytes.
*
@@ -1539,6 +1584,18 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if timestamps are to be written as UTC.
*
* @param enabled Boolean value to enable/disable writing of timestamps as UTC.
* @return this for chaining
*/
chunked_parquet_writer_options_builder& utc_timestamps(bool enabled)
{
options._write_timestamps_as_UTC = enabled;
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
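
The chunked writer options in this header gain the same control; a minimal usage sketch follows (the helper and path are placeholders, not code from this PR):

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <string>

// Hypothetical helper: stream a table out through the chunked writer while
// requesting local (unadjusted) timestamps via the new setter.
void write_chunked_local(cudf::table_view const& tbl, std::string const& path)
{
  auto opts =
    cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{path}).build();
  opts.enable_utc_timestamps(false);  // write timestamps as local time

  cudf::io::parquet_chunked_writer writer(opts);
  writer.write(tbl);
  writer.close();
}
```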
12 changes: 8 additions & 4 deletions cpp/src/io/parquet/parquet.hpp
@@ -58,13 +58,17 @@ struct TimeUnit {
};

struct TimeType {
bool isAdjustedToUTC = false;
TimeUnit unit;
// Default to true because the timestamps are implicitly in UTC
// Writer option overrides this default
bool isAdjustedToUTC = true;
TimeUnit unit = {TimeUnit::MILLIS};
};

struct TimestampType {
bool isAdjustedToUTC = false;
TimeUnit unit;
// Default to true because the timestamps are implicitly in UTC
// Writer option overrides this default
bool isAdjustedToUTC = true;
TimeUnit unit = {TimeUnit::MILLIS};
};

struct IntType {
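
To make the new defaults concrete, here is a small standalone mirror of these internal structs (not the real cudf headers, illustration only):

```cpp
#include <iostream>

// Standalone mirror of the internal TimeUnit/TimestampType structs above.
struct TimeUnit {
  enum Type { MILLIS, MICROS, NANOS };
  Type type;
};

struct TimestampType {
  bool isAdjustedToUTC = true;  // new default: timestamps are UTC
  TimeUnit unit        = {TimeUnit::MILLIS};
};

int main()
{
  TimestampType dflt{};                                    // UTC, milliseconds
  TimestampType local{false, TimeUnit{TimeUnit::MICROS}};  // writer override
  std::cout << dflt.isAdjustedToUTC << ' ' << local.isAdjustedToUTC << '\n';
}
```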
46 changes: 32 additions & 14 deletions cpp/src/io/parquet/writer_impl.cu
@@ -278,6 +278,7 @@ struct leaf_schema_fn {
cudf::detail::LinkedColPtr const& col;
column_in_metadata const& col_meta;
bool timestamp_is_int96;
bool timestamp_is_utc;

template <typename T>
std::enable_if_t<std::is_same_v<T, bool>, void> operator()()
@@ -404,7 +405,7 @@ struct leaf_schema_fn {
col_schema.ts_scale = 1000;
if (not timestamp_is_int96) {
col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS;
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}};
}
}

@@ -415,7 +416,7 @@
col_schema.stats_dtype = statistics_dtype::dtype_timestamp64;
if (not timestamp_is_int96) {
col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS;
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}};
}
}

@@ -426,7 +427,7 @@
col_schema.stats_dtype = statistics_dtype::dtype_timestamp64;
if (not timestamp_is_int96) {
col_schema.converted_type = ConvertedType::TIMESTAMP_MICROS;
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MICROS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MICROS}};
}
}

@@ -441,7 +442,7 @@
}
// set logical type if it's not int96
else {
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::NANOS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::NANOS}};
}
}

@@ -453,7 +454,7 @@
col_schema.converted_type = ConvertedType::TIME_MILLIS;
col_schema.stats_dtype = statistics_dtype::dtype_int32;
col_schema.ts_scale = 24 * 60 * 60 * 1000;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}};
}

template <typename T>
@@ -463,7 +464,7 @@
col_schema.converted_type = ConvertedType::TIME_MILLIS;
col_schema.stats_dtype = statistics_dtype::dtype_int32;
col_schema.ts_scale = 1000;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}};
}

template <typename T>
@@ -472,7 +473,7 @@
col_schema.type = Type::INT32;
col_schema.converted_type = ConvertedType::TIME_MILLIS;
col_schema.stats_dtype = statistics_dtype::dtype_int32;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}};
}

template <typename T>
@@ -481,7 +482,7 @@
col_schema.type = Type::INT64;
col_schema.converted_type = ConvertedType::TIME_MICROS;
col_schema.stats_dtype = statistics_dtype::dtype_int64;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MICROS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MICROS}};
}

// unsupported outside cudf for parquet 1.0.
@@ -490,7 +491,7 @@
{
col_schema.type = Type::INT64;
col_schema.stats_dtype = statistics_dtype::dtype_int64;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::NANOS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::NANOS}};
}

template <typename T>
@@ -567,7 +568,8 @@ std::vector<schema_tree_node> construct_schema_tree(
cudf::detail::LinkedColVector const& linked_columns,
table_input_metadata& metadata,
single_write_mode write_mode,
bool int96_timestamps)
bool int96_timestamps,
bool utc_timestamps)
{
std::vector<schema_tree_node> schema;
schema_tree_node root{};
Expand Down Expand Up @@ -739,8 +741,9 @@ std::vector<schema_tree_node> construct_schema_tree(

bool timestamp_is_int96 = int96_timestamps or col_meta.is_enabled_int96_timestamps();

cudf::type_dispatcher(col->type(),
leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96});
cudf::type_dispatcher(
col->type(),
leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96, utc_timestamps});

col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED;
col_schema.name = (schema[parent_idx].name == "list") ? "element" : col_meta.get_name();
@@ -1467,6 +1470,7 @@ void fill_table_meta(std::unique_ptr<table_input_metadata> const& table_meta)
* @param max_dictionary_size Maximum dictionary size, in bytes
* @param single_write_mode Flag to indicate that we are guaranteeing a single table write
* @param int96_timestamps Flag to indicate if timestamps will be written as INT96
* @param utc_timestamps Flag to indicate if timestamps are UTC
* @param write_v2_headers True if V2 page headers are to be written
* @param out_sink Sink for checking if device write is supported, should not be used to write any
* data in this function
@@ -1491,12 +1495,14 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
size_t max_dictionary_size,
single_write_mode write_mode,
bool int96_timestamps,
bool utc_timestamps,
bool write_v2_headers,
host_span<std::unique_ptr<data_sink> const> out_sink,
rmm::cuda_stream_view stream)
{
auto vec = table_to_linked_columns(input);
auto schema_tree = construct_schema_tree(vec, table_meta, write_mode, int96_timestamps);
auto vec = table_to_linked_columns(input);
auto schema_tree =
construct_schema_tree(vec, table_meta, write_mode, int96_timestamps, utc_timestamps);
// Construct parquet_column_views from the schema tree leaf nodes.
std::vector<parquet_column_view> parquet_columns;

@@ -2026,6 +2032,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
_max_dictionary_size(options.get_max_dictionary_size()),
_max_page_fragment_size(options.get_max_page_fragment_size()),
_int96_timestamps(options.is_enabled_int96_timestamps()),
_utc_timestamps(options.is_enabled_utc_timestamps()),
_write_v2_headers(options.is_enabled_write_v2_headers()),
_column_index_truncate_length(options.get_column_index_truncate_length()),
_kv_meta(options.get_key_value_metadata()),
@@ -2054,6 +2061,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
_max_dictionary_size(options.get_max_dictionary_size()),
_max_page_fragment_size(options.get_max_page_fragment_size()),
_int96_timestamps(options.is_enabled_int96_timestamps()),
_utc_timestamps(options.is_enabled_utc_timestamps()),
_write_v2_headers(options.is_enabled_write_v2_headers()),
_column_index_truncate_length(options.get_column_index_truncate_length()),
_kv_meta(options.get_key_value_metadata()),
@@ -2131,6 +2139,7 @@ void writer::impl::write(table_view const& input, std::vector<partition_info> co
_max_dictionary_size,
_single_write_mode,
_int96_timestamps,
_utc_timestamps,
_write_v2_headers,
_out_sink,
_stream);
@@ -2394,6 +2403,15 @@ std::unique_ptr<std::vector<uint8_t>> writer::merge_row_group_metadata(
}
}

// Remove any LogicalType::UNKNOWN annotations that were passed in as they can confuse
// column type inferencing.
// See https://github.com/rapidsai/cudf/pull/14264#issuecomment-1778311615
for (auto& se : md.schema) {
if (se.logical_type.has_value() && se.logical_type.value().type == LogicalType::UNKNOWN) {
se.logical_type = thrust::nullopt;
}
}

// Thrift-encode the resulting output
file_header_s fhdr;
file_ender_s fendr;
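
For context, this merge path is reached through the public `cudf::io::merge_row_group_metadata` API; a hedged sketch of typical usage (the helper and file names are placeholders):

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>

#include <cstdint>
#include <memory>
#include <string>
#include <vector>

// Hypothetical helper: write two parts that share a schema, collect the
// per-file metadata blobs (requested via column_chunks_file_paths), and
// merge them into a single footer blob suitable for a _metadata file.
std::unique_ptr<std::vector<uint8_t>> write_and_merge(cudf::table_view const& part0,
                                                      cudf::table_view const& part1)
{
  std::vector<std::unique_ptr<std::vector<uint8_t>>> per_file_md;

  auto write_one = [&](cudf::table_view const& tbl, std::string const& path) {
    auto opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{path}, tbl)
                  .column_chunks_file_paths({path})  // ask the writer to return metadata
                  .build();
    per_file_md.push_back(cudf::io::write_parquet(opts));
  };
  write_one(part0, "part-0.parquet");
  write_one(part1, "part-1.parquet");

  // With this change, UNKNOWN logical-type annotations in the per-file
  // metadata are dropped during the merge.
  return cudf::io::merge_row_group_metadata(per_file_md);
}
```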
1 change: 1 addition & 0 deletions cpp/src/io/parquet/writer_impl.hpp
@@ -157,6 +157,7 @@ class writer::impl {
size_t const _max_dictionary_size;
std::optional<size_type> const _max_page_fragment_size;
bool const _int96_timestamps;
bool const _utc_timestamps;
bool const _write_v2_headers;
int32_t const _column_index_truncate_length;
std::vector<std::map<std::string, std::string>> const _kv_meta; // Optional user metadata.
21 changes: 21 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet.pxd
@@ -90,6 +90,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_column_chunks_file_paths(
vector[string] column_chunks_file_paths
) except +
void set_int96_timestamps(
bool enabled
) except +
void set_utc_timestamps(
bool enabled
) except +
void set_row_group_size_bytes(size_t val) except +
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
@@ -129,6 +135,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
parquet_writer_options_builder& int96_timestamps(
bool enabled
) except +
parquet_writer_options_builder& utc_timestamps(
bool enabled
) except +
parquet_writer_options_builder& row_group_size_bytes(
size_t val
) except +
@@ -172,6 +181,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_compression(
cudf_io_types.compression_type compression
) except +
void set_int96_timestamps(
bool enabled
) except +
void set_utc_timestamps(
bool enabled
) except +
void set_row_group_size_bytes(size_t val) except +
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
@@ -199,6 +214,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
chunked_parquet_writer_options_builder& compression(
cudf_io_types.compression_type compression
) except +
chunked_parquet_writer_options_builder& int96_timestamps(
bool enabled
) except +
chunked_parquet_writer_options_builder& utc_timestamps(
bool enabled
) except +
chunked_parquet_writer_options_builder& row_group_size_bytes(
size_t val
) except +
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -399,6 +399,7 @@ def write_parquet(
.compression(comp_type)
.stats_level(stat_freq)
.int96_timestamps(_int96_timestamps)
.utc_timestamps(False)
.build()
)
if partitions_info is not None: