Bugfix 1609: Allow parallel appends when incomplete index values match last existing index value in the symbol (#1619)

#### Reference Issues/PRs
Closes #1609
alexowens90 authored Jun 12, 2024
1 parent 0a347e8 commit 357e6b7
Showing 3 changed files with 59 additions and 2 deletions.
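As context for the diff below, here is a minimal sketch of the scenario this commit enables: staging an append whose first index value equals the last index value already in the symbol. It reuses the `NativeVersionStore` calls that appear in the tests in this diff (`write`, `append(..., incomplete=True)`, `compact_incomplete`); `lib` is assumed to be an already-initialised library such as the `lmdb_version_store` test fixture, and the symbol name and data are purely illustrative.

```python
import pandas as pd

# Existing data ends at 2024-01-02; the staged segment starts at that same timestamp.
df_existing = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")])
df_staged = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03")])

lib.write("sym", df_existing)
lib.append("sym", df_staged, incomplete=True)  # stage as an incomplete segment

# Before this fix, compaction rejected the staged segment with E_UNSORTED_DATA
# because its first index value was not strictly greater than the last existing
# index value. Duplicate timestamps do not break the ascending-sorted invariant,
# so compaction now succeeds and the symbol remains "ASCENDING".
lib.compact_incomplete("sym", True, False)  # same positional call as in the tests below
```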
4 changes: 2 additions & 2 deletions cpp/arcticdb/version/version_core.cpp
@@ -859,8 +859,8 @@ void check_incompletes_index_ranges_dont_overlap(const std::shared_ptr<PipelineC
    std::set<TimestampRange> unique_timestamp_ranges;
    for (auto it = pipeline_context->incompletes_begin(); it != pipeline_context->end(); it++) {
        sorting::check<ErrorCode::E_UNSORTED_DATA>(
-               !last_existing_index_value.has_value() || it->slice_and_key().key().start_time() > *last_existing_index_value,
-               "Cannot append staged segments to existing data as incomplete segment contains index value <= existing data: {} <= {}",
+               !last_existing_index_value.has_value() || it->slice_and_key().key().start_time() >= *last_existing_index_value,
+               "Cannot append staged segments to existing data as incomplete segment contains index value < existing data: {} < {}",
                it->slice_and_key().key().start_time(),
                *last_existing_index_value);
        unique_timestamp_ranges.insert({it->slice_and_key().key().start_time(), it->slice_and_key().key().end_time()});
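The functional change is the relaxed comparison in the check above: a staged (incomplete) segment may now start exactly at the last existing index value, while a segment that starts strictly earlier still fails with `E_UNSORTED_DATA`. A hypothetical pure-Python rendering of that predicate, with invented names used only for illustration:

```python
from typing import Iterable, Optional

def staged_segments_appendable(last_existing_index_value: Optional[int],
                               staged_start_times: Iterable[int]) -> bool:
    """Mirror of the relaxed guard: equality with the last existing index value
    is now allowed (>=); only strictly earlier staged start times are rejected."""
    return all(
        last_existing_index_value is None or start >= last_existing_index_value
        for start in staged_start_times
    )

assert staged_segments_appendable(100, [100, 105])      # equal start: now accepted
assert not staged_segments_appendable(100, [99, 105])   # strictly earlier: still rejected
assert staged_segments_appendable(None, [1, 2])         # no existing data: anything goes
```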
15 changes: 15 additions & 0 deletions python/tests/unit/arcticdb/version_store/test_append.py
@@ -304,6 +304,21 @@ def test_append_not_sorted_exception(lmdb_version_store):
        lmdb_version_store.append(symbol, df2, validate_index=True)


@pytest.mark.parametrize("validate_index", (True, False))
def test_append_same_index_value(lmdb_version_store_v1, validate_index):
    lib = lmdb_version_store_v1
    sym = "test_append_same_index_value"
    df_0 = pd.DataFrame({"col": [1, 2]}, index=pd.date_range("2024-01-01", periods=2))
    lib.write(sym, df_0)

    df_1 = pd.DataFrame({"col": [3, 4]}, index=pd.date_range(df_0.index[-1], periods=2))
    lib.append(sym, df_1, validate_index=validate_index)
    expected = pd.concat([df_0, df_1])
    received = lib.read(sym).data
    assert_frame_equal(expected, received)
    assert lib.get_info(sym)["sorted"] == "ASCENDING"


def test_append_existing_not_sorted_exception(lmdb_version_store):
    symbol = "bad_append"

42 changes: 42 additions & 0 deletions python/tests/unit/arcticdb/version_store/test_parallel.py
@@ -341,6 +341,33 @@ def test_parallel_non_timestamp_index(lmdb_version_store, append):
    assert_frame_equal(expected_df, read_df)


@pytest.mark.parametrize("append", (True, False))
def test_parallel_all_same_index_values(lmdb_version_store, append):
    lib = lmdb_version_store
    sym = "test_parallel_all_same_index_values"
    if append:
        df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-01")])
        lib.write(sym, df_0)
    df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-02")])
    df_2 = pd.DataFrame({"col": [5, 6]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-02")])
    if append:
        lib.append(sym, df_2, incomplete=True)
        lib.append(sym, df_1, incomplete=True)
    else:
        lib.write(sym, df_2, parallel=True)
        lib.write(sym, df_1, parallel=True)
    lib.compact_incomplete(sym, append, False)
    received = lib.read(sym).data
    # Index values in incompletes all the same, so order of values in col could be [3, 4, 5, 6] or [5, 6, 3, 4]
    if append:
        expected = pd.concat([df_0, df_1, df_2]) if received["col"][2] == 3 else pd.concat([df_0, df_2, df_1])
        assert_frame_equal(expected, received)
    else:
        expected = pd.concat([df_1, df_2]) if received["col"][0] == 3 else pd.concat([df_2, df_1])
        assert_frame_equal(expected, received)
    assert lib.get_info(sym)["sorted"] == "ASCENDING"


@pytest.mark.parametrize("append", (True, False))
def test_parallel_overlapping_incomplete_segments(lmdb_version_store, append):
    lib = lmdb_version_store
@@ -360,6 +387,21 @@ def test_parallel_overlapping_incomplete_segments(lmdb_version_store, append):
        lib.compact_incomplete(sym, append, False)


def test_parallel_append_exactly_matches_existing(lmdb_version_store):
    lib = lmdb_version_store
    sym = "test_parallel_append_exactly_matches_existing"
    df_0 = pd.DataFrame({"col": [1, 2]}, index=[pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02")])
    lib.write(sym, df_0)
    df_1 = pd.DataFrame({"col": [3, 4]}, index=[pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03")])
    lib.append(sym, df_1, incomplete=True)
    lib.compact_incomplete(sym, True, False)
    expected = pd.concat([df_0, df_1])
    received = lib.read(sym).data
    assert_frame_equal(expected, received)
    assert lib.get_info(sym)["sorted"] == "ASCENDING"



def test_parallel_append_overlapping_with_existing(lmdb_version_store):
    lib = lmdb_version_store
    sym = "test_parallel_append_overlapping_with_existing"
