Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ordered list state for FnApi. #30317

Merged
merged 28 commits into from
Jun 13, 2024

Conversation

shunping
Copy link
Contributor

@shunping shunping commented Feb 14, 2024

First attempt to implement ordered list state for FnApi. Notice that caching has not been implemented yet.

Reference:
https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@shunping shunping marked this pull request as draft February 14, 2024 20:49
@shunping
Copy link
Contributor Author

Run Python PreCommit 3.9

Copy link
Contributor

@scwhittle scwhittle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't get yet to implementation/test

// A response to the get state request for an ordered list.
message OrderedListStateGetResponse {
bytes continuation_token = 1;
bytes data = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we return multiple elements if the request was a range?

should we return the sort-key for elements, that seems like part of the user-data for example if it's some id/timestamp the user might want it back instead of having to duplicate it in the payload as well.

should this be repeated OrderedListEntry instead of data?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For efficiency reasons we have generally not split individual elements up into individual field protos, and provided them as contiguous bytes of data. Makes sense to do that here. But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also consider if there are considerations in the encoding of sort-key. If they're (typically) timestamps, bigendian might be preferable to varint.

Copy link
Contributor Author

@shunping shunping Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how do we return multiple elements if the request was a range?

We concatenate the encoded entries in a byte array and send them back in chunks with corresponding continuation token.

should this be repeated OrderedListEntry instead of data?

I have considered this option. Besides the efficiency reason @robertwb mentioned, I also find that representing the response as bytes has an advantage of reusing the existing code in https://github.com/apache/beam/blob/3693174c0421d0ff049042ca283db633431892ef/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/StateFetchingIterators.java. This iterator is used to parsed the returned data block (in not only OrderedListState but also Multimap, Bag, etc) and it supports blocks even if the boundary is not aligned with entries/elements. I think this is not achievable with OrderedListEntry.

But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s

That's right. It is basically the concatenation of encoded TimestampedValue, since TimestampedValue is already in use in the sdk interface of OrderedListState

extends GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> {

// timestamp range is not specified, the runner should use [MIN_INT64,
// MAX_INT64) by default.
message OrderedListStateGetRequest {
bytes continuation_token = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on continuation token, ie should be returned by previous response and the key/range should match the previous request generating the token

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?

Copy link
Contributor Author

@shunping shunping Feb 16, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment on continuation token ...

Good idea! I will add that when I revise the code.

Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?

I don't think the range is a hard requirement.
In fact in my simple implementation of the fake client, I put the current sort key and the current block index into the continuation token:

// The continuation format here is the sort key (long) followed by an index (int)
.
In other implementation, you may need the range, but I think it is implementation-dependent. That's why I hesitate to put a range/sort key as a separate field when continuation token is present.

I thought the continuation token should allow the runner to uniquely identify where to find the next block of data. Is that what you mean by "mutually exclusive"?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's what I meant by mutually exclusive. If we allow providing both, we have to figure out what to do when they disagree, and there's no good usecase for that.

// timestamp range is not specified, the runner should use [MIN_INT64,
// MAX_INT64) by default.
message OrderedListStateGetRequest {
bytes continuation_token = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to require returning the range if there's a continuation token involved? Or should they be mutually exclusive?

// A response to the get state request for an ordered list.
message OrderedListStateGetResponse {
bytes continuation_token = 1;
bytes data = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For efficiency reasons we have generally not split individual elements up into individual field protos, and provided them as contiguous bytes of data. Makes sense to do that here. But the coding should be specified (e.g. is this a concatenation of several encoded KV<sort-key, value>s?


// A request to update an ordered list
message OrderedListStateUpdateRequest {
// when the request is processed, deletes should always happen before inserts.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we instead split this up into two separate requests, e.g. OrderedListStateInsertRequest and OrderedListStateDeleteRequest?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussion 2 in the design doc talked about this, but there is no conclusion on that https://docs.google.com/document/d/1U77sAvE6Iy9XsVruRYHxPdFji7nqS6HPi1XU8fhyrxs/

I am fine with splitting them though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be helpful to outline the pro/con in the design doc of little decisions like, and note which one was chosen and why.

For example one benefit to splitting the requests is to avoid ordering issues. We would have to spec that either the inserts or deletes happen first, even though they are in one request together. It is a bit confusing. And then if you want them in the other order, you still have to make two requests but each one has an empty field.

And note whether there is an efficiency consideration.

Copy link
Contributor Author

@shunping shunping Feb 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep. I am planning to add an addendum to the original design doc to summarize the decisions we make here. We should have that after this round of review completes.

// A response to the get state request for an ordered list.
message OrderedListStateGetResponse {
bytes continuation_token = 1;
bytes data = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also consider if there are considerations in the encoding of sort-key. If they're (typically) timestamps, bigendian might be preferable to varint.

* The range information is placed in the state key of ordered list
* For consistency, we reuse the existing get request and response
  mesasages of other states like Bag, MultiMap, etc.
* Reuse existing messages of clear and append.
* Replace String::size() > 0 with String::isEmpty()
* Return this in readLater and readRangeLater instead of throwing
  an exception
* Remove the added SupressWarnings("unchecked")
Previously, we used a repeated OrderedListEntry field in the
AppendRequest particularly for ordered list state. For consistency,
we now get rid of that and use the same data field as other states.
@shunping shunping marked this pull request as ready for review March 6, 2024 16:42
Copy link
Contributor

github-actions bot commented Mar 6, 2024

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@shunping
Copy link
Contributor Author

shunping commented Mar 6, 2024

Run Java PreCommit

@github-actions github-actions bot added the model label Jun 8, 2024
@@ -47,6 +60,7 @@ public class FakeBeamFnStateClient implements BeamFnStateClient {
private static final int DEFAULT_CHUNK_SIZE = 6;
private final Map<StateKey, List<ByteString>> data;
private int currentId;
private final Map<StateKey, NavigableSet<Long>> orderedListSortKeysFromStateKey;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a pretty inefficient data structure to use. You might look at SortedSet (which requires adding a deduplicating id) unless we can use SortedMultiset from Guava. This would allow us to do things like remove all elements in a range without first copying over all keys in the range and then removing them one by one (which requires a hash each time).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see this is part of the "fake" client, so maybe we don't care as much about performance?

Copy link
Contributor Author

@shunping shunping Jun 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The structure is used to support multiple ordered list states within one DoFn.

For example, we can have order list state A and B, which have different StateKeys. The outer map is only used once when looking up the right ordered list.

Then under each state key, the NavigableSet, which is internally initialized with a TreeSet, can support removing a range of keys efficiently.


StateKey.Builder keyBuilder = key.toBuilder();

// clear the range in the state key before using it as a key to store, because ordered list
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand this comment. Are we trying to handle the case where the same state key shows up in initialData multiple times?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initialData is a list of entries to be stored in the ordered list, which is an idea borrowed from the MultimapUserStateTest.

An entry is specified by the value and a state key (including the name of the state and the range this entry will fall in). Multiple entries may belong to the same state because only the name of the state is the identifier.

When we need to put these entries in the internal structure, we will remove the range field in the state key so that entries of the same state identifier will be put into one bucket.

@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, why is this? It seems like we have no way of overwriting an individual element in the ordered list, so if it gets coded differently it wouldn't matter... It seems weird to be using non-deterministic coders anyway, but how would it affect correctness here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The coder here is based on the one defined at

public void verifyDeterministic() throws NonDeterministicException {
.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just part of the contract that coders must define, though I agree it's not something that would be likely to get leaked to somewhere that it matters...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this first version of implementation. I prefer keeping it there so it is consistent with the existing TimeStampedValueCoder defined in TimestampedValue.java. We can surely improve that later.

Copy link

codecov bot commented Jun 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 71.13%. Comparing base (8a64641) to head (b481880).
Report is 1 commits behind head on master.

Current head b481880 differs from pull request most recent head f4c3440

Please upload reports for the commit f4c3440 to get more accurate results.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #30317      +/-   ##
============================================
- Coverage     71.18%   71.13%   -0.06%     
  Complexity     3014     3014              
============================================
  Files          1058     1055       -3     
  Lines        134076   133466     -610     
  Branches       3254     3254              
============================================
- Hits          95437    94935     -502     
+ Misses        35498    35400      -98     
+ Partials       3141     3131      -10     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class OrderedListUserStateTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there also pipeline-level tests that we can now enable for portable runners (and, once it's implemented, Dataflow)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have pipeline level tests for states including OrderedListState at java/org/apache/beam/sdk/transforms/ParDoTest.java.

@Override
public void verifyDeterministic() throws NonDeterministicException {
verifyDeterministic(
this, "TimestampedValueCoder requires a deterministic valueCoder", valueCoder);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just part of the contract that coders must define, though I agree it's not something that would be likely to get leaked to somewhere that it matters...


public void add(TimestampedValue<T> value) {
checkState(
!isClosed,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When would the user see a closed state? Is it if they held onto it after the bundle closed or something like that?

Copy link
Contributor Author

@shunping shunping Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way I implement the closed state in OrderedListUserState is consistent with the other states. The flag isClosed is set when the class method asyncClose() is called.

This class method is registered in stateFinalizers at FnApiStateAccessor.java

It is then called after each processElement in FnApiDoFnRunner.java

doFnInvoker.invokeGetInitialWatermarkEstimatorState(processContext)))));
} finally {
currentElement = null;
currentRestriction = null;
}
this.stateAccessor.finalizeState();
.

OrderedListRange.newBuilder().setStart(sortKey).setEnd(sortKey + 1).build()))
.build();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You've covered a lot of cases here, but this seems like the kind of thing where it'd be good to have fuzz tests too. Say you do dozens of random interspersed reads, writes, and clears. You could do the same thing on a naive List (as long as you kept your range small enough) and verify you got the same result. (Make sure you record a seed so you can reproduce failures.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. Maybe we can do that in a follow-up PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Please file an issue and assign it to yourself.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Opened a issue: #31592.

Thanks @robertwb!

private NavigableMap<Instant, Collection<T>> pendingAdds = Maps.newTreeMap();
private TreeRangeSet<Instant> pendingRemoves = TreeRangeSet.create();

private boolean isCleared = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this represent, as one can clear various sub-ranges.

Copy link
Contributor Author

@shunping shunping Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a flag when a user calls the class method clear() to wipe out all elements in the order list. In this case, we have some special code path, as any previously persistent storage for this state will be cleared once async_close() is invoked.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to add that to a comment for this variable.

@shunping
Copy link
Contributor Author

Run Java PreCommit

@shunping
Copy link
Contributor Author

Run Java_IOs_Direct PreCommit

@shunping
Copy link
Contributor Author

Run Go PreCommit

OrderedListRange.newBuilder().setStart(sortKey).setEnd(sortKey + 1).build()))
.build();
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. Please file an issue and assign it to yourself.

Copy link
Contributor

@damccorm damccorm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

@damccorm damccorm merged commit af31d35 into apache:master Jun 13, 2024
112 of 113 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants