Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[gpuCI] Forward-merge branch-21.08 to branch-21.10 [skip ci] #8842

Merged
merged 1 commit into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions cpp/include/cudf/io/datasource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <arrow/result.h>
#include <arrow/status.h>

#include <future>
#include <memory>

namespace cudf {
Expand Down Expand Up @@ -209,6 +210,34 @@ class datasource {
CUDF_FAIL("datasource classes that support device_read must override it.");
}

/**
* @brief Asynchronously reads a selected range into a preallocated device buffer
*
* Returns a future value that contains the number of bytes read. Calling `get()` method of the
* return value synchronizes this function.
*
* For optimal performance, should only be called when `is_device_read_preferred` returns `true`.
* Data source implementations that don't support direct device reads don't need to override this
* function.
*
* @throws cudf::logic_error when the object does not support direct device reads, i.e.
* `supports_device_read` returns `false`.
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param dst Address of the existing device memory
* @param stream CUDA stream to use
*
* @return The number of bytes read as a future value (can be smaller than size)
*/
virtual std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
CUDF_FAIL("datasource classes that support device_read_async must override it.");
}

/**
* @brief Returns the size of the data in the source.
*
Expand Down
13 changes: 9 additions & 4 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
size_t num_rowgroups = 0;
int stripe_idx = 0;

std::vector<std::pair<std::future<size_t>, size_t>> read_tasks;
for (auto const& stripe_source_mapping : selected_stripes) {
// Iterate through the source files selected stripes
for (auto const& stripe : stripe_source_mapping.stripe_info) {
Expand Down Expand Up @@ -1170,10 +1171,11 @@ table_with_metadata reader::impl::read(size_type skip_rows,
}
if (_metadata->per_file_metadata[stripe_source_mapping.source_idx]
.source->is_device_read_preferred(len)) {
CUDF_EXPECTS(
_metadata->per_file_metadata[stripe_source_mapping.source_idx].source->device_read(
offset, len, d_dst, stream) == len,
"Unexpected discrepancy in bytes read.");
read_tasks.push_back(
std::make_pair(_metadata->per_file_metadata[stripe_source_mapping.source_idx]
.source->device_read_async(offset, len, d_dst, stream),
len));

} else {
const auto buffer =
_metadata->per_file_metadata[stripe_source_mapping.source_idx].source->host_read(
Expand Down Expand Up @@ -1246,6 +1248,9 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stripe_idx++;
}
}
for (auto& task : read_tasks) {
CUDF_EXPECTS(task.first.get() == task.second, "Unexpected discrepancy in bytes read.");
}

// Process dataset chunk pages into output columns
if (stripe_data.size() != 0) {
Expand Down
33 changes: 24 additions & 9 deletions cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,7 @@ void generate_depth_remappings(std::map<int, std::pair<std::vector<int>, std::ve
/**
* @copydoc cudf::io::detail::parquet::read_column_chunks
*/
void reader::impl::read_column_chunks(
std::future<void> reader::impl::read_column_chunks(
std::vector<std::unique_ptr<datasource::buffer>>& page_data,
hostdevice_vector<gpu::ColumnChunkDesc>& chunks, // TODO const?
size_t begin_chunk,
Expand All @@ -833,6 +833,7 @@ void reader::impl::read_column_chunks(
rmm::cuda_stream_view stream)
{
// Transfer chunk data, coalescing adjacent chunks
std::vector<std::future<size_t>> read_tasks;
for (size_t chunk = begin_chunk; chunk < end_chunk;) {
const size_t io_offset = column_chunk_offsets[chunk];
size_t io_size = chunks[chunk].compressed_size;
Expand All @@ -854,7 +855,11 @@ void reader::impl::read_column_chunks(
if (io_size != 0) {
auto& source = _sources[chunk_source_map[chunk]];
if (source->is_device_read_preferred(io_size)) {
page_data[chunk] = source->device_read(io_offset, io_size, stream);
auto buffer = rmm::device_buffer(io_size, stream);
auto fut_read_size = source->device_read_async(
io_offset, io_size, static_cast<uint8_t*>(buffer.data()), stream);
read_tasks.emplace_back(std::move(fut_read_size));
page_data[chunk] = datasource::buffer::create(std::move(buffer));
} else {
auto const buffer = source->host_read(io_offset, io_size);
page_data[chunk] =
Expand All @@ -869,6 +874,12 @@ void reader::impl::read_column_chunks(
chunk = next_chunk;
}
}
auto sync_fn = [](decltype(read_tasks) read_tasks) {
for (auto& task : read_tasks) {
task.wait();
}
};
return std::async(std::launch::deferred, sync_fn, std::move(read_tasks));
}

/**
Expand Down Expand Up @@ -1435,6 +1446,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Initialize column chunk information
size_t total_decompressed_size = 0;
auto remaining_rows = num_rows;
std::vector<std::future<void>> read_rowgroup_tasks;
for (const auto& rg : selected_row_groups) {
const auto& row_group = _metadata->get_row_group(rg.index, rg.source_index);
auto const row_group_start = rg.start_row;
Expand Down Expand Up @@ -1502,16 +1514,19 @@ table_with_metadata reader::impl::read(size_type skip_rows,
}
}
// Read compressed chunk data to device memory
read_column_chunks(page_data,
chunks,
io_chunk_idx,
chunks.size(),
column_chunk_offsets,
chunk_source_map,
stream);
read_rowgroup_tasks.push_back(read_column_chunks(page_data,
chunks,
io_chunk_idx,
chunks.size(),
column_chunk_offsets,
chunk_source_map,
stream));

remaining_rows -= row_group.num_rows;
}
for (auto& task : read_rowgroup_tasks) {
task.wait();
}
assert(remaining_rows <= 0);

// Process dataset chunk pages into output columns
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,13 @@ class reader::impl {
* @param stream CUDA stream used for device memory operations and kernel launches.
*
*/
void read_column_chunks(std::vector<std::unique_ptr<datasource::buffer>>& page_data,
hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
size_t begin_chunk,
size_t end_chunk,
const std::vector<size_t>& column_chunk_offsets,
std::vector<size_type> const& chunk_source_map,
rmm::cuda_stream_view stream);
std::future<void> read_column_chunks(std::vector<std::unique_ptr<datasource::buffer>>& page_data,
hostdevice_vector<gpu::ColumnChunkDesc>& chunks,
size_t begin_chunk,
size_t end_chunk,
const std::vector<size_t>& column_chunk_offsets,
std::vector<size_type> const& chunk_source_map,
rmm::cuda_stream_view stream);

/**
* @brief Returns the number of total pages from the given column chunks
Expand Down
13 changes: 12 additions & 1 deletion cpp/src/io/utilities/datasource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class file_source : public datasource {

bool supports_device_read() const override { return _cufile_in != nullptr; }

bool is_device_read_preferred(size_t size) const
bool is_device_read_preferred(size_t size) const override
{
return _cufile_in != nullptr && _cufile_in->is_cufile_io_preferred(size);
}
Expand All @@ -67,6 +67,17 @@ class file_source : public datasource {
return _cufile_in->read(offset, read_size, dst, stream);
}

std::future<size_t> device_read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override
{
CUDF_EXPECTS(supports_device_read(), "Device reads are not supported for this file.");

auto const read_size = std::min(size, _file.size() - offset);
return _cufile_in->read_async(offset, read_size, dst, stream);
}

size_t size() const override { return _file.size(); }

protected:
Expand Down
56 changes: 48 additions & 8 deletions cpp/src/io/utilities/file_io_utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
* limitations under the License.
*/
#include "file_io_utilities.hpp"
#include <cudf/detail/utilities/integer_utils.hpp>

#include <rmm/device_buffer.hpp>

#include <dlfcn.h>

#include <fstream>
#include <numeric>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -166,30 +168,68 @@ void cufile_registered_file::register_handle()
cufile_registered_file::~cufile_registered_file() { shim->handle_deregister(cf_handle); }

cufile_input_impl::cufile_input_impl(std::string const& filepath)
: shim{cufile_shim::instance()}, cf_file(shim, filepath, O_RDONLY | O_DIRECT)
: shim{cufile_shim::instance()},
cf_file(shim, filepath, O_RDONLY | O_DIRECT),
pool(16) // The benefit from multithreaded read plateaus around 16 threads
{
pool.sleep_duration = 10;
}

std::unique_ptr<datasource::buffer> cufile_input_impl::read(size_t offset,
size_t size,
rmm::cuda_stream_view stream)
{
rmm::device_buffer out_data(size, stream);
CUDF_EXPECTS(shim->read(cf_file.handle(), out_data.data(), size, offset, 0) != -1,
"cuFile error reading from a file");

auto read_size = read(offset, size, reinterpret_cast<uint8_t*>(out_data.data()), stream);
out_data.resize(read_size, stream);
return datasource::buffer::create(std::move(out_data));
}

std::future<size_t> cufile_input_impl::read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
int device;
cudaGetDevice(&device);

auto read_slice = [=](void* dst, size_t size, size_t offset) -> ssize_t {
cudaSetDevice(device);
auto read_size = shim->read(cf_file.handle(), dst, size, offset, 0);
CUDF_EXPECTS(read_size != -1, "cuFile error reading from a file");
return read_size;
};

std::vector<std::future<ssize_t>> slice_tasks;
constexpr size_t max_slice_bytes = 4 * 1024 * 1024;
size_t n_slices = util::div_rounding_up_safe(size, max_slice_bytes);
size_t slice_size = max_slice_bytes;
size_t slice_offset = 0;
for (size_t t = 0; t < n_slices; ++t) {
void* dst_slice = dst + slice_offset;

if (t == n_slices - 1) { slice_size = size % max_slice_bytes; }
slice_tasks.push_back(pool.submit(read_slice, dst_slice, slice_size, offset + slice_offset));

slice_offset += slice_size;
}
auto waiter = [](decltype(slice_tasks) slice_tasks) -> size_t {
return std::accumulate(slice_tasks.begin(), slice_tasks.end(), 0, [](auto sum, auto& task) {
return sum + task.get();
});
};
// The future returned from this function is deferred, not async becasue we want to avoid creating
// threads for each read_async call. This overhead is significant in case of multiple small reads.
return std::async(std::launch::deferred, waiter, std::move(slice_tasks));
}

size_t cufile_input_impl::read(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(shim->read(cf_file.handle(), dst, size, offset, 0) != -1,
"cuFile error reading from a file");
// always read the requested size for now
return size;
auto result = read_async(offset, size, dst, stream);
return result.get();
}

cufile_output_impl::cufile_output_impl(std::string const& filepath)
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/io/utilities/file_io_utilities.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
#pragma once

#ifdef CUFILE_FOUND
#include "thread_pool.hpp"

#include <cufile.h>
#include <cudf_test/file_utilities.hpp>
#endif
Expand Down Expand Up @@ -106,6 +108,23 @@ class cufile_input : public cufile_io_base {
* @return The number of bytes read
*/
virtual size_t read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) = 0;

/**
* @brief Asynchronously reads into existing device memory.
*
* @throws cudf::logic_error on cuFile error
*
* @param offset Number of bytes from the start
* @param size Number of bytes to read
* @param dst Address of the existing device memory
* @param stream CUDA stream to use
*
* @return The number of bytes read as an std::future
*/
virtual std::future<size_t> read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) = 0;
};

/**
Expand Down Expand Up @@ -202,9 +221,15 @@ class cufile_input_impl final : public cufile_input {

size_t read(size_t offset, size_t size, uint8_t* dst, rmm::cuda_stream_view stream) override;

std::future<size_t> read_async(size_t offset,
size_t size,
uint8_t* dst,
rmm::cuda_stream_view stream) override;

private:
cufile_shim const* shim = nullptr;
cufile_registered_file const cf_file;
cudf::detail::thread_pool pool;
};

/**
Expand Down
Loading