Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ordered list state for FnApi. #30317

Merged
merged 28 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b978ca7
Add request and response proto messages for ordered list state.
shunping Jan 30, 2024
68abe65
Initial implementation of OrderedListState for fnApi.
shunping Jan 30, 2024
d06e0a5
Discard the use of the value coder in FakeBeamFnStateClient.
shunping Jan 31, 2024
ffd9627
Fix the behavior of pre-existing iterators on local change of state
shunping Feb 14, 2024
f74fc60
Support continuation token for ordered list get request in fake client
shunping Feb 14, 2024
f9e48a3
Add copyright notices to the new files
shunping Feb 14, 2024
a76b523
Add binding for ordered list state in fnapi state accessor
shunping Feb 14, 2024
8a3b719
Clean up comments
shunping Feb 14, 2024
5d7cd5e
Apply spotless and checkStyle to reformat
shunping Feb 14, 2024
33e748d
Add an encode-only coder for the use in the fake client.
shunping Feb 21, 2024
91b770e
Remove request and response messages for ordered list state get.
shunping Mar 4, 2024
4b512ea
Remove request and response messages for ordered list state update
shunping Mar 4, 2024
09db01c
Minor fixes based on feedbacks from reviewers
shunping Mar 4, 2024
44b6679
Apply spotless
shunping Mar 4, 2024
868e493
Use data field in AppendRequest for ordered list state
shunping Mar 5, 2024
ea7ca70
Merge branch 'master' into ordered-list-state-2
shunping Mar 5, 2024
83bac80
Apply spotless
shunping Mar 5, 2024
90b7f7d
Minor renaming of a variable
shunping Mar 6, 2024
8900646
Sync with master and apply proto changes from PR 31092
shunping May 3, 2024
f881b62
Merge branch 'master' into ordered-list-state-2
shunping Jun 6, 2024
361deb0
Create a new coder for TimestampedValue according to the notes in proto.
shunping Jun 6, 2024
7225a89
Address feedback from the reviewer
shunping Jun 7, 2024
10eddad
Apply spotless
shunping Jun 7, 2024
a76034f
Add urn for ordered list state.
shunping Jun 8, 2024
b481880
Add ordered list spec to ParDoTranslation.
shunping Jun 8, 2024
65aef1a
Fix an edge case when async called after clear. Minor fix based on re…
shunping Jun 11, 2024
c158b32
Refactor some variable names. Add a notes on the order of pendingAdds…
shunping Jun 12, 2024
f4c3440
Merge branch 'master' into ordered-list-state-2
shunping Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Apply spotless and checkStyle to reformat
  • Loading branch information
shunping committed Feb 15, 2024
commit 5d7cd5e2e2e6b966ddb47cd78135b58013622d23
Original file line number Diff line number Diff line change
Expand Up @@ -616,13 +616,13 @@ public <T> OrderedListState<T> bindOrderedList(
@Override
public Object apply(StateKey key) {
return new OrderedListState<T>() {
private final OrderedListUserState<T> impl = createOrderedListUserState(
key, elemCoder);
private final OrderedListUserState<T> impl =
createOrderedListUserState(key, elemCoder);

@Override
public void clear() {
shunping marked this conversation as resolved.
Show resolved Hide resolved
clearRange(BoundedWindow.TIMESTAMP_MIN_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE);
clearRange(
BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
}

@Override
Expand All @@ -648,18 +648,19 @@ public ReadableState<Boolean> readLater() {
@Nullable
@Override
public Iterable<TimestampedValue<T>> read() {
return readRange(BoundedWindow.TIMESTAMP_MIN_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE);
return readRange(
BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
}

@Override
public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> readLater() {
public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>>
readLater() {
throw new UnsupportedOperationException();
shunping marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp,
Instant limitTimestamp) {
public Iterable<TimestampedValue<T>> readRange(
Instant minTimestamp, Instant limitTimestamp) {
return impl.readRange(minTimestamp, limitTimestamp);
}

Expand All @@ -669,8 +670,8 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
}

@Override
public OrderedListState<T> readRangeLater(Instant minTimestamp,
Instant limitTimestamp) {
public OrderedListState<T> readRangeLater(
Instant minTimestamp, Instant limitTimestamp) {
throw new UnsupportedOperationException();
}
};
Expand Down Expand Up @@ -925,7 +926,8 @@ private StateKey createMultimapKeysUserStateKey(String stateId) {
return builder.build();
}

private <T> OrderedListUserState<T> createOrderedListUserState(StateKey stateKey, Coder<T> valueCoder) {
private <T> OrderedListUserState<T> createOrderedListUserState(
StateKey stateKey, Coder<T> valueCoder) {
OrderedListUserState<T> rval =
new OrderedListUserState<>(
getCacheFor(stateKey),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,25 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListEntry;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateUpdateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateGetRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateGetResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListEntry;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -121,11 +112,12 @@ public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant lim
// (1) a sort key is added to or removed from pendingAdds, or
// (2) a new value is added to an existing sort key
ArrayList<PrefetchableIterable<TimestampedValue<T>>> pendingAddsInRange = new ArrayList<>();
for (Entry<Instant, Collection<T>> kv : pendingAdds.subMap(minTimestamp,
limitTimestamp).entrySet()) {
pendingAddsInRange.add(PrefetchableIterables.limit(
Iterables.transform(kv.getValue(), (v) -> TimestampedValue.of(v, kv.getKey())),
kv.getValue().size()));
for (Entry<Instant, Collection<T>> kv :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super-clear on how Iterables work in Java. Does this actually copy the elements, or is it just wrapping the values in pendingAdds in an Iterable? I'm curious about the statements in the comments saying "the values are kept" so that we can make modifications to the ordered list and not mess up existing Iterables.

Copy link
Contributor Author

@shunping shunping May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this actually copy the elements, or is it just wrapping the values in pendingAdds in an Iterable?

Iterables in java work like iterables in other languages. Basically, we use it to traverse an existing collection of elements without making a copy of them.

I'm curious about the statements in the comments saying "the values are kept" so that we can make modifications to the ordered list and not mess up existing Iterables.

The PrefetchableIterable interface in Beam is specifically for iterables that support prefetching. The function PrefetchableIterables.limit() is applied on an iterable, resulting in a new iterable, which can be used to traverse its backend collection until the number of elements reaches a preset limit.

Here is the tricky part.

  • Note that pendingAddsInRange is a list of iterables. For each of these iterables, when an iteration is performed at any time after its creation, the result will be truncated to the last element we see at the time we initialize the iterable with PrefetchableIterables.limit(). In other words, appending a new element to the end of an existing key of pendingAdds would not affect the outcome above, because the iterable in pendingAddsInRange is designed to be truncated to the size when it is created.
  • When we add a new key to pendingAdds, it won't change the outcome of traversing an pre-existing pendingAddsInRange either, because the iterable of the new key is not included in pendingAddsInRange.
  • When we delete an existing key from pendingAdds, we remove the key-value mapping, but not touch its value (i.e. the collection of elements). Therefore, the iterables in pre-existing pendingAddsInRange will still work as the backend collection is accessible. Only when no iterables reference these collections will they be garbage collected.

pendingAdds.subMap(minTimestamp, limitTimestamp).entrySet()) {
pendingAddsInRange.add(
PrefetchableIterables.limit(
Iterables.transform(kv.getValue(), (v) -> TimestampedValue.of(v, kv.getKey())),
kv.getValue().size()));
}
Iterable<TimestampedValue<T>> valuesInRange = Iterables.concat(pendingAddsInRange);

Expand All @@ -140,18 +132,22 @@ public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant lim
// TODO: consider use cache here
CachingStateIterable<TimestampedValue<T>> persistentValues =
StateFetchingIterators.readAllAndDecodeStartingFrom(
Caches.noop(), this.beamFnStateClient, getRequestBuilder.build(),
Caches.noop(),
this.beamFnStateClient,
getRequestBuilder.build(),
this.timestampedValueCoder);

// Make a snapshot of the current pendingRemoves and use them to filter persistent values.
// The values of pendingRemoves are kept, so that they will still be accessible in
shunping marked this conversation as resolved.
Show resolved Hide resolved
// pre-existing iterables even after a sort key is removed.
TreeRangeSet<Instant> pendingRemovesSnapshot = TreeRangeSet.create(pendingRemoves);
Iterable<TimestampedValue<T>> persistentValuesAfterRemoval =
Iterables.filter(persistentValues, v -> !pendingRemovesSnapshot.contains(v.getTimestamp()));
Iterables.filter(
persistentValues, v -> !pendingRemovesSnapshot.contains(v.getTimestamp()));

return Iterables.mergeSorted(ImmutableList.of(persistentValuesAfterRemoval,
valuesInRange), Comparator.comparing(TimestampedValue::getTimestamp));
return Iterables.mergeSorted(
ImmutableList.of(persistentValuesAfterRemoval, valuesInRange),
Comparator.comparing(TimestampedValue::getTimestamp));
}

return valuesInRange;
Expand All @@ -176,8 +172,10 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
// The old values of the removed sub map are kept, so that they will still be accessible in
// pre-existing iterables even after the sort key is cleared.
pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).clear();
shunping marked this conversation as resolved.
Show resolved Hide resolved
if (!isCleared)
pendingRemoves.add(Range.range(minTimestamp, BoundType.CLOSED, limitTimestamp, BoundType.OPEN));
if (!isCleared) {
shunping marked this conversation as resolved.
Show resolved Hide resolved
pendingRemoves.add(
Range.range(minTimestamp, BoundType.CLOSED, limitTimestamp, BoundType.OPEN));
}
}

public void clear() {
Expand All @@ -197,23 +195,30 @@ public void clear() {
public void asyncClose() throws Exception {
isClosed = true;

OrderedListStateUpdateRequest.Builder updateRequestBuilder = OrderedListStateUpdateRequest.newBuilder();
OrderedListStateUpdateRequest.Builder updateRequestBuilder =
OrderedListStateUpdateRequest.newBuilder();
if (!pendingRemoves.isEmpty()) {
shunping marked this conversation as resolved.
Show resolved Hide resolved
shunping marked this conversation as resolved.
Show resolved Hide resolved
updateRequestBuilder
.addAllDeletes(Iterables.transform(pendingRemoves.asRanges(),
(r) -> OrderedListRange.newBuilder()
.setStart(r.lowerEndpoint().getMillis())
.setEnd(r.upperEndpoint().getMillis()).build()));
updateRequestBuilder.addAllDeletes(
Iterables.transform(
pendingRemoves.asRanges(),
(r) ->
OrderedListRange.newBuilder()
.setStart(r.lowerEndpoint().getMillis())
.setEnd(r.upperEndpoint().getMillis())
.build()));
pendingRemoves.clear();
}

if (!pendingAdds.isEmpty()) {
for (Entry<Instant, Collection<T>> entry : pendingAdds.entrySet()) {
updateRequestBuilder
.addAllInserts(Iterables.transform(entry.getValue(),
(v) -> OrderedListEntry.newBuilder()
.setSortKey(entry.getKey().getMillis())
.setData(encodeValue(v)).build()));
updateRequestBuilder.addAllInserts(
Iterables.transform(
entry.getValue(),
(v) ->
OrderedListEntry.newBuilder()
.setSortKey(entry.getKey().getMillis())
.setData(encodeValue(v))
.build()));
}
pendingAdds.clear();
}
Expand All @@ -223,8 +228,9 @@ public void asyncClose() throws Exception {
stateRequest.setOrderedListUpdate(updateRequestBuilder);

CompletableFuture<StateResponse> response = beamFnStateClient.handle(stateRequest);
if (!response.get().getError().isEmpty())
if (!response.get().getError().isEmpty()) {
throw new IllegalStateException(response.get().getError());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable.Blocks;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.WeightedList;
Expand Down Expand Up @@ -579,7 +578,8 @@ public CompletableFuture<StateResponse> loadPrefetchedResponse(ByteString contin
stateRequestForFirstChunk
.toBuilder()
.setOrderedListGet(
stateRequestForFirstChunk.getOrderedListGet()
stateRequestForFirstChunk
.getOrderedListGet()
.toBuilder()
.setContinuationToken(continuationToken)));
} else {
Expand Down Expand Up @@ -618,10 +618,11 @@ public ByteString next() {
prefetchedResponse = null;

ByteString tokenFromResponse;
if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE)
if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE) {
tokenFromResponse = stateResponse.getOrderedListGet().getContinuationToken();
else
} else {
tokenFromResponse = stateResponse.getGet().getContinuationToken();
}

// If the continuation token is empty, that means we have reached EOF.
if (ByteString.EMPTY.equals(tokenFromResponse)) {
Expand All @@ -632,10 +633,11 @@ public ByteString next() {
}

ByteString ret;
if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE)
if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE) {
ret = stateResponse.getOrderedListGet().getData();
else
} else {
ret = stateResponse.getGet().getData();
}
return ret;
}
}
Expand Down
Loading
Loading