Fix sort merge and add basic tests
willdealtry committed Jun 21, 2024
1 parent f472424 commit 875bb7b
Showing 15 changed files with 219 additions and 80 deletions.
2 changes: 1 addition & 1 deletion cpp/arcticdb/async/test/test_async.cpp
@@ -43,7 +43,7 @@ TEST(Async, SinkBasic) {

auto seg = ac::SegmentInMemory();
aa::EncodeAtomTask enc{
- ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::entity::NumericId{123}, ac::entity::NumericId{456}, ac::timestamp{457}, {ac::entity::NumericIndex{999}}, std::move(seg), codec_opt, ac::EncodingVersion::V2
+ ac::entity::KeyType::GENERATION, ac::entity::VersionId{6}, ac::entity::NumericId{123}, ac::entity::NumericId{456}, ac::timestamp{457}, ac::entity::NumericIndex{999}, std::move(seg), codec_opt, ac::EncodingVersion::V2
};

auto v = sched.submit_cpu_task(std::move(enc)).via(&aa::io_executor()).thenValue(aa::WriteSegmentTask{lib}).get();
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/column.cpp
@@ -461,7 +461,7 @@ void Column::inflate_string_array(
const TensorType<position_t> &string_refs,
CursoredBuffer<ChunkedBuffer> &data,
CursoredBuffer<Buffer> &shapes,
- std::vector<position_t> &offsets,
+ boost::container::small_vector<position_t, 1> &offsets,
const StringPool &string_pool) {
ssize_t max_size = 0;
for (int i = 0; i < string_refs.size(); ++i)
@@ -490,7 +490,7 @@ void Column::inflate_string_arrays(const StringPool &string_pool) {

CursoredBuffer<ChunkedBuffer> data;
CursoredBuffer<Buffer> shapes;
- std::vector<position_t> offsets;
+ boost::container::small_vector<position_t, 1> offsets;
for (position_t row = 0; row < row_count(); ++row) {
auto string_refs = tensor_at<position_t>(row).value();
inflate_string_array(string_refs, data, shapes, offsets, string_pool);
4 changes: 2 additions & 2 deletions cpp/arcticdb/column_store/column.hpp
@@ -440,7 +440,7 @@ class Column {
void inflate_string_array(const TensorType<position_t> &string_refs,
CursoredBuffer<ChunkedBuffer> &data,
CursoredBuffer<Buffer> &shapes,
- std::vector<position_t> &offsets,
+ boost::container::small_vector<position_t, 1> &offsets,
const StringPool &string_pool);

void inflate_string_arrays(const StringPool &string_pool);
@@ -920,7 +920,7 @@ class Column {
CursoredBuffer<ChunkedBuffer> data_;
CursoredBuffer<Buffer> shapes_;

- mutable std::vector<position_t> offsets_;
+ mutable boost::container::small_vector<position_t, 1> offsets_;
TypeDescriptor type_;
std::optional<TypeDescriptor> orig_type_;
ssize_t last_logical_row_ = -1;
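The std::vector → boost::container::small_vector<position_t, 1> change in column.cpp and column.hpp above keeps the first offset in inline storage, so the common single-offset case avoids a heap allocation. A minimal standalone sketch of the behaviour being relied on (not ArcticDB code):

    #include <boost/container/small_vector.hpp>
    #include <cstdint>

    int main() {
        // Inline capacity of 1: the first element is stored inside the object itself.
        boost::container::small_vector<std::int64_t, 1> offsets;
        offsets.push_back(42);   // no dynamic allocation for the single-offset case
        offsets.push_back(43);   // falls back to dynamically allocated storage only now
        return offsets.size() == 2 ? 0 : 1;
    }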
4 changes: 2 additions & 2 deletions cpp/arcticdb/pipeline/read_frame.cpp
@@ -490,9 +490,9 @@ void decode_into_frame_dynamic(
// decode_or_expand will invoke the empty type handler which will do backfilling with the default value depending on the
// destination type.
if (!trivially_compatible_types(m.source_type_desc_, m.dest_type_desc_) && !source_is_empty) {
- m.dest_type_desc_.visit_tag([&buffer, &m, &data, encoded_field, buffers, encdoing_version] (auto dest_desc_tag) {
+ m.dest_type_desc_.visit_tag([&buffer, &m, buffers] (auto dest_desc_tag) {
using DestinationType = typename decltype(dest_desc_tag)::DataTypeTag::raw_type;
- m.source_type_desc_.visit_tag([&buffer, &m, &data, &encoded_field, &buffers, encdoing_version] (auto src_desc_tag ) {
+ m.source_type_desc_.visit_tag([&buffer, &m] (auto src_desc_tag ) {
using SourceType = typename decltype(src_desc_tag)::DataTypeTag::raw_type;
if constexpr(std::is_arithmetic_v<SourceType> && std::is_arithmetic_v<DestinationType>) {
// If the source and destination types are different, then sizeof(destination type) >= sizeof(source type)
46 changes: 31 additions & 15 deletions cpp/arcticdb/processing/clause.cpp
@@ -62,6 +62,15 @@ std::vector<std::vector<size_t>> structure_by_column_slice(std::vector<RangesAnd
return res;
}

+ std::vector<std::vector<size_t>> structure_all_together(std::vector<RangesAndKey>& ranges_and_keys) {
+ std::vector<size_t> res;
+ for (const auto& [idx, ranges_and_key]: folly::enumerate(ranges_and_keys))
+ res.emplace_back(idx);
+
+ return {std::move(res)};
+ }


/*
* On entry to a clause, construct ProcessingUnits from the input entity IDs. These will either be provided by the
* structure_for_processing method for the first clause in the pipeline, or by the previous clause for all subsequent
@@ -909,15 +918,24 @@ void merge_impl(
// MergeClause receives a list of DataFrames as input and merge them into a single one where all
// the rows are sorted by time stamp
Composite<EntityIds> MergeClause::process(Composite<EntityIds>&& entity_ids) const {
+ return std::move(entity_ids);
+ }
+
+ std::optional<std::vector<Composite<EntityIds>>> MergeClause::repartition(
+ std::vector<Composite<EntityIds>> &&comps) const {
+
+ // TODO this is a hack because we don't currently have a way to
+ // specify any particular input shape unless a clause is the
+ // first one and can use structure_for_processing. Ideally
+ // merging should be parallel like resampling
+ auto entity_ids = merge_composites(std::move(comps));
auto procs = gather_entities(component_manager_, std::move(entity_ids));

auto compare =
[](const std::unique_ptr<SegmentWrapper> &left,
const std::unique_ptr<SegmentWrapper> &right) {
- const auto left_index = index::index_value_from_row(left->row(),
- IndexDescriptor::TIMESTAMP, 0);
- const auto right_index = index::index_value_from_row(right->row(),
- IndexDescriptor::TIMESTAMP, 0);
+ const auto left_index = index::index_value_from_row(left->row(), IndexDescriptor::TIMESTAMP, 0);
+ const auto right_index = index::index_value_from_row(right->row(), IndexDescriptor::TIMESTAMP, 0);
return left_index > right_index;
};

@@ -931,13 +949,15 @@ Composite<EntityIds> MergeClause::process(Composite<EntityIds>&& entity_ids) con
procs.broadcast([&input_streams, &min_start_row, &max_end_row, &min_start_col, &max_end_col](auto&& proc) {
for (auto&& [idx, segment]: folly::enumerate(proc.segments_.value())) {
size_t start_row = proc.row_ranges_->at(idx)->start();
- min_start_row = start_row < min_start_row ? start_row : min_start_row;
size_t end_row = proc.row_ranges_->at(idx)->end();
- max_end_row = end_row > max_end_row ? end_row : max_end_row;
+ min_start_row = std::min(start_row, min_start_row);
+ max_end_row = std::max(end_row, max_end_row);

size_t start_col = proc.col_ranges_->at(idx)->start();
- min_start_col = start_col < min_start_col ? start_col : min_start_col;
size_t end_col = proc.col_ranges_->at(idx)->end();
- max_end_col = end_col > max_end_col ? end_col : max_end_col;
+ min_start_col = std::min(start_col, min_start_col);
+ max_end_col = std::max(end_col, max_end_col);

input_streams.push(std::make_unique<SegmentWrapper>(std::move(*segment)));
}
});
@@ -957,15 +977,11 @@ Composite<EntityIds> MergeClause::process(Composite<EntityIds>&& entity_ids) con
stream_descriptor_);
}, index_, density_policy_);

- return ret;
+ std::vector<Composite<EntityIds>> output;
+ output.emplace_back(std::move(ret));
+ return std::make_optional(std::move(output));
}

- std::optional<std::vector<Composite<EntityIds>>> MergeClause::repartition(
- std::vector<Composite<EntityIds>> &&comps) const {
- std::vector<Composite<EntityIds>> v;
- v.push_back(merge_composites_shallow(std::move(comps)));
- return v;
- }

Composite<EntityIds> ColumnStatsGenerationClause::process(Composite<EntityIds>&& entity_ids) const {
auto procs = gather_entities(component_manager_, std::move(entity_ids), true);
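The repartition body above appears to drive a k-way merge: every segment is pushed into input_streams, and the compare functor returning left_index > right_index is the usual way to make a priority queue pop the smallest timestamp first. A self-contained sketch of that pattern with a std::priority_queue over plain integer timestamps (the real code merges whole segment rows, not integers):

    #include <cstddef>
    #include <cstdint>
    #include <queue>
    #include <utility>
    #include <vector>

    // Merge several individually sorted "segments" (here vectors of timestamps)
    // into one sorted output, always draining the stream whose head is smallest.
    std::vector<std::int64_t> k_way_merge(std::vector<std::vector<std::int64_t>> segments) {
        using Head = std::pair<std::int64_t, std::size_t>;  // (next timestamp, segment index)
        auto cmp = [](const Head& l, const Head& r) { return l.first > r.first; };  // min-heap
        std::priority_queue<Head, std::vector<Head>, decltype(cmp)> queue(cmp);

        std::vector<std::size_t> cursor(segments.size(), 0);
        for (std::size_t i = 0; i < segments.size(); ++i)
            if (!segments[i].empty())
                queue.push({segments[i][0], i});

        std::vector<std::int64_t> out;
        while (!queue.empty()) {
            auto [ts, idx] = queue.top();
            queue.pop();
            out.push_back(ts);
            if (++cursor[idx] < segments[idx].size())
                queue.push({segments[idx][cursor[idx]], idx});
        }
        return out;
    }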
10 changes: 6 additions & 4 deletions cpp/arcticdb/processing/clause.hpp
@@ -109,10 +109,12 @@ struct IClause {

using Clause = folly::Poly<IClause>;

- std::vector<std::vector<size_t>> structure_by_row_slice(std::vector<RangesAndKey>& ranges_and_keys,
- size_t start_from);
+ std::vector<std::vector<size_t>> structure_by_row_slice(std::vector<RangesAndKey>& ranges_and_keys, size_t start_from);

std::vector<std::vector<size_t>> structure_by_column_slice(std::vector<RangesAndKey>& ranges_and_keys);

+ std::vector<std::vector<size_t>> structure_all_together(std::vector<RangesAndKey>& ranges_and_keys);

Composite<ProcessingUnit> gather_entities(std::shared_ptr<ComponentManager> component_manager,
Composite<EntityIds>&& entity_ids,
bool include_atom_keys = false,
@@ -614,8 +616,8 @@ struct MergeClause {

[[nodiscard]] std::vector<std::vector<size_t>> structure_for_processing(
std::vector<RangesAndKey>& ranges_and_keys,
- size_t start_from) {
- return structure_by_row_slice(ranges_and_keys, start_from);
+ size_t) {
+ return structure_all_together(ranges_and_keys);
}

[[nodiscard]] Composite<EntityIds> process(Composite<EntityIds>&& entity_ids) const;
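For context on the structure_for_processing change above: structure_by_row_slice produced one group of input slices per row slice, whereas the new structure_all_together returns a single group containing every slice, so MergeClause sees all of its input segments in one pass. A simplified standalone sketch of the two groupings over plain indices (assuming, for illustration only, one column slice per row slice):

    #include <cstddef>
    #include <vector>

    // group_per_slice(4)    -> {{0}, {1}, {2}, {3}}   (one processing unit per slice)
    // group_all_together(4) -> {{0, 1, 2, 3}}         (everything handled by one unit)
    std::vector<std::vector<std::size_t>> group_per_slice(std::size_t n) {
        std::vector<std::vector<std::size_t>> res;
        for (std::size_t i = 0; i < n; ++i)
            res.push_back({i});
        return res;
    }

    std::vector<std::vector<std::size_t>> group_all_together(std::size_t n) {
        std::vector<std::size_t> all;
        for (std::size_t i = 0; i < n; ++i)
            all.push_back(i);
        return {all};
    }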
4 changes: 2 additions & 2 deletions cpp/arcticdb/processing/operation_dispatch_binary.hpp
@@ -402,7 +402,7 @@ VariantData binary_operator(const ColumnWithStrings& col, const Value& val, Func
Column::transform<typename col_type_info::TDT, ScalarTagType<DataTypeTag<output_data_type>>>(
*(col.column_),
*output_column,
- [&func, &col, &column_name, raw_value](auto input_value) -> ReversedTargetType {
+ [&func, raw_value](auto input_value) -> ReversedTargetType {
return func.apply(raw_value, input_value);
});
} else {
@@ -412,7 +412,7 @@ VariantData binary_operator(const ColumnWithStrings& col, const Value& val, Func
Column::transform<typename col_type_info::TDT, ScalarTagType<DataTypeTag<output_data_type>>>(
*(col.column_),
*output_column,
- [&func, &col, &column_name, raw_value](auto input_value) -> TargetType {
+ [&func, raw_value](auto input_value) -> TargetType {
return func.apply(input_value, raw_value);
});
}
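The capture-list changes in this file, in read_frame.cpp above, and in operation_dispatch_unary.hpp below drop variables that the lambda bodies never use. A plausible motivation — stated here as an assumption, not taken from the commit — is clang's -Wunused-lambda-capture, which fails the build under -Werror. A trivial reproduction:

    // clang++ -std=c++17 -Wunused-lambda-capture -Werror unused_capture.cpp
    int main() {
        int unused = 0;
        auto warns = [unused](int x) { return x + 1; };  // "lambda capture 'unused' is not used"
        auto clean = [](int x) { return x + 1; };        // captures nothing it does not need
        return warns(1) + clean(1) == 4 ? 0 : 1;
    }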
2 changes: 1 addition & 1 deletion cpp/arcticdb/processing/operation_dispatch_unary.hpp
@@ -127,7 +127,7 @@ VariantData unary_comparator(const ColumnWithStrings& col, Func&& func) {
constexpr auto sparse_missing_value_output = std::is_same_v<std::remove_reference_t<Func>, IsNullOperator>;
details::visit_type(col.column_->type().data_type(), [&](auto col_tag) {
using type_info = ScalarTypeInfo<decltype(col_tag)>;
- Column::transform<typename type_info::TDT>(*(col.column_), output_bitset, sparse_missing_value_output, [&col, &func](auto input_value) -> bool {
+ Column::transform<typename type_info::TDT>(*(col.column_), output_bitset, sparse_missing_value_output, [&](auto input_value) -> bool {
if constexpr (is_floating_point_type(type_info::data_type)) {
return func.apply(input_value);
} else if constexpr (is_sequence_type(type_info::data_type)) {
64 changes: 34 additions & 30 deletions cpp/arcticdb/processing/test/test_clause.cpp
@@ -372,52 +372,56 @@ TEST(Clause, Split) {

TEST(Clause, Merge) {
using namespace arcticdb;
- const auto seg_size = 5;
+ const auto seg_size = 30;
ScopedConfig max_blocks("Merge.SegmentSize", seg_size);
auto component_manager = std::make_shared<ComponentManager>();

+ const auto num_segs = 4;
+ const auto num_rows = 20;

auto stream_id = StreamId("Merge");
StreamDescriptor descriptor{};
descriptor.add_field(FieldRef{make_scalar_type(DataType::NANOSECONDS_UTC64),"time"});
- MergeClause merge_clause{TimeseriesIndex{"time"}, DenseColumnPolicy{}, stream_id, descriptor};
+ MergeClause merge_clause{TimeseriesIndex{"time"}, SparseColumnPolicy{}, stream_id, descriptor};
merge_clause.set_component_manager(component_manager);

Composite<EntityIds> entity_ids;
- std::vector<SegmentInMemory> copies;
- const auto num_segs = 2;
- const auto num_rows = 5;
+ auto seg = get_standard_timeseries_segment(std::get<StringId>(stream_id), num_rows);

+ std::vector<SegmentInMemory> segs;
for(auto x = 0u; x < num_segs; ++x) {
- auto symbol = fmt::format("merge_{}", x);
- auto seg = get_standard_timeseries_segment(symbol, 10);
- copies.emplace_back(seg.clone());
- auto proc_unit = ProcessingUnit{std::move(seg)};
- entity_ids.push_back(push_entities(component_manager, std::move(proc_unit)));
+ segs.emplace_back(SegmentInMemory{seg.descriptor().clone(), num_rows / num_segs, false});
}

- auto res = gather_entities(component_manager, merge_clause.process(std::move(entity_ids))).as_range();
- ASSERT_EQ(res.size(), 4u);
- for(auto i = 0; i < num_rows * num_segs; ++i) {
- auto& output_seg = *res[i / seg_size].segments_->at(0);
- auto output_row = i % seg_size;
- const auto& expected_seg = copies[i % num_segs];
- auto expected_row = i / num_segs;
- for(auto field : folly::enumerate(output_seg.descriptor().fields())) {
- if(field.index == 1)
- continue;
-
- visit_field(*field, [&output_seg, &expected_seg, output_row, expected_row, &field] (auto tdt) {
- using DataTypeTag = typename decltype(tdt)::DataTypeTag;
- if constexpr(is_sequence_type(DataTypeTag::data_type)) {
- const auto val1 = output_seg.string_at(output_row, position_t(field.index));
- const auto val2 = expected_seg.string_at(expected_row, position_t(field.index));
- ASSERT_EQ(val1, val2);
+ for(auto i = 0u; i < num_rows; ++i) {
+ auto& current = segs[i % num_segs];
+ for(auto j = 0U; j < seg.descriptor().field_count(); ++j) {
+ current.column(j).type().visit_tag([&current, &seg, i, j](auto&& tag) {
+ using DT = std::decay_t<decltype(tag)>;
+ const auto data_type = DT::DataTypeTag::data_type;
+ using RawType = typename DT::DataTypeTag::raw_type;
+ if constexpr(is_sequence_type(data_type)) {
+ current.set_string(j, seg.string_at(i, j).value());
} else {
- using RawType = typename decltype(tdt)::DataTypeTag::raw_type;
- const auto val1 = output_seg.scalar_at<RawType>(output_row, field.index);
- const auto val2 = expected_seg.scalar_at<RawType>(expected_row, field.index);
- ASSERT_EQ(val1, val2);
+ current.set_scalar<RawType>(j, seg.scalar_at<RawType>(i, j).value());
}
});
}
+ current.end_row();
}

+ for(auto x = 0u; x < num_segs; ++x) {
+ auto proc_unit = ProcessingUnit{std::move(segs[x])};
+ entity_ids.push_back(push_entities(component_manager, std::move(proc_unit)));
+ }
+
+ Composite<EntityIds> processed_ids = merge_clause.process(std::move(entity_ids));
+ std::vector<Composite<EntityIds>> vec;
+ vec.emplace_back(std::move(processed_ids));
+ auto repartitioned = merge_clause.repartition(std::move(vec));
+ auto res = gather_entities(component_manager, std::move(repartitioned->at(0))).as_range();
+ ASSERT_EQ(res.size(), 1u);
+ const auto& received = res[0];
+ ASSERT_EQ(*received.segments_->at(0), seg);
}
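The rewritten test above splits one sorted segment round-robin across num_segs smaller segments, runs them through process + repartition, and asserts that the merged result equals the original. The same round-trip idea on plain integers, as a hedged sketch of what the assertion is checking:

    #include <algorithm>
    #include <cassert>
    #include <cstddef>
    #include <vector>

    int main() {
        const std::size_t num_segs = 4, num_rows = 20;
        std::vector<int> original(num_rows);
        for (std::size_t i = 0; i < num_rows; ++i)
            original[i] = static_cast<int>(i);           // sorted "timestamps"

        // Distribute rows round-robin; each sub-segment stays individually sorted.
        std::vector<std::vector<int>> segs(num_segs);
        for (std::size_t i = 0; i < num_rows; ++i)
            segs[i % num_segs].push_back(original[i]);

        // Merging the sub-segments by value must reproduce the original sequence.
        std::vector<int> merged;
        for (const auto& s : segs)
            merged.insert(merged.end(), s.begin(), s.end());
        std::sort(merged.begin(), merged.end());         // stand-in for the timestamp merge

        assert(merged == original);
        return 0;
    }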
11 changes: 1 addition & 10 deletions cpp/arcticdb/processing/unsorted_aggregation.cpp
@@ -56,16 +56,7 @@ SegmentInMemory MinMaxAggregatorData::finalize(const std::vector<ColumnName>& ou
return seg;
}

- namespace
- {
- inline util::BitMagic::enumerator::value_type deref(util::BitMagic::enumerator iter) {
- return *iter;
- }
-
- inline std::size_t deref(std::size_t index) {
- return index;
- }
-
+ namespace {
template<typename T, typename T2=void>
struct OutputType;

2 changes: 1 addition & 1 deletion cpp/arcticdb/storage/coalesced/multi_segment_utils.hpp
@@ -22,7 +22,7 @@ uint64_t get_symbol_prefix(const entity::StreamId& stream_id) {
constexpr size_t begin = sizeof(InternalType) - sizeof(StorageType);
StorageType data{};
util::variant_match(stream_id,
- [&data, begin, end] (const entity::StringId& string_id) {
+ [&] (const entity::StringId& string_id) {
auto* target = reinterpret_cast<char*>(&data);
for(size_t p = begin, i = 0; p < end && i < string_id.size(); ++p, ++i) {
const auto c = string_id[i];
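The lambda above (now capturing with [&]) copies the leading characters of a string symbol into an integer so that symbol prefixes can be stored and compared numerically. A simplified standalone sketch of that packing — offsets simplified relative to the real code, which writes at a type-dependent offset, and the resulting comparison order is byte-order-dependent:

    #include <algorithm>
    #include <cstdint>
    #include <cstring>
    #include <string_view>

    // Pack up to the first 8 characters of a symbol into a uint64_t prefix.
    std::uint64_t symbol_prefix(std::string_view symbol) {
        std::uint64_t prefix = 0;
        std::memcpy(&prefix, symbol.data(), std::min(symbol.size(), sizeof(prefix)));
        return prefix;
    }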
3 changes: 0 additions & 3 deletions cpp/arcticdb/storage/library_path.hpp
@@ -29,9 +29,6 @@ class DefaultStringViewable : public std::shared_ptr<std::string> {
std::make_shared<std::string>(args...)),
hash_(arcticdb::hash(std::string_view{*this})) {}

- DefaultStringViewable(const DefaultStringViewable &that) :
- std::shared_ptr<std::string>::shared_ptr(that), hash_(that.hash_) {}

operator std::string_view() const {
return *this->get();
}
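The copy constructor removed above duplicated what the compiler generates anyway: the implicitly defined copy constructor copies the shared_ptr base and the hash_ member member-wise, so deleting the hand-written one should change nothing observable. A small sketch of that rule-of-zero reasoning (hypothetical type, not the real DefaultStringViewable):

    #include <cstddef>
    #include <memory>
    #include <string>

    struct Viewable : std::shared_ptr<std::string> {
        Viewable() : std::shared_ptr<std::string>(std::make_shared<std::string>("lib")), hash_(42) {}
        std::size_t hash_;
        // No user-declared copy constructor: the implicitly generated one copies
        // the shared_ptr base and hash_, exactly what the removed code spelled out.
    };

    int main() {
        Viewable a;
        Viewable b = a;                                        // implicit member-wise copy
        return (b.hash_ == 42 && b.get() == a.get()) ? 0 : 1;  // same string, same hash
    }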
9 changes: 5 additions & 4 deletions cpp/arcticdb/version/version_core.cpp
@@ -657,7 +657,7 @@ std::vector<SliceAndKey> read_and_process(
// i.e. if the first processing unit needs ranges_and_keys[0] and ranges_and_keys[1], and the second needs ranges_and_keys[2] and ranges_and_keys[3]
// then the structure will be {{0, 1}, {2, 3}}
std::vector<std::vector<size_t>> processing_unit_indexes = read_query.clauses_[0]->structure_for_processing(ranges_and_keys, start_from);
- component_manager->set_next_entity_id(ranges_and_keys.size());
+ component_manager->set_next_entity_id(ranges_and_keys.size());

// Start reading as early as possible
auto segment_and_slice_futures = store->batch_read_uncompressed(std::move(ranges_and_keys), columns_to_decode(pipeline_context));
@@ -1285,9 +1285,10 @@ VersionedItem sort_merge_impl(
[&](const stream::TimeseriesIndex &timeseries_index) {
read_query.clauses_.emplace_back(std::make_shared<Clause>(SortClause{timeseries_index.name()}));
read_query.clauses_.emplace_back(std::make_shared<Clause>(RemoveColumnPartitioningClause{}));
- const auto split_size = ConfigsMap::instance()->get_int("Split.RowCount", 10000);
- read_query.clauses_.emplace_back(std::make_shared<Clause>(SplitClause{static_cast<size_t>(split_size)}));
- read_query.clauses_.emplace_back(std::make_shared<Clause>(MergeClause{timeseries_index, DenseColumnPolicy{}, stream_id, pipeline_context->descriptor()}));
+ //const auto split_size = ConfigsMap::instance()->get_int("Split.RowCount", 10000);
+ //read_query.clauses_.emplace_back(std::make_shared<Clause>(SplitClause{static_cast<size_t>(split_size)}));
+
+ read_query.clauses_.emplace_back(std::make_shared<Clause>(MergeClause{timeseries_index, SparseColumnPolicy{}, stream_id, pipeline_context->descriptor()}));
auto segments = read_and_process(store, pipeline_context, read_query, ReadOptions{}, pipeline_context->incompletes_after());
pipeline_context->total_rows_ = num_versioned_rows + get_slice_rowcounts(segments);

6 changes: 3 additions & 3 deletions python/arcticdb/version_store/_store.py
@@ -2455,9 +2455,9 @@ def is_symbol_pickled(self, symbol: str, as_of: Optional[VersionQueryInput] = No

def _get_time_range_from_ts(self, desc, min_ts, max_ts):
if desc.stream_descriptor.index.kind != IndexDescriptor.Type.TIMESTAMP or \
- desc.stream_descriptor.sorted == SortedValue.UNSORTED or \
- min_ts is None or \
- max_ts is None:
+ desc.stream_descriptor.sorted == SortedValue.UNSORTED or \
+ min_ts is None or \
+ max_ts is None:
return datetime64("nat"), datetime64("nat")
input_type = desc.normalization.WhichOneof("input_type")
tz = None
Expand Down