Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
karthikeyann committed Sep 25, 2024
1 parent 2c06379 commit e5f6d2a
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 38 deletions.
75 changes: 40 additions & 35 deletions cpp/src/io/json/host_tree_algorithms.cu
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ struct json_column_data {
// Maps a column id (NodeIndexT) to a non-owning reference to its device_json_column.
// std::reference_wrapper is used because references cannot be stored directly as
// unordered_map mapped values; the referenced columns are owned elsewhere
// (presumably by the column tree rooted at `root` — confirm against callers).
using hashmap_of_device_columns =
std::unordered_map<NodeIndexT, std::reference_wrapper<device_json_column>>;

std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_tree(
std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree(
device_json_column& root,
host_span<uint8_t const> is_str_column_all_nulls,
tree_meta_t& d_column_tree,
Expand All @@ -242,7 +242,7 @@ void scatter_offsets(tree_meta_t const& tree,
device_span<size_type> node_ids,
device_span<size_type> sorted_col_ids, // Reuse this for parent_col_ids
tree_meta_t const& d_column_tree,
host_span<const uint8_t> ignore_vals,
host_span<const bool> ignore_vals,
hashmap_of_device_columns const& columns,
rmm::cuda_stream_view stream);

Expand Down Expand Up @@ -284,8 +284,10 @@ void make_device_json_column(device_span<SymbolT const> input,
// sort by {col_id} on {node_ids} stable
rmm::device_uvector<NodeIndexT> node_ids(col_ids.size(), stream);
thrust::sequence(rmm::exec_policy_nosync(stream), node_ids.begin(), node_ids.end());
thrust::stable_sort_by_key(
rmm::exec_policy_nosync(stream), sorted_col_ids.begin(), sorted_col_ids.end(), node_ids.begin());
thrust::stable_sort_by_key(rmm::exec_policy_nosync(stream),
sorted_col_ids.begin(),
sorted_col_ids.end(),
node_ids.begin());

NodeIndexT const row_array_parent_col_id =
get_row_array_parent_col_id(col_ids, is_enabled_lines, stream);
Expand Down Expand Up @@ -355,7 +357,7 @@ void make_device_json_column(device_span<SymbolT const> input,
stream);
}

std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_tree(
std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree(
device_json_column& root,
host_span<uint8_t const> is_str_column_all_nulls,
tree_meta_t& d_column_tree,
Expand Down Expand Up @@ -442,7 +444,8 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
// map{parent_col_id, child_col_name}> = child_col_id, used for null value column tracking
std::map<std::pair<NodeIndexT, std::string>, NodeIndexT> mapped_columns;
// find column_ids which are values, but should be ignored in validity
auto ignore_vals = cudf::detail::make_host_vector<uint8_t>(num_columns, stream);
auto ignore_vals = cudf::detail::make_host_vector<bool>(num_columns, stream);
std::fill(ignore_vals.begin(), ignore_vals.end(), false);
std::vector<uint8_t> is_mixed_type_column(num_columns, 0);
std::vector<uint8_t> is_pruned(num_columns, 0);
// for columns that are not mixed type but have been forced as string
Expand Down Expand Up @@ -522,7 +525,7 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
if (parent_col_id != parent_node_sentinel &&
(is_mixed_type_column[parent_col_id] || is_pruned[this_col_id]) ||
forced_as_string_column[parent_col_id]) {
ignore_vals[this_col_id] = 1;
ignore_vals[this_col_id] = true;
if (is_mixed_type_column[parent_col_id]) { is_mixed_type_column[this_col_id] = 1; }
if (forced_as_string_column[parent_col_id]) { forced_as_string_column[this_col_id] = true; }
continue;
Expand Down Expand Up @@ -568,12 +571,12 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
}

if (column_categories[this_col_id] == NC_VAL || column_categories[this_col_id] == NC_STR) {
ignore_vals[this_col_id] = 1;
ignore_vals[this_col_id] = true;
continue;
}
if (column_categories[old_col_id] == NC_VAL || column_categories[old_col_id] == NC_STR) {
// remap
ignore_vals[old_col_id] = 1;
ignore_vals[old_col_id] = true;
mapped_columns.erase({parent_col_id, name});
columns.erase(old_col_id);
parent_col.child_columns.erase(name);
Expand Down Expand Up @@ -623,7 +626,7 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id != parent_node_sentinel and is_mixed_type_column[parent_col_id] == 1) {
is_mixed_type_column[this_col_id] = 1;
ignore_vals[this_col_id] = 1;
ignore_vals[this_col_id] = true;
columns.erase(this_col_id);
}
// Convert only mixed type columns as string (so to copy), but not its children
Expand All @@ -643,7 +646,7 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
auto parent_col_id = column_parent_ids[this_col_id];
if (parent_col_id != parent_node_sentinel and forced_as_string_column[parent_col_id]) {
forced_as_string_column[this_col_id] = true;
ignore_vals[this_col_id] = 1;
ignore_vals[this_col_id] = true;
}
// Convert only mixed type columns as string (so to copy), but not its children
if (parent_col_id != parent_node_sentinel and not forced_as_string_column[parent_col_id] and
Expand All @@ -669,7 +672,7 @@ void scatter_offsets(tree_meta_t const& tree,
device_span<size_type> node_ids,
device_span<size_type> sorted_col_ids, // Reuse this for parent_col_ids
tree_meta_t const& d_column_tree,
host_span<const uint8_t> ignore_vals,
host_span<const bool> ignore_vals,
hashmap_of_device_columns const& columns,
rmm::cuda_stream_view stream)
{
Expand Down Expand Up @@ -809,7 +812,7 @@ std::map<std::string, schema_element> unified_schema(cudf::io::json_reader_optio
{
return std::visit(
cudf::detail::visitor_overload{
[](std::vector<data_type> const& user_dtypes) -> std::map<std::string, schema_element> {
[](std::vector<data_type> const& user_dtypes) {
std::map<std::string, schema_element> dnew;
std::transform(thrust::counting_iterator<size_t>(0),
thrust::counting_iterator<size_t>(user_dtypes.size()),
Expand All @@ -819,8 +822,7 @@ std::map<std::string, schema_element> unified_schema(cudf::io::json_reader_optio
});
return dnew;
},
[](std::map<std::string, data_type> const& user_dtypes)
-> std::map<std::string, schema_element> {
[](std::map<std::string, data_type> const& user_dtypes) {
std::map<std::string, schema_element> dnew;
std::transform(user_dtypes.begin(),
user_dtypes.end(),
Expand All @@ -830,12 +832,11 @@ std::map<std::string, schema_element> unified_schema(cudf::io::json_reader_optio
});
return dnew;
},
[](std::map<std::string, schema_element> const& user_dtypes)
-> std::map<std::string, schema_element> { return user_dtypes; }},
[](std::map<std::string, schema_element> const& user_dtypes) { return user_dtypes; }},
options.get_dtypes());
}

std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_tree(
std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree(
device_json_column& root,
host_span<uint8_t const> is_str_column_all_nulls,
tree_meta_t& d_column_tree,
Expand Down Expand Up @@ -886,8 +887,10 @@ void make_device_json_column(device_span<SymbolT const> input,
// sort by {col_id} on {node_ids} stable
rmm::device_uvector<NodeIndexT> node_ids(col_ids.size(), stream);
thrust::sequence(rmm::exec_policy_nosync(stream), node_ids.begin(), node_ids.end());
thrust::stable_sort_by_key(
rmm::exec_policy_nosync(stream), sorted_col_ids.begin(), sorted_col_ids.end(), node_ids.begin());
thrust::stable_sort_by_key(rmm::exec_policy_nosync(stream),
sorted_col_ids.begin(),
sorted_col_ids.end(),
node_ids.begin());

NodeIndexT const row_array_parent_col_id =
get_row_array_parent_col_id(col_ids, is_enabled_lines, stream);
Expand Down Expand Up @@ -958,7 +961,7 @@ void make_device_json_column(device_span<SymbolT const> input,
stream);
}

std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_tree(
std::pair<cudf::detail::host_vector<bool>, hashmap_of_device_columns> build_tree(
device_json_column& root,
host_span<uint8_t const> is_str_column_all_nulls,
tree_meta_t& d_column_tree,
Expand Down Expand Up @@ -1002,10 +1005,11 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
} else if (column_category == NC_VAL || column_category == NC_STR) {
col.string_offsets.resize(max_row_offsets[i] + 1, stream);
col.string_lengths.resize(max_row_offsets[i] + 1, stream);
thrust::uninitialized_fill(
rmm::exec_policy_nosync(stream), col.string_offsets.begin(), col.string_offsets.end(), 0);
thrust::uninitialized_fill(
rmm::exec_policy_nosync(stream), col.string_lengths.begin(), col.string_lengths.end(), 0);
thrust::fill(
rmm::exec_policy_nosync(stream),
thrust::make_zip_iterator(col.string_offsets.begin(), col.string_lengths.begin()),
thrust::make_zip_iterator(col.string_offsets.end(), col.string_lengths.end()),
thrust::make_tuple(0, 0));
} else if (column_category == NC_LIST) {
col.child_offsets.resize(max_row_offsets[i] + 2, stream);
thrust::uninitialized_fill(
Expand All @@ -1032,7 +1036,7 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
}

// Pruning
auto is_pruned = cudf::detail::make_host_vector<uint8_t>(num_columns, stream);
auto is_pruned = cudf::detail::make_host_vector<bool>(num_columns, stream);
std::fill_n(is_pruned.begin(), num_columns, options.is_enabled_prune_columns());

// prune all children of a column, but not self.
Expand All @@ -1058,7 +1062,8 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
// Pruning: iterate through schema and mark only those columns and enforce type.
// NoPruning: iterate through schema and enforce type.

if (adj[parent_node_sentinel].empty()) return {cudf::detail::make_host_vector<uint8_t>(0, stream), {}}; // for empty file
if (adj[parent_node_sentinel].empty())
return {cudf::detail::make_host_vector<bool>(0, stream), {}}; // for empty file
CUDF_EXPECTS(adj[parent_node_sentinel].size() == 1, "Should be 1");
auto expected_types = cudf::detail::make_host_vector<NodeT>(num_columns, stream);
std::fill_n(expected_types.begin(), num_columns, NUM_NODE_CLASSES);
Expand Down Expand Up @@ -1124,7 +1129,8 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
}
};
if (is_array_of_arrays) {
if (adj[adj[parent_node_sentinel][0]].empty()) return {cudf::detail::make_host_vector<uint8_t>(0, stream), {}};
if (adj[adj[parent_node_sentinel][0]].empty())
return {cudf::detail::make_host_vector<bool>(0, stream), {}};
auto root_list_col_id =
is_enabled_lines ? adj[parent_node_sentinel][0] : adj[adj[parent_node_sentinel][0]][0];
// mark root and row array col_id as not pruned.
Expand Down Expand Up @@ -1258,13 +1264,12 @@ std::pair<cudf::detail::host_vector<uint8_t>, hashmap_of_device_columns> build_t
}
}
} else {
if (struct_col_id != -1 and list_col_id != -1) {
CUDF_FAIL("A mix of lists and structs within the same column is not supported");
} else {
// either one only: so ignore str column.
if ((struct_col_id != -1 or list_col_id != -1) and str_col_id != -1) {
is_pruned[str_col_id] = true;
}
// if both are present, error out.
CUDF_EXPECTS(struct_col_id == -1 or list_col_id == -1,
"A mix of lists and structs within the same column is not supported");
// either one only: so ignore str column.
if ((struct_col_id != -1 or list_col_id != -1) and str_col_id != -1) {
is_pruned[str_col_id] = true;
}
}
};
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/io/json/json_tree.cu
Original file line number Diff line number Diff line change
Expand Up @@ -506,16 +506,16 @@ std::pair<size_t, rmm::device_uvector<size_type>> remapped_field_nodes_after_uni
if (num_keys == 0) { return {num_keys, rmm::device_uvector<size_type>(num_keys, stream)}; }
rmm::device_uvector<size_type> offsets(num_keys, stream);
rmm::device_uvector<size_type> lengths(num_keys, stream);
auto offset_length_it = thrust::make_zip_iterator(offsets.begin(), lengths.begin());
thrust::transform(rmm::exec_policy_nosync(stream),
keys.begin(),
keys.end(),
thrust::make_zip_iterator(offsets.begin(), lengths.begin()),
offset_length_it,
[node_range_begin = d_tree.node_range_begin.data(),
node_range_end = d_tree.node_range_end.data()] __device__(auto key) {
return thrust::make_tuple(node_range_begin[key],
node_range_end[key] - node_range_begin[key]);
});
auto offset_length_it = thrust::make_zip_iterator(offsets.begin(), lengths.begin());
cudf::io::parse_options_view opt{',', '\n', '\0', '.'};
opt.keepquotes = true;

Expand Down Expand Up @@ -555,7 +555,7 @@ std::pair<size_t, rmm::device_uvector<size_type>> remapped_field_nodes_after_uni
using hasher_type = decltype(d_hasher);
constexpr size_type empty_node_index_sentinel = -1;
auto key_set = cuco::static_set{
cuco::extent{compute_hash_table_size(num_keys, 100)}, // 40% occupancy
cuco::extent{compute_hash_table_size(num_keys)},
cuco::empty_key{empty_node_index_sentinel},
d_equal,
cuco::linear_probing<1, hasher_type>{d_hasher},
Expand All @@ -570,6 +570,7 @@ std::pair<size_t, rmm::device_uvector<size_type>> remapped_field_nodes_after_uni
found_keys.begin(),
thrust::make_discard_iterator(),
stream.value());
// set.size will synchronize the stream before return.
return {key_set.size(stream), std::move(found_keys)};
}

Expand Down

0 comments on commit e5f6d2a

Please sign in to comment.