Use cuFile for Parquet IO when available (#7444)
Adds optional cuFile integration:
- `cufile.h` is included in the build when available.
- `libcufile.so` is loaded at runtime if the `LIBCUDF_CUFILE_POLICY` environment variable is set to "ALWAYS" or "GDS".
- cuFile compatibility mode is controlled through the same policy variable: "ALWAYS" means on, "GDS" means off (see the sketch after this list).
- cuFile is currently used only for Parquet reads/writes and in the CSV writer.
- The device_read/device_write APIs can be used with the file datasource/data_sink.
- Added a CUDA stream parameter to `device_read`.
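
A minimal sketch of how such a policy variable can be interpreted; the struct and field names below are illustrative assumptions, not the actual libcudf implementation (which presumably lives in the new `src/io/utilities/file_io_utilities.cpp`):

```cpp
#include <cstdlib>
#include <string>

// Illustrative policy parsing; names are assumptions, not libcudf API.
struct cufile_policy {
  bool use_cufile  = false;  // load libcufile.so at runtime
  bool compat_mode = false;  // run cuFile in compatibility mode

  cufile_policy()
  {
    char const* env = std::getenv("LIBCUDF_CUFILE_POLICY");
    std::string const value = (env != nullptr) ? env : "OFF";
    if (value == "ALWAYS") {
      use_cufile  = true;
      compat_mode = true;  // works even without a GDS-enabled driver/filesystem
    } else if (value == "GDS") {
      use_cufile  = true;
      compat_mode = false;  // direct GDS I/O only
    }
  }
};
```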

Authors:
  - Vukasin Milovanovic (@vuule)

Approvers:
  - Keith Kraus (@kkraus14)
  - Karthikeyan (@karthikeyann)
  - Devavret Makkar (@devavret)
  - Robert Maynard (@robertmaynard)

URL: #7444
vuule authored Mar 17, 2021
1 parent 0146f74 commit 39ad863
Showing 14 changed files with 757 additions and 124 deletions.
8 changes: 8 additions & 0 deletions cpp/CMakeLists.txt
@@ -139,6 +139,8 @@ include(cmake/thirdparty/CUDF_GetLibcudacxx.cmake)
include(cmake/thirdparty/CUDF_GetGTest.cmake)
# Stringify libcudf and libcudacxx headers used in JIT operations
include(cmake/Modules/StringifyJITHeaders.cmake)
# find cuFile
include(cmake/Modules/FindcuFile.cmake)

###################################################################################################
# - library targets -------------------------------------------------------------------------------
@@ -244,6 +246,7 @@ add_library(cudf
src/io/statistics/column_stats.cu
src/io/utilities/data_sink.cpp
src/io/utilities/datasource.cpp
src/io/utilities/file_io_utilities.cpp
src/io/utilities/parsing_utils.cu
src/io/utilities/type_conversion.cpp
src/jit/cache.cpp
@@ -469,6 +472,11 @@ else()
target_link_libraries(cudf PUBLIC CUDA::nvrtc CUDA::cudart CUDA::cuda_driver)
endif()

# Add cuFile interface if available
if(TARGET cuFile::cuFile_interface)
target_link_libraries(cudf PRIVATE cuFile::cuFile_interface)
endif()

file(WRITE "${CUDF_BINARY_DIR}/fatbin.ld"
[=[
SECTIONS
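
Because the interface target adds the `CUFILE_FOUND` compile definition, consuming code can guard cuFile-specific paths at compile time. A minimal sketch, with a helper name of our own invention:

```cpp
// CUFILE_FOUND is defined by the cuFile::cuFile_interface target above.
#ifdef CUFILE_FOUND
#include <cufile.h>
#endif

// Hypothetical helper, not part of libcudf: reports whether cuFile support
// was compiled in.
bool built_with_cufile()
{
#ifdef CUFILE_FOUND
  return true;
#else
  return false;
#endif
}
```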
2 changes: 1 addition & 1 deletion cpp/benchmarks/fixture/benchmark_fixture.hpp
@@ -88,4 +88,4 @@ class benchmark : public ::benchmark::Fixture {
std::shared_ptr<rmm::mr::device_memory_resource> mr;
};

}; // namespace cudf
} // namespace cudf
6 changes: 6 additions & 0 deletions cpp/cmake/Modules/FindcuFile.cmake
@@ -93,6 +93,12 @@ find_package_handle_standard_args(cuFile
cuFile_VERSION
)

if (cuFile_INCLUDE_DIR AND NOT TARGET cuFile::cuFile_interface)
add_library(cuFile::cuFile_interface IMPORTED INTERFACE)
target_include_directories(cuFile::cuFile_interface INTERFACE "$<BUILD_INTERFACE:${cuFile_INCLUDE_DIR}>")
target_compile_options(cuFile::cuFile_interface INTERFACE "${cuFile_COMPILE_OPTIONS}")
target_compile_definitions(cuFile::cuFile_interface INTERFACE CUFILE_FOUND)
endif ()

if (cuFile_FOUND AND NOT TARGET cuFile::cuFile)
add_library(cuFile::cuFile UNKNOWN IMPORTED)
26 changes: 19 additions & 7 deletions cpp/include/cudf/io/data_sink.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -107,23 +107,35 @@ class data_sink {
*/
virtual bool supports_device_write() const { return false; }

/**
* @brief Estimates whether a direct device write would be more optimal for the given size.
*
* @param size Number of bytes to write
* @return whether the device write is expected to be more performant for the given size
*/
virtual bool is_device_write_preferred(size_t size) const { return supports_device_write(); }

/**
* @brief Append the buffer content to the sink from a gpu address
*
* @param[in] data Pointer to the buffer to be written into the sink object
* @param[in] size Number of bytes to write
* For optimal performance, should only be called when `is_device_write_preferred` returns `true`.
* Data sink implementations that don't support direct device writes don't need to override
* this function.
*
* @return void
* @throws cudf::logic_error the object does not support direct device writes, i.e.
* `supports_device_write` returns `false`.
*
* @param gpu_data Pointer to the buffer to be written into the sink object
* @param size Number of bytes to write
* @param stream CUDA stream to use
*/
virtual void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream)
{
CUDF_FAIL("data_sink classes that support device_write must override this function.");
CUDF_FAIL("data_sink classes that support device_write must override it.");
}

/**
* @brief Flush the data written into the sink
*
* @return void
*/
virtual void flush() = 0;

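
To illustrate the extended `data_sink` contract, here is a minimal sketch of a custom sink; `echo_sink`, its size threshold, and the staging strategy are assumptions for illustration, not libcudf code:

```cpp
#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cuda_runtime.h>

#include <vector>

// Hypothetical sink that counts bytes; device writes are staged through a
// host copy, and only claimed to be "preferred" for large buffers.
class echo_sink : public cudf::io::data_sink {
 public:
  bool supports_device_write() const override { return true; }

  // Only prefer the device path for buffers of at least 1 MiB (arbitrary).
  bool is_device_write_preferred(size_t size) const override { return size >= (1 << 20); }

  void device_write(void const* gpu_data, size_t size, rmm::cuda_stream_view stream) override
  {
    std::vector<char> staging(size);
    CUDA_TRY(cudaMemcpyAsync(
      staging.data(), gpu_data, size, cudaMemcpyDeviceToHost, stream.value()));
    stream.synchronize();
    host_write(staging.data(), size);
  }

  void host_write(void const* data, size_t size) override { _bytes_written += size; }
  void flush() override {}
  size_t bytes_written() override { return _bytes_written; }

 private:
  size_t _bytes_written = 0;
};
```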
94 changes: 81 additions & 13 deletions cpp/include/cudf/io/datasource.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,8 @@
#include <cudf/io/types.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <arrow/buffer.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
@@ -50,12 +52,15 @@ class datasource {
/**
* @brief Returns the address of the data in the buffer.
*/
virtual const uint8_t* data() const = 0;
virtual uint8_t const* data() const = 0;

/**
* @brief Base class destructor
*/
virtual ~buffer() {}

template <typename Container>
static std::unique_ptr<buffer> create(Container&& data_owner);
};

/**
@@ -147,37 +152,57 @@
*/
virtual bool supports_device_read() const { return false; }

/**
* @brief Estimates whether a direct device read would be more optimal for the given size.
*
* @param size Number of bytes to read
* @return whether the device read is expected to be more performant for the given size
*/
virtual bool is_device_read_preferred(size_t size) const { return supports_device_read(); }

/**
* @brief Returns a device buffer with a subset of data from the source.
*
* For optimal performance, should only be called when `is_device_read_preferred` returns `true`.
* Data source implementations that don't support direct device reads don't need to override this
* function.
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
* @throws cudf::logic_error the object does not support direct device reads, i.e.
* `supports_device_read` returns `false`.
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param stream CUDA stream to use
*
* @return The data buffer in the device memory
*/
virtual std::unique_ptr<datasource::buffer> device_read(size_t offset, size_t size)
virtual std::unique_ptr<datasource::buffer> device_read(size_t offset,
size_t size,
rmm::cuda_stream_view stream)
{
CUDF_FAIL("datasource classes that support device_read must override this function.");
CUDF_FAIL("datasource classes that support device_read must override it.");
}

/**
* @brief Reads a selected range into a preallocated device buffer
*
* For optimal performance, should only be called when `is_device_read_preferred` returns `true`.
* Data source implementations that don't support direct device reads don't need to override this
* function.
*
* @param[in] offset Bytes from the start
* @param[in] size Bytes to read
* @param[in] dst Address of the existing device memory
* @throws cudf::logic_error when the object does not support direct device reads, i.e.
* `supports_device_read` returns `false`.
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param dst Address of the existing device memory
* @param stream CUDA stream to use
*
* @return The number of bytes read (can be smaller than size)
*/
virtual size_t device_read(size_t offset, size_t size, uint8_t* dst)
virtual size_t device_read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream)
{
CUDF_FAIL("datasource classes that support device_read must override this function.");
CUDF_FAIL("datasource classes that support device_read must override it.");
}

/**
@@ -205,14 +230,57 @@

size_t size() const override { return _size; }

const uint8_t* data() const override { return _data; }
uint8_t const* data() const override { return _data; }

private:
uint8_t* const _data;
size_t const _size;
};

/**
* @brief Derived implementation of `buffer` that owns the data.
*
* Can use different container types to hold the data buffer.
*
* @tparam Container Type of the container object that owns the data
*/
template <typename Container>
class owning_buffer : public buffer {
public:
/**
* @brief Moves the input container into the newly created object.
*/
owning_buffer(Container&& data_owner)
: _data(std::move(data_owner)), _data_ptr(_data.data()), _size(_data.size())
{
}

/**
* @brief Moves the input container into the newly created object, and exposes a subspan of the
* buffer.
*/
owning_buffer(Container&& data_owner, uint8_t const* data_ptr, size_t size)
: _data(std::move(data_owner)), _data_ptr(data_ptr), _size(size)
{
}

size_t size() const override { return _size; }

uint8_t const* data() const override { return static_cast<uint8_t const*>(_data_ptr); }

private:
Container _data;
void const* _data_ptr;
size_t _size;
};
};

template <typename Container>
std::unique_ptr<datasource::buffer> datasource::buffer::create(Container&& data_owner)
{
return std::make_unique<owning_buffer<Container>>(std::move(data_owner));
}

/**
* @brief Implementation class for reading from an Apache Arrow file. The file
* could be a memory-mapped file or other implementation supported by Arrow.
@@ -230,7 +298,7 @@ class arrow_io_source : public datasource {
{
}
size_t size() const override { return arrow_buffer->size(); }
const uint8_t* data() const override { return arrow_buffer->data(); }
uint8_t const* data() const override { return arrow_buffer->data(); }
};

public:
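
The `owning_buffer` template, the `buffer::create` factory, and the stream-aware `device_read` overloads combine naturally in a custom source. A sketch under assumed names (`vector_source` is not part of libcudf):

```cpp
#include <cudf/io/datasource.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>

#include <cuda_runtime.h>

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <memory>
#include <vector>

// Hypothetical in-memory source that also advertises device reads.
class vector_source : public cudf::io::datasource {
 public:
  explicit vector_source(std::vector<uint8_t> data) : _data(std::move(data)) {}

  std::unique_ptr<buffer> host_read(size_t offset, size_t size) override
  {
    size = std::min(size, _data.size() - offset);
    // buffer::create moves the vector into an owning_buffer<std::vector<uint8_t>>
    return buffer::create(
      std::vector<uint8_t>(_data.begin() + offset, _data.begin() + offset + size));
  }

  size_t host_read(size_t offset, size_t size, uint8_t* dst) override
  {
    size = std::min(size, _data.size() - offset);
    std::memcpy(dst, _data.data() + offset, size);
    return size;
  }

  bool supports_device_read() const override { return true; }

  std::unique_ptr<buffer> device_read(size_t offset,
                                      size_t size,
                                      rmm::cuda_stream_view stream) override
  {
    size = std::min(size, _data.size() - offset);
    // device_buffer's copying constructor uploads the bytes on `stream`
    return buffer::create(rmm::device_buffer(_data.data() + offset, size, stream));
  }

  size_t device_read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) override
  {
    size = std::min(size, _data.size() - offset);
    CUDA_TRY(cudaMemcpyAsync(
      dst, _data.data() + offset, size, cudaMemcpyHostToDevice, stream.value()));
    return size;
  }

  size_t size() const override { return _data.size(); }

 private:
  std::vector<uint8_t> _data;
};
```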
32 changes: 12 additions & 20 deletions cpp/src/io/csv/writer_impl.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -416,36 +416,28 @@ void writer::impl::write_chunked(strings_column_view const& str_column_view,
auto total_num_bytes = strings_column.chars_size();
char const* ptr_all_bytes = strings_column.chars().data<char>();

if (out_sink_->supports_device_write()) {
// host algorithm call, but the underlying call
// is a device_write taking a device buffer;
//
if (out_sink_->is_device_write_preferred(total_num_bytes)) {
// Direct write from device memory
out_sink_->device_write(ptr_all_bytes, total_num_bytes, stream);
out_sink_->device_write(newline.data(),
newline.size(),
stream); // needs newline at the end, to separate from next chunk
} else {
// no device write possible;
//
// copy the bytes to host, too:
//
// copy the bytes to host to write them out
thrust::host_vector<char> h_bytes(total_num_bytes);
CUDA_TRY(cudaMemcpyAsync(h_bytes.data(),
ptr_all_bytes,
total_num_bytes * sizeof(char),
cudaMemcpyDeviceToHost,
stream.value()));

stream.synchronize();

// host algorithm call, where the underlying call
// is also host_write taking a host buffer;
//
char const* ptr_h_bytes = h_bytes.data();
out_sink_->host_write(ptr_h_bytes, total_num_bytes);
out_sink_->host_write(h_bytes.data(), total_num_bytes);
}

// Needs newline at the end, to separate from next chunk
if (out_sink_->is_device_write_preferred(newline.size())) {
out_sink_->device_write(newline.data(), newline.size(), stream);
} else {
out_sink_->host_write(options_.get_line_terminator().data(),
options_.get_line_terminator()
.size()); // needs newline at the end, to separate from next chunk
options_.get_line_terminator().size());
}
}

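
The dispatch pattern above, query `is_device_write_preferred` per buffer and fall back to a host-side copy, generalizes to any sink. A minimal sketch; `write_device_span` is our own name, not a libcudf function:

```cpp
#include <cudf/io/data_sink.hpp>
#include <cudf/utilities/error.hpp>

#include <rmm/cuda_stream_view.hpp>

#include <cuda_runtime.h>

#include <vector>

// Hypothetical helper mirroring the CSV writer's dispatch logic.
void write_device_span(cudf::io::data_sink& sink,
                       char const* d_data,
                       size_t size,
                       rmm::cuda_stream_view stream)
{
  if (sink.is_device_write_preferred(size)) {
    sink.device_write(d_data, size, stream);  // direct write from device memory
  } else {
    std::vector<char> h_data(size);  // stage through host memory
    CUDA_TRY(cudaMemcpyAsync(
      h_data.data(), d_data, size, cudaMemcpyDeviceToHost, stream.value()));
    stream.synchronize();
    sink.host_write(h_data.data(), size);
  }
}
```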
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2020, NVIDIA CORPORATION.
* Copyright (c) 2018-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -184,7 +184,7 @@ struct ColumnChunkDesc {
{
}

uint8_t *compressed_data; // pointer to compressed column chunk data
uint8_t const *compressed_data; // pointer to compressed column chunk data
size_t compressed_size; // total compressed data size for this chunk
size_t num_values; // total number of values in this column
size_t start_row; // starting row of this chunk
23 changes: 13 additions & 10 deletions cpp/src/io/parquet/reader_impl.cu
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -822,7 +822,7 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
* @copydoc cudf::io::detail::parquet::read_column_chunks
*/
void reader::impl::read_column_chunks(
std::vector<rmm::device_buffer> &page_data,
std::vector<std::unique_ptr<datasource::buffer>> &page_data,
hostdevice_vector<gpu::ColumnChunkDesc> &chunks, // TODO const?
size_t begin_chunk,
size_t end_chunk,
@@ -850,9 +850,15 @@ void reader::impl::read_column_chunks(
next_chunk++;
}
if (io_size != 0) {
auto buffer = _sources[chunk_source_map[chunk]]->host_read(io_offset, io_size);
page_data[chunk] = rmm::device_buffer(buffer->data(), buffer->size(), stream);
uint8_t *d_compdata = static_cast<uint8_t *>(page_data[chunk].data());
auto &source = _sources[chunk_source_map[chunk]];
if (source->is_device_read_preferred(io_size)) {
page_data[chunk] = source->device_read(io_offset, io_size, stream);
} else {
auto const buffer = source->host_read(io_offset, io_size);
page_data[chunk] =
datasource::buffer::create(rmm::device_buffer(buffer->data(), buffer->size(), stream));
}
auto d_compdata = page_data[chunk]->data();
do {
chunks[chunk].compressed_data = d_compdata;
d_compdata += chunks[chunk].compressed_size;
@@ -1414,7 +1420,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
std::vector<size_type> chunk_source_map(num_chunks);

// Tracker for eventually deallocating compressed and uncompressed data
std::vector<rmm::device_buffer> page_data(num_chunks);
std::vector<std::unique_ptr<datasource::buffer>> page_data(num_chunks);

// Keep track of column chunk file offsets
std::vector<size_t> column_chunk_offsets(num_chunks);
@@ -1516,10 +1522,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
decomp_page_data = decompress_page_data(chunks, pages, stream);
// Free compressed data
for (size_t c = 0; c < chunks.size(); c++) {
if (chunks[c].codec != parquet::Compression::UNCOMPRESSED && page_data[c].size() != 0) {
page_data[c].resize(0);
page_data[c].shrink_to_fit();
}
if (chunks[c].codec != parquet::Compression::UNCOMPRESSED) { page_data[c].reset(); }
}
}
