diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 354bf839632..ea18da74d5a 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -532,6 +532,9 @@ class parquet_writer_options { // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. // If true then overrides any per-column setting in _metadata. bool _write_timestamps_as_int96 = false; + // Parquet writer can write timestamps as UTC + // Defaults to true because libcudf timestamps are implicitly UTC + bool _write_timestamps_as_UTC = true; // Column chunks file paths to be set in the raw output metadata. One per output file std::vector _column_chunks_file_paths; // Maximum size of each row group (unless smaller than a single page) @@ -652,6 +655,13 @@ class parquet_writer_options { */ bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } + /** + * @brief Returns `true` if timestamps will be written as UTC + * + * @return `true` if timestamps will be written as UTC + */ + [[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; } + /** * @brief Returns Column chunks file paths to be set in the raw output metadata. * @@ -789,6 +799,13 @@ class parquet_writer_options { */ void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } + /** + * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. + * + * @param val Boolean value to enable/disable writing of timestamps as UTC. + */ + void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; } + /** * @brief Sets column chunks file path to be set in the raw output metadata. * @@ -1100,6 +1117,18 @@ class parquet_writer_options_builder { return *this; } + /** + * @brief Set to true if timestamps are to be written as UTC. + * + * @param enabled Boolean value to enable/disable writing of timestamps as UTC. 
+ * @return this for chaining + */ + parquet_writer_options_builder& utc_timestamps(bool enabled) + { + options._write_timestamps_as_UTC = enabled; + return *this; + } + /** * @brief Set to true if V2 page headers are to be written. * @@ -1171,6 +1200,8 @@ class chunked_parquet_writer_options { // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. // If true then overrides any per-column setting in _metadata. bool _write_timestamps_as_int96 = false; + // Parquet writer can write timestamps as UTC. Defaults to true. + bool _write_timestamps_as_UTC = true; // Maximum size of each row group (unless smaller than a single page) size_t _row_group_size_bytes = default_row_group_size_bytes; // Maximum number of rows in row group (unless smaller than a single page) @@ -1254,6 +1285,13 @@ class chunked_parquet_writer_options { */ bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } + /** + * @brief Returns `true` if timestamps will be written as UTC + * + * @return `true` if timestamps will be written as UTC + */ + [[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; } + /** * @brief Returns maximum row group size, in bytes. * @@ -1375,6 +1413,13 @@ class chunked_parquet_writer_options { */ void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } + /** + * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. + * + * @param val Boolean value to enable/disable writing of timestamps as UTC. + */ + void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; } + /** * @brief Sets the maximum row group size, in bytes. * @@ -1539,6 +1584,18 @@ class chunked_parquet_writer_options_builder { return *this; } + /** + * @brief Set to true if timestamps are to be written as UTC. + * + * @param enabled Boolean value to enable/disable writing of timestamps as UTC. 
+ * @return this for chaining + */ + chunked_parquet_writer_options_builder& utc_timestamps(bool enabled) + { + options._write_timestamps_as_UTC = enabled; + return *this; + } + /** * @brief Set to true if V2 page headers are to be written. * diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 699cad89703..9ab686b99d5 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -58,13 +58,17 @@ struct TimeUnit { }; struct TimeType { - bool isAdjustedToUTC = false; - TimeUnit unit; + // Default to true because the timestamps are implicitly in UTC + // Writer option overrides this default + bool isAdjustedToUTC = true; + TimeUnit unit = {TimeUnit::MILLIS}; }; struct TimestampType { - bool isAdjustedToUTC = false; - TimeUnit unit; + // Default to true because the timestamps are implicitly in UTC + // Writer option overrides this default + bool isAdjustedToUTC = true; + TimeUnit unit = {TimeUnit::MILLIS}; }; struct IntType { diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index c06acc1690b..c2b10e09b1a 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -278,6 +278,7 @@ struct leaf_schema_fn { cudf::detail::LinkedColPtr const& col; column_in_metadata const& col_meta; bool timestamp_is_int96; + bool timestamp_is_utc; template std::enable_if_t, void> operator()() @@ -404,7 +405,7 @@ struct leaf_schema_fn { col_schema.ts_scale = 1000; if (not timestamp_is_int96) { col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS; - col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MILLIS}}; + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}}; } } @@ -415,7 +416,7 @@ struct leaf_schema_fn { col_schema.stats_dtype = statistics_dtype::dtype_timestamp64; if (not timestamp_is_int96) { col_schema.converted_type = ConvertedType::TIMESTAMP_MILLIS; - col_schema.logical_type = 
LogicalType{TimestampType{false, TimeUnit::MILLIS}}; + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MILLIS}}; } } @@ -426,7 +427,7 @@ struct leaf_schema_fn { col_schema.stats_dtype = statistics_dtype::dtype_timestamp64; if (not timestamp_is_int96) { col_schema.converted_type = ConvertedType::TIMESTAMP_MICROS; - col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::MICROS}}; + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::MICROS}}; } } @@ -441,7 +442,7 @@ struct leaf_schema_fn { } // set logical type if it's not int96 else { - col_schema.logical_type = LogicalType{TimestampType{false, TimeUnit::NANOS}}; + col_schema.logical_type = LogicalType{TimestampType{timestamp_is_utc, TimeUnit::NANOS}}; } } @@ -453,7 +454,7 @@ struct leaf_schema_fn { col_schema.converted_type = ConvertedType::TIME_MILLIS; col_schema.stats_dtype = statistics_dtype::dtype_int32; col_schema.ts_scale = 24 * 60 * 60 * 1000; - col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}}; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}}; } template @@ -463,7 +464,7 @@ struct leaf_schema_fn { col_schema.converted_type = ConvertedType::TIME_MILLIS; col_schema.stats_dtype = statistics_dtype::dtype_int32; col_schema.ts_scale = 1000; - col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}}; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}}; } template @@ -472,7 +473,7 @@ struct leaf_schema_fn { col_schema.type = Type::INT32; col_schema.converted_type = ConvertedType::TIME_MILLIS; col_schema.stats_dtype = statistics_dtype::dtype_int32; - col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MILLIS}}; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MILLIS}}; } template @@ -481,7 +482,7 @@ struct leaf_schema_fn { col_schema.type = Type::INT64; col_schema.converted_type = 
ConvertedType::TIME_MICROS; col_schema.stats_dtype = statistics_dtype::dtype_int64; - col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::MICROS}}; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::MICROS}}; } // unsupported outside cudf for parquet 1.0. @@ -490,7 +491,7 @@ struct leaf_schema_fn { { col_schema.type = Type::INT64; col_schema.stats_dtype = statistics_dtype::dtype_int64; - col_schema.logical_type = LogicalType{TimeType{false, TimeUnit::NANOS}}; + col_schema.logical_type = LogicalType{TimeType{timestamp_is_utc, TimeUnit::NANOS}}; } template @@ -567,7 +568,8 @@ std::vector construct_schema_tree( cudf::detail::LinkedColVector const& linked_columns, table_input_metadata& metadata, single_write_mode write_mode, - bool int96_timestamps) + bool int96_timestamps, + bool utc_timestamps) { std::vector schema; schema_tree_node root{}; @@ -739,8 +741,9 @@ std::vector construct_schema_tree( bool timestamp_is_int96 = int96_timestamps or col_meta.is_enabled_int96_timestamps(); - cudf::type_dispatcher(col->type(), - leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96}); + cudf::type_dispatcher( + col->type(), + leaf_schema_fn{col_schema, col, col_meta, timestamp_is_int96, utc_timestamps}); col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED; col_schema.name = (schema[parent_idx].name == "list") ? 
"element" : col_meta.get_name(); @@ -1467,6 +1470,7 @@ void fill_table_meta(std::unique_ptr const& table_meta) * @param max_dictionary_size Maximum dictionary size, in bytes * @param single_write_mode Flag to indicate that we are guaranteeing a single table write * @param int96_timestamps Flag to indicate if timestamps will be written as INT96 + * @param utc_timestamps Flag to indicate if timestamps are UTC * @param write_v2_headers True if V2 page headers are to be written * @param out_sink Sink for checking if device write is supported, should not be used to write any * data in this function @@ -1491,12 +1495,14 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, size_t max_dictionary_size, single_write_mode write_mode, bool int96_timestamps, + bool utc_timestamps, bool write_v2_headers, host_span const> out_sink, rmm::cuda_stream_view stream) { - auto vec = table_to_linked_columns(input); - auto schema_tree = construct_schema_tree(vec, table_meta, write_mode, int96_timestamps); + auto vec = table_to_linked_columns(input); + auto schema_tree = + construct_schema_tree(vec, table_meta, write_mode, int96_timestamps, utc_timestamps); // Construct parquet_column_views from the schema tree leaf nodes. 
std::vector parquet_columns; @@ -2026,6 +2032,7 @@ writer::impl::impl(std::vector> sinks, _max_dictionary_size(options.get_max_dictionary_size()), _max_page_fragment_size(options.get_max_page_fragment_size()), _int96_timestamps(options.is_enabled_int96_timestamps()), + _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), @@ -2054,6 +2061,7 @@ writer::impl::impl(std::vector> sinks, _max_dictionary_size(options.get_max_dictionary_size()), _max_page_fragment_size(options.get_max_page_fragment_size()), _int96_timestamps(options.is_enabled_int96_timestamps()), + _utc_timestamps(options.is_enabled_utc_timestamps()), _write_v2_headers(options.is_enabled_write_v2_headers()), _column_index_truncate_length(options.get_column_index_truncate_length()), _kv_meta(options.get_key_value_metadata()), @@ -2131,6 +2139,7 @@ void writer::impl::write(table_view const& input, std::vector co _max_dictionary_size, _single_write_mode, _int96_timestamps, + _utc_timestamps, _write_v2_headers, _out_sink, _stream); @@ -2394,6 +2403,15 @@ std::unique_ptr> writer::merge_row_group_metadata( } } + // Remove any LogicalType::UNKNOWN annotations that were passed in as they can confuse + // column type inferencing. 
+ // See https://github.com/rapidsai/cudf/pull/14264#issuecomment-1778311615 + for (auto& se : md.schema) { + if (se.logical_type.has_value() && se.logical_type.value().type == LogicalType::UNKNOWN) { + se.logical_type = thrust::nullopt; + } + } + // Thrift-encode the resulting output file_header_s fhdr; file_ender_s fendr; diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index 1d27a8400c8..3415205d179 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -157,6 +157,7 @@ class writer::impl { size_t const _max_dictionary_size; std::optional const _max_page_fragment_size; bool const _int96_timestamps; + bool const _utc_timestamps; bool const _write_v2_headers; int32_t const _column_index_truncate_length; std::vector> const _kv_meta; // Optional user metadata. diff --git a/python/cudf/cudf/_lib/cpp/io/parquet.pxd b/python/cudf/cudf/_lib/cpp/io/parquet.pxd index 2b92b9b58d3..cace29b5d45 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet.pxd @@ -90,6 +90,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_column_chunks_file_paths( vector[string] column_chunks_file_paths ) except + + void set_int96_timestamps( + bool enabled + ) except + + void set_utc_timestamps( + bool enabled + ) except + void set_row_group_size_bytes(size_t val) except + void set_row_group_size_rows(size_type val) except + void set_max_page_size_bytes(size_t val) except + @@ -129,6 +135,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_writer_options_builder& int96_timestamps( bool enabled ) except + + parquet_writer_options_builder& utc_timestamps( + bool enabled + ) except + parquet_writer_options_builder& row_group_size_bytes( size_t val ) except + @@ -172,6 +181,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_compression( cudf_io_types.compression_type compression ) except + + void 
set_int96_timestamps( + bool enabled + ) except + + void set_utc_timestamps( + bool enabled + ) except + void set_row_group_size_bytes(size_t val) except + void set_row_group_size_rows(size_type val) except + void set_max_page_size_bytes(size_t val) except + @@ -199,6 +214,12 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: chunked_parquet_writer_options_builder& compression( cudf_io_types.compression_type compression ) except + + chunked_parquet_writer_options_builder& int96_timestamps( + bool enabled + ) except + + chunked_parquet_writer_options_builder& utc_timestamps( + bool enabled + ) except + chunked_parquet_writer_options_builder& row_group_size_bytes( size_t val ) except + diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 85fd25cf1a9..f75a6c2b20e 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -399,6 +399,7 @@ def write_parquet( .compression(comp_type) .stats_level(stat_freq) .int96_timestamps(_int96_timestamps) + .utc_timestamps(False) .build() ) if partitions_info is not None: diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 234b8fc5212..7b4e20012f7 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -148,7 +148,6 @@ def test_roundtrip_from_pandas(tmpdir): def test_strings(tmpdir): - fn = str(tmpdir) dfp = pd.DataFrame( {"a": ["aa", "bbb", "cccc"], "b": ["hello", "dog", "man"]} @@ -161,7 +160,6 @@ def test_strings(tmpdir): def test_dask_timeseries_from_pandas(tmpdir): - fn = str(tmpdir.join("test.parquet")) ddf2 = dask.datasets.timeseries(freq="D") pdf = ddf2.compute() @@ -173,7 +171,6 @@ def test_dask_timeseries_from_pandas(tmpdir): @pytest.mark.parametrize("index", [False, None]) @pytest.mark.parametrize("divisions", [False, True]) def test_dask_timeseries_from_dask(tmpdir, index, divisions): - fn = 
str(tmpdir) ddf2 = dask.datasets.timeseries(freq="D") ddf2.to_parquet(fn, engine="pyarrow", write_index=index) @@ -188,7 +185,6 @@ def test_dask_timeseries_from_dask(tmpdir, index, divisions): @pytest.mark.parametrize("index", [False, None]) @pytest.mark.parametrize("divisions", [False, True]) def test_dask_timeseries_from_daskcudf(tmpdir, index, divisions): - fn = str(tmpdir) ddf2 = dask_cudf.from_cudf( cudf.datasets.timeseries(freq="D"), npartitions=4 @@ -205,7 +201,6 @@ def test_dask_timeseries_from_daskcudf(tmpdir, index, divisions): @pytest.mark.parametrize("index", [False, True]) def test_empty(tmpdir, index): - fn = str(tmpdir) dfp = pd.DataFrame({"a": [11.0, 12.0, 12.0], "b": [4, 5, 6]})[:0] if index: @@ -218,7 +213,6 @@ def test_empty(tmpdir, index): def test_filters(tmpdir): - tmp_path = str(tmpdir) df = pd.DataFrame({"x": range(10), "y": list("aabbccddee")}) ddf = dd.from_pandas(df, npartitions=5) @@ -251,7 +245,6 @@ def test_filters(tmpdir): @pytest.mark.parametrize("numeric", [True, False]) @pytest.mark.parametrize("null", [np.nan, None]) def test_isna_filters(tmpdir, null, numeric): - tmp_path = str(tmpdir) df = pd.DataFrame( { @@ -284,7 +277,6 @@ def test_isna_filters(tmpdir, null, numeric): def test_filters_at_row_group_level(tmpdir): - tmp_path = str(tmpdir) df = pd.DataFrame({"x": range(10), "y": list("aabbccddee")}) ddf = dd.from_pandas(df, npartitions=5) @@ -405,7 +397,6 @@ def test_split_row_groups(tmpdir, row_groups, index): @need_create_meta @pytest.mark.parametrize("partition_on", [None, "a"]) def test_create_metadata_file(tmpdir, partition_on): - tmpdir = str(tmpdir) # Write ddf without a _metadata file @@ -445,7 +436,6 @@ def test_create_metadata_file(tmpdir, partition_on): @need_create_meta def test_create_metadata_file_inconsistent_schema(tmpdir): - # NOTE: This test demonstrates that the CudfEngine # can be used to generate a global `_metadata` file # even if there are inconsistent schemas in the dataset. 
@@ -481,9 +471,7 @@ def test_create_metadata_file_inconsistent_schema(tmpdir): # call `compute` on `ddf1`, because the dtype of # the inconsistent column ("a") may be "object" # before computing, and "int" after - # TODO: Uncomment after cudf#14326 is closed - # (See: https://github.com/rapidsai/cudf/issues/14326) - # dd.assert_eq(ddf1.compute(), ddf2) + dd.assert_eq(ddf1.compute(), ddf2) dd.assert_eq(ddf1.compute(), ddf2.compute())