Skip to content

Commit

Permalink
Add column_device_view to orc writer (#7676)
Browse files Browse the repository at this point in the history
This PR adds column_device_view members to EncChunk, DictionaryChunk and StripeDictionary structures which are used in the ORC writer. The idea is to replace members in these structures which replicate the same information. Usage of nvstrdesc_s has also been eliminated in the ORC writer.

Fixes #7347, Addresses #5682, Addresses #7334

Authors:
  - Kumar Aatish (@kaatish)

Approvers:
  - Vukasin Milovanovic (@vuule)
  - Devavret Makkar (@devavret)

URL: #7676
  • Loading branch information
kaatish authored Mar 25, 2021
1 parent b854598 commit f1f1d0f
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 273 deletions.
151 changes: 80 additions & 71 deletions cpp/src/io/orc/dict_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "orc_common.h"
#include "orc_gpu.h"

#include <cudf/table/table_device_view.cuh>
#include <io/utilities/block_utils.cuh>

#include <rmm/cuda_stream_view.hpp>
Expand Down Expand Up @@ -46,14 +47,16 @@ struct dictinit_state_s {
};

/**
* @brief Return a 12-bit hash from a byte sequence
* @brief Return a 12-bit hash from a string
*/
static inline __device__ uint32_t nvstr_init_hash(char const *ptr, uint32_t len)
static inline __device__ uint32_t hash_string(const string_view val)
{
if (len != 0) {
return (ptr[0] + (ptr[len - 1] << 5) + (len << 10)) & ((1 << init_hash_bits) - 1);
} else {
if (val.empty()) {
return 0;
} else {
char const *ptr = val.data();
uint32_t len = val.size_bytes();
return (ptr[0] + (ptr[len - 1] << 5) + (len << 10)) & ((1 << init_hash_bits) - 1);
}
}

Expand All @@ -71,7 +74,8 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s,
{
if (t == 0) { s->nnz = 0; }
for (uint32_t i = 0; i < s->chunk.num_rows; i += block_size) {
const uint32_t *valid_map = s->chunk.valid_map_base;
const uint32_t *valid_map = s->chunk.leaf_column->null_mask();
auto column_offset = s->chunk.leaf_column->offset();
uint32_t is_valid, nz_pos;
if (t < block_size / 32) {
if (!valid_map) {
Expand All @@ -80,10 +84,10 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s,
uint32_t const row = s->chunk.start_row + i + t * 32;
auto const chunk_end = s->chunk.start_row + s->chunk.num_rows;

auto const valid_map_idx = (row + s->chunk.column_offset) / 32;
auto const valid_map_idx = (row + column_offset) / 32;
uint32_t valid = (row < chunk_end) ? valid_map[valid_map_idx] : 0;

auto const rows_in_next_word = (row + s->chunk.column_offset) & 0x1f;
auto const rows_in_next_word = (row + column_offset) & 0x1f;
if (rows_in_next_word != 0) {
auto const rows_in_current_word = 32 - rows_in_next_word;
// Read next word if any rows are within the chunk
Expand Down Expand Up @@ -111,12 +115,18 @@ static __device__ void LoadNonNullIndices(volatile dictinit_state_s *s,
* @brief Gather all non-NULL string rows and compute total character data size
*
* @param[in] chunks DictionaryChunk device array [rowgroup][column]
* @param[in] num_columns Number of columns
* @param[in] num_columns Number of string columns
*/
// blockDim {block_size,1,1}
template <int block_size>
__global__ void __launch_bounds__(block_size, 2)
gpuInitDictionaryIndices(DictionaryChunk *chunks, uint32_t num_columns)
gpuInitDictionaryIndices(DictionaryChunk *chunks,
const table_device_view view,
uint32_t *dict_data,
uint32_t *dict_index,
size_t row_index_stride,
size_type *str_col_ids,
uint32_t num_columns)
{
__shared__ __align__(16) dictinit_state_s state_g;

Expand All @@ -131,12 +141,21 @@ __global__ void __launch_bounds__(block_size, 2)
dictinit_state_s *const s = &state_g;
uint32_t col_id = blockIdx.x;
uint32_t group_id = blockIdx.y;
const nvstrdesc_s *ck_data;
uint32_t *dict_data;
uint32_t nnz, start_row, dict_char_count;
int t = threadIdx.x;

if (t == 0) s->chunk = chunks[group_id * num_columns + col_id];
if (t == 0) {
column_device_view *leaf_column_view = view.begin() + str_col_ids[col_id];
s->chunk = chunks[group_id * num_columns + col_id];
s->chunk.leaf_column = leaf_column_view;
s->chunk.dict_data =
dict_data + col_id * leaf_column_view->size() + group_id * row_index_stride;
s->chunk.dict_index = dict_index + col_id * leaf_column_view->size();
s->chunk.start_row = group_id * row_index_stride;
s->chunk.num_rows =
min(row_index_stride,
max(static_cast<size_t>(leaf_column_view->size() - s->chunk.start_row), size_t{0}));
}
for (uint32_t i = 0; i < sizeof(s->map) / sizeof(uint32_t); i += block_size) {
if (i + t < sizeof(s->map) / sizeof(uint32_t)) s->map.u32[i + t] = 0;
}
Expand All @@ -152,15 +171,15 @@ __global__ void __launch_bounds__(block_size, 2)
nnz = s->nnz;
dict_data = s->chunk.dict_data;
start_row = s->chunk.start_row;
ck_data = static_cast<const nvstrdesc_s *>(s->chunk.column_data_base) + start_row;
for (uint32_t i = 0; i < nnz; i += block_size) {
uint32_t ck_row = 0;
uint32_t hash = 0;
uint32_t len = 0;
if (i + t < nnz) {
ck_row = s->dict[i + t];
len = static_cast<uint32_t>(ck_data[ck_row].count);
hash = nvstr_init_hash(ck_data[ck_row].ptr, len);
ck_row = s->dict[i + t];
string_view string_val = s->chunk.leaf_column->element<string_view>(ck_row + start_row);
len = static_cast<uint32_t>(string_val.size_bytes());
hash = hash_string(string_val);
}
len = block_reduce(temp_storage.reduce_storage).Sum(len);
if (t == 0) s->chunk.string_char_count += len;
Expand Down Expand Up @@ -200,10 +219,11 @@ __global__ void __launch_bounds__(block_size, 2)
uint32_t ck_row = 0, pos = 0, hash = 0, pos_old, pos_new, sh, colliding_row;
bool collision;
if (i + t < nnz) {
ck_row = dict_data[i + t] - start_row;
hash = nvstr_init_hash(ck_data[ck_row].ptr, static_cast<uint32_t>(ck_data[ck_row].count));
sh = (hash & 1) ? 16 : 0;
pos_old = s->map.u16[hash];
ck_row = dict_data[i + t] - start_row;
string_view string_val = s->chunk.leaf_column->element<string_view>(ck_row + start_row);
hash = hash_string(string_val);
sh = (hash & 1) ? 16 : 0;
pos_old = s->map.u16[hash];
}
// The isolation of the atomicAdd, along with pos_old/pos_new is to guarantee deterministic
// behavior for the first row in the hash map that will be used for early duplicate detection
Expand Down Expand Up @@ -233,18 +253,16 @@ __global__ void __launch_bounds__(block_size, 2)
for (uint32_t i = 0; i < nnz; i += block_size) {
uint32_t ck_row = 0, ck_row_ref = 0, is_dupe = 0;
if (i + t < nnz) {
const char *str1, *str2;
uint32_t len1, len2, hash;
ck_row = s->dict[i + t];
str1 = ck_data[ck_row].ptr;
len1 = static_cast<uint32_t>(ck_data[ck_row].count);
hash = nvstr_init_hash(str1, len1);
ck_row_ref = s->dict[(hash > 0) ? s->map.u16[hash - 1] : 0];
ck_row = s->dict[i + t];
string_view string_value = s->chunk.leaf_column->element<string_view>(ck_row + start_row);
auto const string_length = static_cast<uint32_t>(string_value.size_bytes());
auto const hash = hash_string(string_value);
ck_row_ref = s->dict[(hash > 0) ? s->map.u16[hash - 1] : 0];
if (ck_row_ref != ck_row) {
str2 = ck_data[ck_row_ref].ptr;
len2 = static_cast<uint32_t>(ck_data[ck_row_ref].count);
is_dupe = nvstr_is_equal(str1, len1, str2, len2);
dict_char_count += (is_dupe) ? 0 : len1;
string_view reference_string =
s->chunk.leaf_column->element<string_view>(ck_row_ref + start_row);
is_dupe = (string_value == reference_string);
dict_char_count += (is_dupe) ? 0 : string_length;
}
}
uint32_t dupes_in_block;
Expand All @@ -269,6 +287,12 @@ __global__ void __launch_bounds__(block_size, 2)
chunks[group_id * num_columns + col_id].string_char_count = s->chunk.string_char_count;
chunks[group_id * num_columns + col_id].num_dict_strings = nnz - s->total_dupes;
chunks[group_id * num_columns + col_id].dict_char_count = dict_char_count;
chunks[group_id * num_columns + col_id].leaf_column = s->chunk.leaf_column;

chunks[group_id * num_columns + col_id].dict_data = s->chunk.dict_data;
chunks[group_id * num_columns + col_id].dict_index = s->chunk.dict_index;
chunks[group_id * num_columns + col_id].start_row = s->chunk.start_row;
chunks[group_id * num_columns + col_id].num_rows = s->chunk.num_rows;
}
}

Expand Down Expand Up @@ -357,7 +381,6 @@ __global__ void __launch_bounds__(block_size)
uint32_t num_strings;
uint32_t *dict_data, *dict_index;
uint32_t dict_char_count;
const nvstrdesc_s *str_data;
int t = threadIdx.x;

if (t == 0) s->stripe = stripes[stripe_id * num_columns + col_id];
Expand All @@ -366,21 +389,17 @@ __global__ void __launch_bounds__(block_size)
num_strings = s->stripe.num_strings;
dict_data = s->stripe.dict_data;
if (!dict_data) return;
dict_index = s->stripe.dict_index;
str_data = static_cast<const nvstrdesc_s *>(s->stripe.column_data_base);
dict_char_count = 0;
dict_index = s->stripe.dict_index;
string_view current_string = string_view::min();
dict_char_count = 0;
for (uint32_t i = 0; i < num_strings; i += block_size) {
uint32_t cur = (i + t < num_strings) ? dict_data[i + t] : 0;
uint32_t cur_len = 0;
const char *cur_ptr;
bool is_dupe = false;
if (i + t < num_strings) {
cur_ptr = str_data[cur].ptr;
cur_len = str_data[cur].count;
}
bool is_dupe = false;
if (i + t < num_strings) { current_string = s->stripe.leaf_column->element<string_view>(cur); }
if (i + t != 0 && i + t < num_strings) {
uint32_t prev = dict_data[i + t - 1];
is_dupe = nvstr_is_equal(cur_ptr, cur_len, str_data[prev].ptr, str_data[prev].count);
is_dupe = (current_string == (s->stripe.leaf_column->element<string_view>(prev)));
}
dict_char_count += (is_dupe) ? 0 : cur_len;
uint32_t dupes_in_block;
Expand All @@ -403,35 +422,27 @@ __global__ void __launch_bounds__(block_size)
}

/**
* @brief Launches kernel for initializing dictionary chunks
*
* @param[in] chunks DictionaryChunk device array [rowgroup][column]
* @param[in] num_columns Number of columns
* @param[in] num_rowgroups Number of row groups
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
* @copydoc cudf::io::orc::gpu::InitDictionaryIndices
*/
void InitDictionaryIndices(DictionaryChunk *chunks,
void InitDictionaryIndices(const table_device_view &view,
DictionaryChunk *chunks,
uint32_t *dict_data,
uint32_t *dict_index,
size_t row_index_stride,
size_type *str_col_ids,
uint32_t num_columns,
uint32_t num_rowgroups,
rmm::cuda_stream_view stream)
{
static constexpr int block_size = 512;
dim3 dim_block(block_size, 1);
dim3 dim_grid(num_columns, num_rowgroups);
gpuInitDictionaryIndices<block_size>
<<<dim_grid, dim_block, 0, stream.value()>>>(chunks, num_columns);
gpuInitDictionaryIndices<block_size><<<dim_grid, dim_block, 0, stream.value()>>>(
chunks, view, dict_data, dict_index, row_index_stride, str_col_ids, num_columns);
}

/**
* @brief Launches kernel for building stripe dictionaries
*
* @param[in] stripes StripeDictionary device array [stripe][column]
* @param[in] stripes_host StripeDictionary host array [stripe][column]
* @param[in] chunks DictionaryChunk device array [rowgroup][column]
* @param[in] num_stripes Number of stripes
* @param[in] num_rowgroups Number of row groups
* @param[in] num_columns Number of columns
* @param[in] stream CUDA stream to use, default `rmm::cuda_stream_default`
* @copydoc cudf::io::orc::gpu::BuildStripeDictionaries
*/
void BuildStripeDictionaries(StripeDictionary *stripes,
StripeDictionary *stripes_host,
Expand All @@ -447,18 +458,16 @@ void BuildStripeDictionaries(StripeDictionary *stripes,
stripes, chunks, num_columns);
for (uint32_t i = 0; i < num_stripes * num_columns; i++) {
if (stripes_host[i].dict_data != nullptr) {
thrust::device_ptr<uint32_t> p = thrust::device_pointer_cast(stripes_host[i].dict_data);
const nvstrdesc_s *str_data =
static_cast<const nvstrdesc_s *>(stripes_host[i].column_data_base);
thrust::device_ptr<uint32_t> dict_data_ptr =
thrust::device_pointer_cast(stripes_host[i].dict_data);
column_device_view *string_column = stripes_host[i].leaf_column;
// NOTE: Requires the --expt-extended-lambda nvcc flag
thrust::sort(rmm::exec_policy(stream),
p,
p + stripes_host[i].num_strings,
[str_data] __device__(const uint32_t &lhs, const uint32_t &rhs) {
return nvstr_is_lesser(str_data[lhs].ptr,
(uint32_t)str_data[lhs].count,
str_data[rhs].ptr,
(uint32_t)str_data[rhs].count);
dict_data_ptr,
dict_data_ptr + stripes_host[i].num_strings,
[string_column] __device__(const uint32_t &lhs, const uint32_t &rhs) {
return string_column->element<string_view>(lhs) <
string_column->element<string_view>(rhs);
});
}
}
Expand Down
Loading

0 comments on commit f1f1d0f

Please sign in to comment.