diff --git a/conda/recipes/libcudf/meta.yaml b/conda/recipes/libcudf/meta.yaml index 4e20c979f6c..eae915c47fe 100644 --- a/conda/recipes/libcudf/meta.yaml +++ b/conda/recipes/libcudf/meta.yaml @@ -137,6 +137,7 @@ test: - test -f $PREFIX/include/cudf/io/orc_metadata.hpp - test -f $PREFIX/include/cudf/io/orc.hpp - test -f $PREFIX/include/cudf/io/parquet.hpp + - test -f $PREFIX/include/cudf/io/text/byte_range_info.hpp - test -f $PREFIX/include/cudf/io/text/data_chunk_source_factories.hpp - test -f $PREFIX/include/cudf/io/text/data_chunk_source.hpp - test -f $PREFIX/include/cudf/io/text/detail/multistate.hpp diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 2ffd62f1b53..825ea37c6ac 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -311,6 +311,7 @@ add_library( src/io/parquet/writer_impl.cu src/io/statistics/orc_column_statistics.cu src/io/statistics/parquet_column_statistics.cu + src/io/text/byte_range_info.cpp src/io/text/multibyte_split.cu src/io/utilities/column_buffer.cpp src/io/utilities/config_utils.cpp diff --git a/cpp/include/cudf/io/text/byte_range_info.hpp b/cpp/include/cudf/io/text/byte_range_info.hpp new file mode 100644 index 00000000000..cb2d00f0d1f --- /dev/null +++ b/cpp/include/cudf/io/text/byte_range_info.hpp @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include + +namespace cudf { +namespace io { +namespace text { + +/** + * @brief stores offset and size used to indicate a byte range + */ +class byte_range_info { + private: + int64_t _offset; + int64_t _size; + + public: + constexpr byte_range_info() noexcept : _offset(0), _size(0) {} + constexpr byte_range_info(int64_t offset, int64_t size) : _offset(offset), _size(size) + { + CUDF_EXPECTS(offset >= 0, "offset must be non-negative"); + CUDF_EXPECTS(size >= 0, "size must be non-negative"); + } + + constexpr byte_range_info(byte_range_info const& other) noexcept = default; + constexpr byte_range_info& operator=(byte_range_info const& other) noexcept = default; + + [[nodiscard]] constexpr int64_t offset() { return _offset; } + [[nodiscard]] constexpr int64_t size() { return _size; } +}; + +/** + * @brief Create a collection of consecutive ranges between [0, total_bytes). + * + * Each range wil be the same size except if `total_bytes` is not evenly divisible by + * `range_count`, in which case the last range size will be the remainder. + * + * @param total_bytes total number of bytes in all ranges + * @param range_count total number of ranges in which to divide bytes + * @return Vector of range objects + */ +std::vector create_byte_range_infos_consecutive(int64_t total_bytes, + int64_t range_count); + +/** + * @brief Create a byte_range_info which represents as much of a file as possible. Specifically, + * `[0, numeric_limit::max())`. + * + * @return `[0, numeric_limit::max())` + */ +byte_range_info create_byte_range_info_max(); + +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/include/cudf/io/text/data_chunk_source.hpp b/cpp/include/cudf/io/text/data_chunk_source.hpp index 5e6dda5a514..3499b86ab42 100644 --- a/cpp/include/cudf/io/text/data_chunk_source.hpp +++ b/cpp/include/cudf/io/text/data_chunk_source.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -36,6 +36,7 @@ namespace text { */ class device_data_chunk { public: + virtual ~device_data_chunk() = default; [[nodiscard]] virtual char const* data() const = 0; [[nodiscard]] virtual std::size_t size() const = 0; virtual operator device_span() const = 0; @@ -52,6 +53,9 @@ class device_data_chunk { */ class data_chunk_reader { public: + virtual ~data_chunk_reader() = default; + virtual void skip_bytes(std::size_t size) = 0; + /** * @brief Get the next chunk of bytes from the data source * @@ -76,6 +80,7 @@ class data_chunk_reader { */ class data_chunk_source { public: + virtual ~data_chunk_source() = default; [[nodiscard]] virtual std::unique_ptr create_reader() const = 0; }; diff --git a/cpp/include/cudf/io/text/data_chunk_source_factories.hpp b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp index aeb4b7fff53..ffe159b59dc 100644 --- a/cpp/include/cudf/io/text/data_chunk_source_factories.hpp +++ b/cpp/include/cudf/io/text/data_chunk_source_factories.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,6 +89,8 @@ class istream_data_chunk_reader : public data_chunk_reader { } } + void skip_bytes(std::size_t size) override { _datastream->ignore(size); }; + std::unique_ptr get_next_chunk(std::size_t read_size, rmm::cuda_stream_view stream) override { @@ -143,6 +145,12 @@ class device_span_data_chunk_reader : public data_chunk_reader { public: device_span_data_chunk_reader(device_span data) : _data(data) {} + void skip_bytes(std::size_t read_size) override + { + if (read_size > _data.size() - _position) { read_size = _data.size() - _position; } + _position += read_size; + }; + std::unique_ptr get_next_chunk(std::size_t read_size, rmm::cuda_stream_view stream) override { diff --git a/cpp/include/cudf/io/text/detail/trie.hpp b/cpp/include/cudf/io/text/detail/trie.hpp index 06d15276a68..a908a9fa227 100644 --- a/cpp/include/cudf/io/text/detail/trie.hpp +++ b/cpp/include/cudf/io/text/detail/trie.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,20 +89,6 @@ struct trie_device_view { */ constexpr uint8_t get_match_length(uint16_t idx) { return _nodes[idx].match_length; } - /** - * @brief returns the longest matching state of any state in the multistate. - */ - template - constexpr uint8_t get_match_length(multistate const& states) - { - int8_t val = 0; - for (uint8_t i = 0; i < states.size(); i++) { - auto match_length = get_match_length(states.get_tail(i)); - if (match_length > val) { val = match_length; } - } - return val; - } - private: constexpr void transition_enqueue_all( // char c, diff --git a/cpp/include/cudf/io/text/multibyte_split.hpp b/cpp/include/cudf/io/text/multibyte_split.hpp index d42ee9f510e..25f7ef98a81 100644 --- a/cpp/include/cudf/io/text/multibyte_split.hpp +++ b/cpp/include/cudf/io/text/multibyte_split.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ #pragma once #include +#include #include #include @@ -27,10 +28,53 @@ namespace cudf { namespace io { namespace text { +/** + * @brief Splits the source text into a strings column using a multiple byte delimiter. + * + * Providing a byte range allows multibyte_split to read a whole file, but only return the offsets + * of delimiters which begin within the range. If thinking in terms of "records", where each + * delimiter dictates the end of a record, all records which begin within the byte range provided + * will be returned, including any record which may begin in the range but end outside of the + * range. Records which begin outside of the range will ignored, even if those records end inside + * the range. + * + * @code{.pseudo} + * Examples: + * source: "abc..def..ghi..jkl.." + * delimiter: ".." + * + * byte_range: nullopt + * return: ["abc..", "def..", "ghi..", jkl..", ""] + * + * byte_range: [0, 2) + * return: ["abc.."] + * + * byte_range: [2, 9) + * return: ["def..", "ghi.."] + * + * byte_range: [11, 2) + * return: [] + * + * byte_range: [13, 7) + * return: ["jkl..", ""] + * @endcode + * + * @param source The source string + * @param delimiter UTF-8 encoded string for which to find offsets in the source + * @param byte_range range in which to consider offsets relevant + * @param mr Memory resource to use for the device memory allocation + * @return The strings found by splitting the source by the delimiter within the relevant byte + * range. + */ std::unique_ptr multibyte_split( data_chunk_source const& source, std::string const& delimiter, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + std::optional byte_range = std::nullopt, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +std::unique_ptr multibyte_split(data_chunk_source const& source, + std::string const& delimiter, + rmm::mr::device_memory_resource* mr); } // namespace text } // namespace io diff --git a/cpp/src/io/text/byte_range_info.cpp b/cpp/src/io/text/byte_range_info.cpp new file mode 100644 index 00000000000..290e0451839 --- /dev/null +++ b/cpp/src/io/text/byte_range_info.cpp @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include + +namespace cudf { +namespace io { +namespace text { + +byte_range_info create_byte_range_info_max() { return {0, std::numeric_limits::max()}; } + +std::vector create_byte_range_infos_consecutive(int64_t total_bytes, + int64_t range_count) +{ + auto range_size = util::div_rounding_up_safe(total_bytes, range_count); + auto ranges = std::vector(); + + ranges.reserve(range_size); + + for (int64_t i = 0; i < range_count; i++) { + auto offset = i * range_size; + auto size = std::min(range_size, total_bytes - offset); + ranges.emplace_back(offset, size); + } + + return ranges; +} + +} // namespace text +} // namespace io +} // namespace cudf diff --git a/cpp/src/io/text/multibyte_split.cu b/cpp/src/io/text/multibyte_split.cu index d287b9f2419..99f3bde3bf6 100644 --- a/cpp/src/io/text/multibyte_split.cu +++ b/cpp/src/io/text/multibyte_split.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021, NVIDIA CORPORATION. + * Copyright (c) 2021-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,16 +18,24 @@ #include #include #include +#include #include #include #include #include #include +#include #include #include +#include #include +#include +#include +#include +#include + #include #include @@ -96,7 +104,7 @@ __global__ void multibyte_split_init_kernel( cudf::size_type base_tile_idx, cudf::size_type num_tiles, cudf::io::text::detail::scan_tile_state_view tile_multistates, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, cudf::io::text::detail::scan_tile_status status = cudf::io::text::detail::scan_tile_status::invalid) { @@ -110,7 +118,7 @@ __global__ void multibyte_split_init_kernel( __global__ void multibyte_split_seed_kernel( cudf::io::text::detail::scan_tile_state_view tile_multistates, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, multistate tile_multistate_seed, uint32_t tile_output_offset) { @@ -124,17 +132,15 @@ __global__ void multibyte_split_seed_kernel( __global__ void multibyte_split_kernel( cudf::size_type base_tile_idx, cudf::io::text::detail::scan_tile_state_view tile_multistates, - cudf::io::text::detail::scan_tile_state_view tile_output_offsets, + cudf::io::text::detail::scan_tile_state_view tile_output_offsets, cudf::io::text::detail::trie_device_view trie, - int32_t chunk_input_offset, cudf::device_span chunk_input_chars, - cudf::device_span abs_output_delimiter_offsets, - cudf::device_span abs_output_chars) + cudf::device_span abs_output_delimiter_offsets) { using InputLoad = cub::BlockLoad; - using OffsetScan = cub::BlockScan; - using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback; + using OffsetScan = cub::BlockScan; + using OffsetScanCallback = cudf::io::text::detail::scan_tile_state_callback; __shared__ union { typename InputLoad::TempStorage input_load; @@ -166,7 +172,7 @@ __global__ void multibyte_split_kernel( // STEP 3: Flag matches - uint32_t thread_offsets[ITEMS_PER_THREAD]; + int64_t thread_offsets[ITEMS_PER_THREAD]; for (int32_t i = 0; i < ITEMS_PER_THREAD; i++) { thread_offsets[i] = i < thread_input_size and trie.is_match(thread_states[i]); @@ -182,16 +188,11 @@ __global__ void multibyte_split_kernel( // Step 5: Assign outputs from each thread using match offsets. - if (abs_output_chars.size() > 0) { - for (int32_t i = 0; i < ITEMS_PER_THREAD and i < thread_input_size; i++) { - abs_output_chars[chunk_input_offset + thread_input_offset + i] = thread_chars[i]; - } - } - if (abs_output_delimiter_offsets.size() > 0) { for (int32_t i = 0; i < ITEMS_PER_THREAD and i < thread_input_size; i++) { if (trie.is_match(thread_states[i])) { - auto const match_end = base_tile_idx * ITEMS_PER_TILE + thread_input_offset + i + 1; + auto const match_end = + static_cast(base_tile_idx) * ITEMS_PER_TILE + thread_input_offset + i + 1; abs_output_delimiter_offsets[thread_offsets[i]] = match_end; } } @@ -236,17 +237,16 @@ std::vector get_streams(int32_t count, rmm::cuda_stream_p return streams; } -cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_source const& source, - cudf::io::text::detail::trie const& trie, - scan_tile_state& tile_multistates, - scan_tile_state& tile_offsets, - device_span output_buffer, - device_span output_char_buffer, - rmm::cuda_stream_view stream, - std::vector const& streams) +int64_t multibyte_split_scan_full_source(cudf::io::text::data_chunk_source const& source, + cudf::io::text::detail::trie const& trie, + scan_tile_state& tile_multistates, + scan_tile_state& tile_offsets, + device_span output_buffer, + rmm::cuda_stream_view stream, + std::vector const& streams) { CUDF_FUNC_RANGE(); - cudf::size_type chunk_offset = 0; + int64_t chunk_offset = 0; multibyte_split_init_kernel<<>>( // -TILES_PER_CHUNK, @@ -298,14 +298,14 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour tile_multistates, tile_offsets, trie.view(), - chunk_offset, *chunk, - output_buffer, - output_char_buffer); + output_buffer); cudaEventRecord(last_launch_event, chunk_stream); chunk_offset += chunk->size(); + + chunk.reset(); } cudaEventDestroy(last_launch_event); @@ -317,6 +317,7 @@ cudf::size_type multibyte_split_scan_full_source(cudf::io::text::data_chunk_sour std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source const& source, std::string const& delimiter, + byte_range_info byte_range, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr, rmm::cuda_stream_pool& stream_pool) @@ -336,7 +337,7 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source // best when at least 32 more than max possible concurrent tiles, due to rolling `invalid`s auto num_tile_states = std::max(32, TILES_PER_CHUNK * concurrency + 32); auto tile_multistates = scan_tile_state(num_tile_states, stream); - auto tile_offsets = scan_tile_state(num_tile_states, stream); + auto tile_offsets = scan_tile_state(num_tile_states, stream); auto streams = get_streams(concurrency, stream_pool); @@ -345,52 +346,104 @@ std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source trie, tile_multistates, tile_offsets, - cudf::device_span(static_cast(nullptr), 0), - cudf::device_span(static_cast(nullptr), 0), + cudf::device_span(static_cast(nullptr), 0), stream, streams); // allocate results - auto num_tiles = cudf::util::div_rounding_up_safe(bytes_total, ITEMS_PER_TILE); - auto num_results = tile_offsets.get_inclusive_prefix(num_tiles - 1, stream); - auto string_offsets = rmm::device_uvector(num_results + 2, stream, mr); - auto string_chars = rmm::device_uvector(bytes_total, stream, mr); + auto num_tiles = + cudf::util::div_rounding_up_safe(bytes_total, static_cast(ITEMS_PER_TILE)); + auto num_results = tile_offsets.get_inclusive_prefix(num_tiles - 1, stream); + + auto string_offsets = rmm::device_uvector(num_results + 2, stream); // first and last element are set manually to zero and size of input, respectively. // kernel is only responsible for determining delimiter offsets - auto string_count = static_cast(string_offsets.size() - 1); string_offsets.set_element_to_zero_async(0, stream); - string_offsets.set_element_async(string_count, bytes_total, stream); + string_offsets.set_element_async(string_offsets.size() - 1, bytes_total, stream); + + // kernel needs to find first and last relevant offset., as well as count of relevant offsets. multibyte_split_scan_full_source( source, trie, tile_multistates, tile_offsets, - cudf::device_span(string_offsets).subspan(1, num_results), - string_chars, + cudf::device_span(string_offsets).subspan(1, num_results), stream, streams); + auto relevant_offsets_begin = thrust::lower_bound(rmm::exec_policy(stream), + string_offsets.begin(), + string_offsets.end() - 1, + byte_range.offset()); + + auto relevant_offsets_end = thrust::upper_bound(rmm::exec_policy(stream), + string_offsets.begin(), + string_offsets.end() - 1, + byte_range.offset() + byte_range.size()) + + 1; + + auto string_offsets_out_size = relevant_offsets_end - relevant_offsets_begin; + + auto string_offsets_out = rmm::device_uvector(string_offsets_out_size, stream, mr); + + auto relevant_offset_first = + string_offsets.element(relevant_offsets_begin - string_offsets.begin(), stream); + auto relevant_offset_last = + string_offsets.element(relevant_offsets_end - string_offsets.begin() - 1, stream); + + auto string_chars_size = relevant_offset_last - relevant_offset_first; + auto string_chars = rmm::device_uvector(string_chars_size, stream, mr); + + // copy relevant offsets and adjust them to be zero-based. + thrust::transform(rmm::exec_policy(stream), + relevant_offsets_begin, + relevant_offsets_end, + string_offsets_out.begin(), + [relevant_offset_first] __device__(int64_t offset) { + return static_cast(offset - relevant_offset_first); + }); + + auto reader = source.create_reader(); + reader->skip_bytes(relevant_offset_first); + + auto relevant_bytes = reader->get_next_chunk(string_chars_size, stream); + + thrust::copy(rmm::exec_policy(stream), + relevant_bytes->data(), // + relevant_bytes->data() + relevant_bytes->size(), + string_chars.begin()); + + auto string_count = string_offsets_out.size() - 1; + return cudf::make_strings_column( - string_count, std::move(string_offsets), std::move(string_chars)); + string_count, std::move(string_offsets_out), std::move(string_chars)); } } // namespace detail std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source const& source, std::string const& delimiter, + std::optional byte_range, rmm::mr::device_memory_resource* mr) { auto stream = rmm::cuda_stream_default; auto stream_pool = rmm::cuda_stream_pool(2); - auto result = detail::multibyte_split(source, delimiter, stream, mr, stream_pool); - stream.synchronize(); + auto result = detail::multibyte_split( + source, delimiter, byte_range.value_or(create_byte_range_info_max()), stream, mr, stream_pool); return result; } +std::unique_ptr multibyte_split(cudf::io::text::data_chunk_source const& source, + std::string const& delimiter, + rmm::mr::device_memory_resource* mr) +{ + return multibyte_split(source, delimiter, std::nullopt, mr); +} + } // namespace text } // namespace io } // namespace cudf diff --git a/cpp/tests/io/text/multibyte_split_test.cpp b/cpp/tests/io/text/multibyte_split_test.cpp index 27a8be95e9b..cfd1a16f19a 100644 --- a/cpp/tests/io/text/multibyte_split_test.cpp +++ b/cpp/tests/io/text/multibyte_split_test.cpp @@ -21,6 +21,8 @@ #include #include +#include +#include #include #include #include @@ -142,4 +144,29 @@ TEST_F(MultibyteSplitTest, HandpickedInput) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, *out, debug_output_level::ALL_ERRORS); } +TEST_F(MultibyteSplitTest, LargeInputMultipleRange) +{ + auto host_input = std::string(); + auto host_expected = std::vector(); + + for (auto i = 0; i < 1000; i++) { + host_input += "...:|"; + } + + auto delimiter = std::string("...:|"); + auto source = cudf::io::text::make_source(host_input); + + auto byte_ranges = cudf::io::text::create_byte_range_infos_consecutive(host_input.size(), 3); + auto out0 = cudf::io::text::multibyte_split(*source, delimiter, byte_ranges[0]); + auto out1 = cudf::io::text::multibyte_split(*source, delimiter, byte_ranges[1]); + auto out2 = cudf::io::text::multibyte_split(*source, delimiter, byte_ranges[2]); + + auto out_views = std::vector({out0->view(), out1->view(), out2->view()}); + auto out = cudf::concatenate(out_views); + + auto expected = cudf::io::text::multibyte_split(*source, delimiter); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected->view(), *out, debug_output_level::ALL_ERRORS); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/_lib/cpp/io/text.pxd b/python/cudf/cudf/_lib/cpp/io/text.pxd index 9ce0c68cb08..5b110d6234c 100644 --- a/python/cudf/cudf/_lib/cpp/io/text.pxd +++ b/python/cudf/cudf/_lib/cpp/io/text.pxd @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. from libcpp.memory cimport unique_ptr from libcpp.string cimport string @@ -6,6 +6,13 @@ from libcpp.string cimport string from cudf._lib.cpp.column.column cimport column +cdef extern from "cudf/io/text/byte_range_info.hpp" \ + namespace "cudf::io::text" nogil: + + cdef cppclass byte_range_info: + byte_range_info() except + + byte_range_info(size_t offset, size_t size) except + + cdef extern from "cudf/io/text/data_chunk_source.hpp" \ namespace "cudf::io::text" nogil: @@ -25,3 +32,7 @@ cdef extern from "cudf/io/text/multibyte_split.hpp" \ unique_ptr[column] multibyte_split(data_chunk_source source, string delimiter) except + + + unique_ptr[column] multibyte_split(data_chunk_source source, + string delimiter, + byte_range_info byte_range) except + diff --git a/python/cudf/cudf/_lib/text.pyx b/python/cudf/cudf/_lib/text.pyx index 9f33f32bdaf..daea227cc39 100644 --- a/python/cudf/cudf/_lib/text.pyx +++ b/python/cudf/cudf/_lib/text.pyx @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. import cudf @@ -10,6 +10,7 @@ from libcpp.utility cimport move from cudf._lib.column cimport Column from cudf._lib.cpp.column.column cimport column from cudf._lib.cpp.io.text cimport ( + byte_range_info, data_chunk_source, make_source, make_source_from_file, @@ -18,7 +19,8 @@ from cudf._lib.cpp.io.text cimport ( def read_text(object filepaths_or_buffers, - object delimiter=None): + object delimiter=None, + object byte_range=None): """ Cython function to call into libcudf API, see `multibyte_split`. @@ -31,9 +33,25 @@ def read_text(object filepaths_or_buffers, cdef unique_ptr[data_chunk_source] datasource cdef unique_ptr[column] c_col - - with nogil: - datasource = move(make_source_from_file(filename)) - c_col = move(multibyte_split(dereference(datasource), delim)) + cdef size_t c_byte_range_offset + cdef size_t c_byte_range_size + cdef byte_range_info c_byte_range + + if (byte_range is not None): + c_byte_range_offset = byte_range[0] + c_byte_range_size = byte_range[1] + with nogil: + datasource = move(make_source_from_file(filename)) + c_byte_range = byte_range_info( + c_byte_range_offset, + c_byte_range_size) + c_col = move(multibyte_split( + dereference(datasource), + delim, + c_byte_range)) + else: + with nogil: + datasource = move(make_source_from_file(filename)) + c_col = move(multibyte_split(dereference(datasource), delim)) return {None: Column.from_unique_ptr(move(c_col))} diff --git a/python/cudf/cudf/io/text.py b/python/cudf/cudf/io/text.py index 705645b8349..04809f8fd59 100644 --- a/python/cudf/cudf/io/text.py +++ b/python/cudf/cudf/io/text.py @@ -1,4 +1,4 @@ -# Copyright (c) 2018-2021, NVIDIA CORPORATION. +# Copyright (c) 2018-2022, NVIDIA CORPORATION. from io import BytesIO, StringIO @@ -12,7 +12,7 @@ @annotate("READ_TEXT", color="purple", domain="cudf_python") @ioutils.doc_read_text() def read_text( - filepath_or_buffer, delimiter=None, **kwargs, + filepath_or_buffer, delimiter=None, byte_range=None, **kwargs, ): """{docstring}""" @@ -24,5 +24,7 @@ def read_text( ) return cudf.Series._from_data( - libtext.read_text(filepath_or_buffer, delimiter=delimiter,) + libtext.read_text( + filepath_or_buffer, delimiter=delimiter, byte_range=byte_range + ) ) diff --git a/python/cudf/cudf/tests/test_text.py b/python/cudf/cudf/tests/test_text.py index 5ff66fc750f..fb6505f5f92 100644 --- a/python/cudf/cudf/tests/test_text.py +++ b/python/cudf/cudf/tests/test_text.py @@ -1,4 +1,4 @@ -# Copyright (c) 2019, NVIDIA CORPORATION. +# Copyright (c) 2019-2022, NVIDIA CORPORATION. import numpy as np import pytest @@ -778,3 +778,54 @@ def test_read_text(datadir): actual = cudf.read_text(chess_file, delimiter=delimiter) assert_eq(expected, actual) + + +def test_read_text_byte_range(datadir): + chess_file = str(datadir) + "/chess.pgn" + delimiter = "1." + + with open(chess_file, "r") as f: + data = f.read() + content = data.split(delimiter) + + # Since Python split removes the delimiter and read_text does + # not we need to add it back to the 'content' + expected = cudf.Series( + [ + c + delimiter if i < (len(content) - 1) else c + for i, c in enumerate(content) + ] + ) + + byte_range_size = (len(data) // 3) + (len(data) % 3 != 0) + + actual_0 = cudf.read_text( + chess_file, + delimiter=delimiter, + byte_range=[byte_range_size * 0, byte_range_size], + ) + actual_1 = cudf.read_text( + chess_file, + delimiter=delimiter, + byte_range=[byte_range_size * 1, byte_range_size], + ) + actual_2 = cudf.read_text( + chess_file, + delimiter=delimiter, + byte_range=[byte_range_size * 2, byte_range_size], + ) + + actual = cudf.concat([actual_0, actual_1, actual_2], ignore_index=True) + + assert_eq(expected, actual) + + +def test_read_text_byte_range_large(datadir): + content = str(("\n" if x % 5 == 0 else "x") for x in range(0, 300000000)) + delimiter = "1." + temp_file = str(datadir) + "/temp.txt" + + with open(temp_file, "w") as f: + f.write(content) + + cudf.read_text(temp_file, delimiter=delimiter)