Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logical type issues in the Parquet writer #14322

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,9 @@ class parquet_writer_options {
// Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
// If true then overrides any per-column setting in _metadata.
bool _write_timestamps_as_int96 = false;
// Parquet writer can write timestamps as UTC
// Defaults to true because libcudf timestamps are implicitly UTC
bool _write_timestamps_as_UTC = true;
// Column chunks file paths to be set in the raw output metadata. One per output file
std::vector<std::string> _column_chunks_file_paths;
// Maximum size of each row group (unless smaller than a single page)
Expand Down Expand Up @@ -652,6 +655,13 @@ class parquet_writer_options {
*/
bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; }

/**
* @brief Returns `true` if timestamps will be written as UTC
*
* @return `true` if timestamps will be written as UTC
*/
[[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; }

/**
* @brief Returns Column chunks file paths to be set in the raw output metadata.
*
Expand Down Expand Up @@ -789,6 +799,13 @@ class parquet_writer_options {
*/
void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; }

/**
* @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`.
*
* @param val Boolean value to enable/disable writing of timestamps as UTC.
*/
void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; }

/**
* @brief Sets column chunks file path to be set in the raw output metadata.
*
Expand Down Expand Up @@ -1100,6 +1117,18 @@ class parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if timestamps are to be written as UTC.
*
* @param enabled Boolean value to enable/disable writing of timestamps as UTC.
* @return this for chaining
*/
parquet_writer_options_builder& utc_timestamps(bool enabled)
{
options._write_timestamps_as_UTC = enabled;
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
Expand Down Expand Up @@ -1171,6 +1200,8 @@ class chunked_parquet_writer_options {
// Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS.
// If true then overrides any per-column setting in _metadata.
bool _write_timestamps_as_int96 = false;
// Parquet writer can write timestamps as UTC. Defaults to true.
bool _write_timestamps_as_UTC = true;
// Maximum size of each row group (unless smaller than a single page)
size_t _row_group_size_bytes = default_row_group_size_bytes;
// Maximum number of rows in row group (unless smaller than a single page)
Expand Down Expand Up @@ -1254,6 +1285,13 @@ class chunked_parquet_writer_options {
*/
bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; }

/**
* @brief Returns `true` if timestamps will be written as UTC
*
* @return `true` if timestamps will be written as UTC
*/
[[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; }

/**
* @brief Returns maximum row group size, in bytes.
*
Expand Down Expand Up @@ -1375,6 +1413,13 @@ class chunked_parquet_writer_options {
*/
void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; }

/**
* @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`.
*
* @param val Boolean value to enable/disable writing of timestamps as UTC.
*/
void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; }

/**
* @brief Sets the maximum row group size, in bytes.
*
Expand Down Expand Up @@ -1539,6 +1584,18 @@ class chunked_parquet_writer_options_builder {
return *this;
}

/**
* @brief Set to true if timestamps are to be written as UTC.
*
* @param enabled Boolean value to enable/disable writing of timestamps as UTC.
* @return this for chaining
*/
chunked_parquet_writer_options_builder& utc_timestamps(bool enabled)
{
options._write_timestamps_as_UTC = enabled;
return *this;
}

/**
* @brief Set to true if V2 page headers are to be written.
*
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,17 @@ struct TimeUnit {
};

struct TimeType {
bool isAdjustedToUTC = false;
TimeUnit unit;
// Default to true because the timestamps are implicitly in UTC
// Writer option overrides this default
bool isAdjustedToUTC = true;
vuule marked this conversation as resolved.
Show resolved Hide resolved
TimeUnit unit = {TimeUnit::MILLIS};
};

struct TimestampType {
bool isAdjustedToUTC = false;
TimeUnit unit;
// Default to true because the timestamps are implicitly in UTC
// Writer option overrides this default
bool isAdjustedToUTC = true;
TimeUnit unit = {TimeUnit::MILLIS};
};

struct IntType {
Expand Down
46 changes: 32 additions & 14 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ struct leaf_schema_fn {
cudf::detail::LinkedColPtr const& col;
column_in_metadata const& col_meta;
bool timestamp_is_int96;
bool timestamp_is_utc;

template <typename T>
std::enable_if_t<std::is_same_v<T, bool>, void> operator()()
Expand Down Expand Up @@ -404,7 +405,7 @@ struct leaf_schema_fn {
col_schema.ts_scale = 1000;
if (not timestamp_is_int96) {
col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS;
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}};
}
}

Expand All @@ -415,7 +416,7 @@ struct leaf_schema_fn {
col_schema.stats_dtype = statistics_dtype::dtype_timestamp64;
if (not timestamp_is_int96) {
col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS;
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}};
}
}

Expand All @@ -426,7 +427,7 @@ struct leaf_schema_fn {
col_schema.stats_dtype = statistics_dtype::dtype_timestamp64;
if (not timestamp_is_int96) {
col_schema.converted_type = ConvertedType::TIMESTAMP_MICROS;
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MICROS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MICROS}};
}
}

Expand All @@ -441,7 +442,7 @@ struct leaf_schema_fn {
}
// set logical type if it's not int96
else {
col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::NANOS}};
col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::NANOS}};
}
}

Expand All @@ -453,7 +454,7 @@ struct leaf_schema_fn {
col_schema.converted_type = ConvertedType::TIME_MILLIS;
col_schema.stats_dtype = statistics_dtype::dtype_int32;
col_schema.ts_scale = 24 * 60 * 60 * 1000;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}};
}

template <typename T>
Expand All @@ -463,7 +464,7 @@ struct leaf_schema_fn {
col_schema.converted_type = ConvertedType::TIME_MILLIS;
col_schema.stats_dtype = statistics_dtype::dtype_int32;
col_schema.ts_scale = 1000;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}};
}

template <typename T>
Expand All @@ -472,7 +473,7 @@ struct leaf_schema_fn {
col_schema.type = Type::INT32;
col_schema.converted_type = ConvertedType::TIME_MILLIS;
col_schema.stats_dtype = statistics_dtype::dtype_int32;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}};
}

template <typename T>
Expand All @@ -481,7 +482,7 @@ struct leaf_schema_fn {
col_schema.type = Type::INT64;
col_schema.converted_type = ConvertedType::TIME_MICROS;
col_schema.stats_dtype = statistics_dtype::dtype_int64;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MICROS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MICROS}};
}

// unsupported outside cudf for parquet 1.0.
Expand All @@ -490,7 +491,7 @@ struct leaf_schema_fn {
{
col_schema.type = Type::INT64;
col_schema.stats_dtype = statistics_dtype::dtype_int64;
col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::NANOS}};
col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::NANOS}};
}

template <typename T>
Expand Down Expand Up @@ -567,7 +568,8 @@ std::vector<schema_tree_node> construct_schema_tree(
cudf::detail::LinkedColVector const& linked_columns,
table_input_metadata& metadata,
single_write_mode write_mode,
bool int96_timestamps)
bool int96_timestamps,
bool utc_timestamps)
{
std::vector<schema_tree_node> schema;
schema_tree_node root{};
Expand Down Expand Up @@ -739,8 +741,9 @@ std::vector<schema_tree_node> construct_schema_tree(

bool timestamp_is_int96 = int96_timestamps or col_meta.is_enabled_int96_timestamps();

cudf::type_dispatcher(col->type(),
leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96});
cudf::type_dispatcher(
col->type(),
leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96, utc_timestamps});

col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED;
col_schema.name = (schema[parent_idx].name == "list") ? "element" : col_meta.get_name();
Expand Down Expand Up @@ -1467,6 +1470,7 @@ void fill_table_meta(std::unique_ptr<table_input_metadata> const& table_meta)
* @param max_dictionary_size Maximum dictionary size, in bytes
* @param single_write_mode Flag to indicate that we are guaranteeing a single table write
* @param int96_timestamps Flag to indicate if timestamps will be written as INT96
* @param utc_timestamps Flag to indicate if timestamps are UTC
* @param write_v2_headers True if V2 page headers are to be written
* @param out_sink Sink for checking if device write is supported, should not be used to write any
* data in this function
Expand All @@ -1491,12 +1495,14 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
size_t max_dictionary_size,
single_write_mode write_mode,
bool int96_timestamps,
bool utc_timestamps,
bool write_v2_headers,
host_span<std::unique_ptr<data_sink> const> out_sink,
rmm::cuda_stream_view stream)
{
auto vec = table_to_linked_columns(input);
auto schema_tree = construct_schema_tree(vec, table_meta, write_mode, int96_timestamps);
auto vec = table_to_linked_columns(input);
auto schema_tree =
construct_schema_tree(vec, table_meta, write_mode, int96_timestamps, utc_timestamps);
// Construct parquet_column_views from the schema tree leaf nodes.
std::vector<parquet_column_view> parquet_columns;

Expand Down Expand Up @@ -2026,6 +2032,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
_max_dictionary_size(options.get_max_dictionary_size()),
_max_page_fragment_size(options.get_max_page_fragment_size()),
_int96_timestamps(options.is_enabled_int96_timestamps()),
_utc_timestamps(options.is_enabled_utc_timestamps()),
_write_v2_headers(options.is_enabled_write_v2_headers()),
_column_index_truncate_length(options.get_column_index_truncate_length()),
_kv_meta(options.get_key_value_metadata()),
Expand Down Expand Up @@ -2054,6 +2061,7 @@ writer::impl::impl(std::vector<std::unique_ptr<data_sink>> sinks,
_max_dictionary_size(options.get_max_dictionary_size()),
_max_page_fragment_size(options.get_max_page_fragment_size()),
_int96_timestamps(options.is_enabled_int96_timestamps()),
_utc_timestamps(options.is_enabled_utc_timestamps()),
_write_v2_headers(options.is_enabled_write_v2_headers()),
_column_index_truncate_length(options.get_column_index_truncate_length()),
_kv_meta(options.get_key_value_metadata()),
Expand Down Expand Up @@ -2131,6 +2139,7 @@ void writer::impl::write(table_view const& input, std::vector<partition_info> co
_max_dictionary_size,
_single_write_mode,
_int96_timestamps,
_utc_timestamps,
_write_v2_headers,
_out_sink,
_stream);
Expand Down Expand Up @@ -2394,6 +2403,15 @@ std::unique_ptr<std::vector<uint8_t>> writer::merge_row_group_metadata(
}
}

// Remove any LogicalType::UNKNOWN annotations that were passed in as they can confuse
// column type inferencing.
// See https://github.com/rapidsai/cudf/pull/14264#issuecomment-1778311615
for (auto& se : md.schema) {
if (se.logical_type.has_value() && se.logical_type.value().type == LogicalType::UNKNOWN) {
se.logical_type = thrust::nullopt;
}
}

// Thrift-encode the resulting output
file_header_s fhdr;
file_ender_s fendr;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/writer_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ class writer::impl {
size_t const _max_dictionary_size;
std::optional<size_type> const _max_page_fragment_size;
bool const _int96_timestamps;
bool const _utc_timestamps;
bool const _write_v2_headers;
int32_t const _column_index_truncate_length;
std::vector<std::map<std::string, std::string>> const _kv_meta; // Optional user metadata.
Expand Down
21 changes: 21 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_column_chunks_file_paths(
vector[string] column_chunks_file_paths
) except +
void set_int96_timestamps(
bool enabled
) except +
void set_utc_timestamps(
bool enabled
) except +
void set_row_group_size_bytes(size_t val) except +
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
Expand Down Expand Up @@ -129,6 +135,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
parquet_writer_options_builder& int96_timestamps(
bool enabled
) except +
parquet_writer_options_builder& utc_timestamps(
bool enabled
) except +
parquet_writer_options_builder& row_group_size_bytes(
size_t val
) except +
Expand Down Expand Up @@ -172,6 +181,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
void set_compression(
cudf_io_types.compression_type compression
) except +
void set_int96_timestamps(
bool enabled
) except +
void set_utc_timestamps(
bool enabled
) except +
void set_row_group_size_bytes(size_t val) except +
void set_row_group_size_rows(size_type val) except +
void set_max_page_size_bytes(size_t val) except +
Expand Down Expand Up @@ -199,6 +214,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
chunked_parquet_writer_options_builder& compression(
cudf_io_types.compression_type compression
) except +
chunked_parquet_writer_options_builder& int96_timestamps(
bool enabled
) except +
chunked_parquet_writer_options_builder& utc_timestamps(
bool enabled
) except +
chunked_parquet_writer_options_builder& row_group_size_bytes(
size_t val
) except +
Expand Down
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ def write_parquet(
.compression(comp_type)
.stats_level(stat_freq)
.int96_timestamps(_int96_timestamps)
.utc_timestamps(False)
.build()
)
if partitions_info is not None:
Expand Down
Loading