Skip to content

Commit

Permalink
Fix ArcticDB reading streaming data (#1647)
Browse files Browse the repository at this point in the history
#### Reference Issues/PRs
Fixes man-group/arcticdb-man#80

#### What does this implement or fix?

Allows ArcticDB to read incomplete segments written by arcticc tick
collectors.

#### Any other comments?

arcticc has a different append incompletes logic to ArcticDB. It writes
a dummy `StreamDescriptor` on the `TimeseriesDescriptor`, and relies
entirely on the `StreamDescriptor` in the segment header. That approach
is better than the existing ArcticDB approach, which writes the same
`StreamDescriptor` twice (directly in the header, and in the
`TimeseriesDescriptor`).

ArcticDB was reading this dummy stream descriptor and crashing.

Firstly, this PR changes readers to first check the StreamDescriptor on
the _header_ of incompletes, rather than using the one stamped on the
`TimeseriesDescriptor`. It also adds a backwards compat test for the
format.

Secondly, and this is not required for the PR to be logically correct,
we change writers so that, like arcticc, they do not duplicate the
`StreamDescriptor`. This is done in 99d0870.

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [x] Have you updated the relevant docstrings, documentation and
copyright notice?
- [x] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [x] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [x] Are API changes highlighted in the PR description?
- [x] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->

---------

Co-authored-by: Alex Seaton <alex.seaton@man.com>
  • Loading branch information
poodlewars and Alex Seaton authored Jun 28, 2024
1 parent 9b2cbb9 commit 8cf8279
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 15 deletions.
6 changes: 6 additions & 0 deletions cpp/arcticdb/async/async_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,12 @@ folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descr
return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorTask{});
}

folly::Future<std::pair<VariantKey, TimeseriesDescriptor>> read_timeseries_descriptor_for_incompletes(
const entity::VariantKey &key,
storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) override {
return read_and_continue(key, library_, opts, DecodeTimeseriesDescriptorForIncompletesTask{});
}

folly::Future<bool> key_exists(const entity::VariantKey &key) override {
return async::submit_io_task(KeyExistsTask{&key, library_});
}
Expand Down
24 changes: 23 additions & 1 deletion cpp/arcticdb/async/tasks.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,6 @@ struct DecodeMetadataTask : BaseTask {
}
};


struct DecodeTimeseriesDescriptorTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeTimeseriesDescriptorTask)

Expand All @@ -474,6 +473,29 @@ struct DecodeTimeseriesDescriptorTask : BaseTask {

}
};

struct DecodeTimeseriesDescriptorForIncompletesTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeTimeseriesDescriptorForIncompletesTask)

DecodeTimeseriesDescriptorForIncompletesTask() = default;

std::pair<VariantKey, TimeseriesDescriptor> operator()(storage::KeySegmentPair &&ks) const {
ARCTICDB_SAMPLE(DecodeTimeseriesDescriptorForIncompletesTask, 0)
auto key_seg = std::move(ks);
ARCTICDB_DEBUG(
log::storage(),
"DecodeTimeseriesDescriptorForIncompletesTask decoding segment with key {}",
variant_key_view(key_seg.variant_key()));

auto maybe_desc = decode_timeseries_descriptor_for_incompletes(key_seg.segment());

util::check(static_cast<bool>(maybe_desc), "Failed to decode timeseries descriptor");
return std::make_pair(
std::move(key_seg.variant_key()),
std::move(*maybe_desc));
}
};

struct DecodeMetadataAndDescriptorTask : BaseTask {
ARCTICDB_MOVE_ONLY_DEFAULT(DecodeMetadataAndDescriptorTask)

Expand Down
57 changes: 49 additions & 8 deletions cpp/arcticdb/codec/codec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,17 @@ std::optional<FieldCollection> decode_descriptor_fields(
}

TimeseriesDescriptor unpack_timeseries_descriptor_from_proto(
const google::protobuf::Any& any) {
const google::protobuf::Any& any, const StreamDescriptor& stream_desc, bool is_decoding_incompletes) {

auto tsd = timeseries_descriptor_from_any(any);
if (is_decoding_incompletes) {
// Prefer the stream descriptor on the segment header for incompletes.
// See PR #1647.
arcticc::pb2::descriptors_pb2::StreamDescriptor desc_proto;
copy_stream_descriptor_to_proto(stream_desc, desc_proto);
tsd.mutable_stream_descriptor()->CopyFrom(desc_proto);
}

auto frame_meta = std::make_shared<arcticdb::proto::descriptors::FrameMetadata>();
exchange_timeseries_proto(tsd, *frame_meta);

Expand All @@ -259,12 +267,12 @@ std::optional<TimeseriesDescriptor> decode_timeseries_descriptor_v1(
const SegmentHeader& hdr,
const uint8_t* data,
const uint8_t* begin,
const uint8_t* end ARCTICDB_UNUSED) {
const StreamDescriptor& descriptor) {
auto maybe_any = decode_metadata(hdr, data, begin);
if(!maybe_any)
return std::nullopt;

return unpack_timeseries_descriptor_from_proto(*maybe_any);
return unpack_timeseries_descriptor_from_proto(*maybe_any, descriptor, false);
}

void skip_descriptor(const uint8_t*& data, const SegmentHeader& hdr) {
Expand Down Expand Up @@ -304,11 +312,12 @@ std::optional<TimeseriesDescriptor> decode_timeseries_descriptor(
const SegmentHeader& hdr,
const uint8_t* data,
const uint8_t* begin,
const uint8_t* end) {
const uint8_t* end,
const StreamDescriptor& descriptor) {
util::check(data != nullptr, "Got null data ptr from segment");
auto encoding_version = EncodingVersion(hdr.encoding_version());
if (encoding_version == EncodingVersion::V1)
return decode_timeseries_descriptor_v1(hdr, data, begin, end);
return decode_timeseries_descriptor_v1(hdr, data, begin, descriptor);
else
return decode_timeseries_descriptor_v2(hdr, data, begin, end);
}
Expand All @@ -322,7 +331,38 @@ std::optional<TimeseriesDescriptor> decode_timeseries_descriptor(
const uint8_t* begin = data;
const uint8_t* end = data + segment.buffer().bytes();

return decode_timeseries_descriptor(hdr, data, begin, end);
return decode_timeseries_descriptor(hdr, data, begin, end, segment.descriptor());
}

std::optional<TimeseriesDescriptor> decode_timeseries_descriptor_for_incompletes(
const SegmentHeader& hdr,
const StreamDescriptor& desc,
const uint8_t* data,
const uint8_t* begin,
const uint8_t* end) {
util::check(data != nullptr, "Got null data ptr from segment");
auto encoding_version = EncodingVersion(hdr.encoding_version());
if (encoding_version == EncodingVersion::V1) {
auto maybe_any = decode_metadata(hdr, data, begin);
if (!maybe_any)
return std::nullopt;

return unpack_timeseries_descriptor_from_proto(*maybe_any, desc, true);
} else {
return decode_timeseries_descriptor_v2(hdr, data, begin, end);
}
}

std::optional<TimeseriesDescriptor> decode_timeseries_descriptor_for_incompletes(
Segment& segment) {
auto &hdr = segment.header();
const uint8_t* data = segment.buffer().data();

util::check(data != nullptr, "Got null data ptr from segment");
const uint8_t* begin = data;
const uint8_t* end = data + segment.buffer().bytes();

return decode_timeseries_descriptor_for_incompletes(hdr, segment.descriptor(), data, begin, end);
}

std::pair<std::optional<google::protobuf::Any>, StreamDescriptor> decode_metadata_and_descriptor_fields(
Expand Down Expand Up @@ -446,7 +486,8 @@ void decode_v2(const Segment& segment,
void decode_v1(const Segment& segment,
const SegmentHeader& hdr,
SegmentInMemory& res,
const StreamDescriptor& desc) {
const StreamDescriptor& desc,
bool is_decoding_incompletes) {
ARCTICDB_SAMPLE(DecodeSegment, 0)
const uint8_t* data = segment.buffer().data();
if(data == nullptr) {
Expand All @@ -459,7 +500,7 @@ void decode_v1(const Segment& segment,
decode_metadata(hdr, data, begin, res);
if(res.has_metadata() && res.metadata()->Is<arcticdb::proto::descriptors::TimeSeriesDescriptor>()) {
ARCTICDB_DEBUG(log::version(), "Unpacking timeseries descriptor from metadata");
auto tsd = unpack_timeseries_descriptor_from_proto(*res.metadata());
auto tsd = unpack_timeseries_descriptor_from_proto(*res.metadata(), desc, is_decoding_incompletes);
res.set_timeseries_descriptor(tsd);
res.reset_metadata();
}
Expand Down
7 changes: 4 additions & 3 deletions cpp/arcticdb/codec/codec.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ Segment encode_v1(
void decode_v1(const Segment& segment,
const SegmentHeader& hdr,
SegmentInMemory& res,
const StreamDescriptor& desc);
const StreamDescriptor& desc,
bool is_decoding_incompletes = false);

void decode_v2(const Segment& segment,
const SegmentHeader& hdr,
Expand Down Expand Up @@ -82,12 +83,12 @@ std::pair<std::optional<google::protobuf::Any>, StreamDescriptor> decode_metadat
std::optional<TimeseriesDescriptor> decode_timeseries_descriptor(
Segment& segment);

std::optional<TimeseriesDescriptor> decode_timeseries_descriptor_for_incompletes(Segment& segment);

HashedValue get_segment_hash(Segment& seg);

SegmentDescriptorImpl read_segment_descriptor(const uint8_t*& data);

TimeseriesDescriptor unpack_timeseries_descriptor_from_proto(const google::protobuf::Any& any);

} // namespace arcticdb

#define ARCTICDB_SEGMENT_ENCODER_H_
Expand Down
18 changes: 17 additions & 1 deletion cpp/arcticdb/storage/test/in_memory_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,6 @@ namespace arcticdb {
return folly::makeFuture(std::move(components));
}


folly::Future<std::pair<std::variant<arcticdb::entity::AtomKeyImpl, arcticdb::entity::RefKey>, arcticdb::TimeseriesDescriptor>>
read_timeseries_descriptor(const entity::VariantKey& key,
storage::ReadKeyOpts /*opts*/) override {
Expand All @@ -428,6 +427,23 @@ namespace arcticdb {
});
}

folly::Future<std::pair<std::variant<arcticdb::entity::AtomKeyImpl, arcticdb::entity::RefKey>, arcticdb::TimeseriesDescriptor>>
read_timeseries_descriptor_for_incompletes(const entity::VariantKey& key,
storage::ReadKeyOpts /*opts*/) override {
return util::variant_match(key, [&](const AtomKey &ak) {
auto it = seg_by_atom_key_.find(ak);
if (it == seg_by_atom_key_.end())
throw storage::KeyNotFoundException(Composite<VariantKey>(ak));
ARCTICDB_DEBUG(log::storage(), "Mock store read for atom key {}", ak);
auto tsd = it->second->index_descriptor();
tsd.set_stream_descriptor(it->second->descriptor());
return std::make_pair(key, it->second->index_descriptor());
},
[&](const RefKey&) {
util::raise_rte("Not implemented");
return std::make_pair(key, TimeseriesDescriptor{});
});
}

void set_failure_sim(const arcticdb::proto::storage::VersionStoreConfig::StorageFailureSimulator &) override {}

Expand Down
6 changes: 4 additions & 2 deletions cpp/arcticdb/stream/append_map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ std::pair<TimeseriesDescriptor, std::optional<SegmentInMemory>> get_descriptor_a
auto [key, seg] = store->read_sync(k, opts);
return std::make_pair(seg.index_descriptor(), std::make_optional<SegmentInMemory>(seg));
} else {
auto [key, tsd] = store->read_timeseries_descriptor(k, opts).get();
auto [key, tsd] = store->read_timeseries_descriptor_for_incompletes(k, opts).get();
return std::make_pair(std::move(tsd), std::nullopt);
}
}
Expand Down Expand Up @@ -399,8 +399,10 @@ void append_incomplete_segment(
auto end_index = TimeseriesIndex::end_value_for_segment(seg);
auto seg_row_count = seg.row_count();

auto tsd = pack_timeseries_descriptor(seg.descriptor().clone(), seg_row_count, std::move(next_key), {});
auto desc = stream_descriptor(stream_id, RowCountIndex{}, {});
auto tsd = pack_timeseries_descriptor(desc, seg_row_count, std::move(next_key), {});
seg.set_timeseries_descriptor(tsd);

auto new_key = store->write(
arcticdb::stream::KeyType::APPEND_DATA,
0,
Expand Down
3 changes: 3 additions & 0 deletions cpp/arcticdb/stream/stream_source.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ struct StreamSource {
read_timeseries_descriptor(const entity::VariantKey& key,
storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0;

virtual folly::Future<std::pair<VariantKey, TimeseriesDescriptor>>
read_timeseries_descriptor_for_incompletes(const entity::VariantKey& key,
storage::ReadKeyOpts opts = storage::ReadKeyOpts{}) = 0;

};

Expand Down
90 changes: 90 additions & 0 deletions cpp/arcticdb/version/test/test_sparse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,96 @@ TEST_F(SparseTestStore, SimpleRoundtrip) {
ASSERT_EQ(val4, 6);
}

// Just write a dummy StreamDescriptor to the TimeseriesDescriptor to avoid duplicating it
// as the same information is available in the segment header. This is how arcticc behaves so the test is important to
// check compat with existing data.
//
// This functions helps to test that readers look for the StreamDescriptor on the header, not in the
// TimeseriesDescriptor.
//
// Compare with `append_incomplete_segment`. Keep this function even if it duplicates `append_incomplete_segment` so
// that we have protection against `append_incomplete_segment` changing how it writes the descriptors in future.
void append_incomplete_segment_backwards_compat(
const std::shared_ptr<arcticdb::Store>& store,
const arcticdb::StreamId& stream_id,
arcticdb::SegmentInMemory &&seg) {
using namespace arcticdb::proto::descriptors;
using namespace arcticdb::stream;

auto [next_key, total_rows] = read_head(store, stream_id);

auto start_index = TimeseriesIndex::start_value_for_segment(seg);
auto end_index = TimeseriesIndex::end_value_for_segment(seg);
auto seg_row_count = seg.row_count();

// Dummy StreamDescriptor
auto desc = stream_descriptor(stream_id, RowCountIndex{}, {});

auto tsd = arcticdb::make_timeseries_descriptor(
seg_row_count,
std::move(desc),
NormalizationMetadata{},
std::nullopt,
std::nullopt,
std::move(next_key),
false);

seg.set_timeseries_descriptor(std::move(tsd));
auto new_key = store->write(
arcticdb::stream::KeyType::APPEND_DATA,
0,
stream_id,
start_index,
end_index,
std::move(seg)).get();

total_rows += seg_row_count;
write_head(store, to_atom(std::move(new_key)), total_rows);
}

TEST_F(SparseTestStore, SimpleRoundtripBackwardsCompat) {
using namespace arcticdb;
using namespace arcticdb::stream;
using DynamicAggregator = Aggregator<TimeseriesIndex, DynamicSchema, stream::NeverSegmentPolicy, stream::SparseColumnPolicy>;
using DynamicSinkWrapper = SinkWrapperImpl<DynamicAggregator>;

const std::string stream_id("test_sparse");

DynamicSinkWrapper wrapper(stream_id, {});
auto& aggregator = wrapper.aggregator_;

aggregator.start_row(timestamp{0})([](auto& rb) {
rb.set_scalar_by_name("first", uint32_t(5), DataType::UINT32);
});

aggregator.start_row(timestamp{1})([](auto& rb) {
rb.set_scalar_by_name("second", uint64_t(6), DataType::UINT64);
});

wrapper.aggregator_.commit();

auto segment = wrapper.segment();
append_incomplete_segment_backwards_compat(test_store_->_test_get_store(), stream_id, std::move(segment));

ReadOptions read_options;
read_options.set_dynamic_schema(true);
read_options.set_incompletes(true);
pipelines::ReadQuery read_query;
read_query.row_filter = universal_range();
auto read_result = test_store_->read_dataframe_version(stream_id, pipelines::VersionQuery{}, read_query, read_options);
const auto& frame =read_result.frame_data.frame();;

ASSERT_EQ(frame.row_count(), 2);
auto val1 = frame.scalar_at<uint32_t>(0, 1);
ASSERT_EQ(val1, 5);
auto val2 = frame.scalar_at<uint64_t>(0, 2);
ASSERT_EQ(val2, 0);
auto val3 = frame.scalar_at<uint32_t>(1, 1);
ASSERT_EQ(val3, 0);
auto val4 = frame.scalar_at<uint64_t>(1, 2);
ASSERT_EQ(val4, 6);
}

TEST_F(SparseTestStore, DenseToSparse) {
using namespace arcticdb;
using namespace arcticdb::stream;
Expand Down

0 comments on commit 8cf8279

Please sign in to comment.