Skip to content

Commit

Permalink
Apply spotless to reformat
Browse files Browse the repository at this point in the history
  • Loading branch information
shunping committed Feb 14, 2024
1 parent 8a3b719 commit ac40716
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -616,13 +616,13 @@ public <T> OrderedListState<T> bindOrderedList(
@Override
public Object apply(StateKey key) {
return new OrderedListState<T>() {
private final OrderedListUserState<T> impl = createOrderedListUserState(
key, elemCoder);
private final OrderedListUserState<T> impl =
createOrderedListUserState(key, elemCoder);

@Override
public void clear() {
clearRange(BoundedWindow.TIMESTAMP_MIN_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE);
clearRange(
BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
}

@Override
Expand All @@ -648,18 +648,19 @@ public ReadableState<Boolean> readLater() {
@Nullable
@Override
public Iterable<TimestampedValue<T>> read() {
return readRange(BoundedWindow.TIMESTAMP_MIN_VALUE,
BoundedWindow.TIMESTAMP_MAX_VALUE);
return readRange(
BoundedWindow.TIMESTAMP_MIN_VALUE, BoundedWindow.TIMESTAMP_MAX_VALUE);
}

@Override
public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>> readLater() {
public GroupingState<TimestampedValue<T>, Iterable<TimestampedValue<T>>>
readLater() {
throw new UnsupportedOperationException();
}

@Override
public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp,
Instant limitTimestamp) {
public Iterable<TimestampedValue<T>> readRange(
Instant minTimestamp, Instant limitTimestamp) {
return impl.readRange(minTimestamp, limitTimestamp);
}

Expand All @@ -669,8 +670,8 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
}

@Override
public OrderedListState<T> readRangeLater(Instant minTimestamp,
Instant limitTimestamp) {
public OrderedListState<T> readRangeLater(
Instant minTimestamp, Instant limitTimestamp) {
throw new UnsupportedOperationException();
}
};
Expand Down Expand Up @@ -925,7 +926,8 @@ private StateKey createMultimapKeysUserStateKey(String stateId) {
return builder.build();
}

private <T> OrderedListUserState<T> createOrderedListUserState(StateKey stateKey, Coder<T> valueCoder) {
private <T> OrderedListUserState<T> createOrderedListUserState(
StateKey stateKey, Coder<T> valueCoder) {
OrderedListUserState<T> rval =
new OrderedListUserState<>(
getCacheFor(stateKey),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,34 +24,25 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListEntry;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateUpdateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateClearRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateGetRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListStateGetResponse;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListRange;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.OrderedListEntry;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.BoundType;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
Expand Down Expand Up @@ -121,11 +112,12 @@ public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant lim
// (1) a sort key is added to or removed from pendingAdds, or
// (2) a new value is added to an existing sort key
ArrayList<PrefetchableIterable<TimestampedValue<T>>> pendingAddsInRange = new ArrayList<>();
for (Entry<Instant, Collection<T>> kv : pendingAdds.subMap(minTimestamp,
limitTimestamp).entrySet()) {
pendingAddsInRange.add(PrefetchableIterables.limit(
Iterables.transform(kv.getValue(), (v) -> TimestampedValue.of(v, kv.getKey())),
kv.getValue().size()));
for (Entry<Instant, Collection<T>> kv :
pendingAdds.subMap(minTimestamp, limitTimestamp).entrySet()) {
pendingAddsInRange.add(
PrefetchableIterables.limit(
Iterables.transform(kv.getValue(), (v) -> TimestampedValue.of(v, kv.getKey())),
kv.getValue().size()));
}
Iterable<TimestampedValue<T>> valuesInRange = Iterables.concat(pendingAddsInRange);

Expand All @@ -140,18 +132,22 @@ public Iterable<TimestampedValue<T>> readRange(Instant minTimestamp, Instant lim
// TODO: consider use cache here
CachingStateIterable<TimestampedValue<T>> persistentValues =
StateFetchingIterators.readAllAndDecodeStartingFrom(
Caches.noop(), this.beamFnStateClient, getRequestBuilder.build(),
Caches.noop(),
this.beamFnStateClient,
getRequestBuilder.build(),
this.timestampedValueCoder);

// Make a snapshot of the current pendingRemoves and use them to filter persistent values.
// The values of pendingRemoves are kept, so that they will still be accessible in
// pre-existing iterables even after a sort key is removed.
TreeRangeSet<Instant> pendingRemovesSnapshot = TreeRangeSet.create(pendingRemoves);
Iterable<TimestampedValue<T>> persistentValuesAfterRemoval =
Iterables.filter(persistentValues, v -> !pendingRemovesSnapshot.contains(v.getTimestamp()));
Iterables.filter(
persistentValues, v -> !pendingRemovesSnapshot.contains(v.getTimestamp()));

return Iterables.mergeSorted(ImmutableList.of(persistentValuesAfterRemoval,
valuesInRange), Comparator.comparing(TimestampedValue::getTimestamp));
return Iterables.mergeSorted(
ImmutableList.of(persistentValuesAfterRemoval, valuesInRange),
Comparator.comparing(TimestampedValue::getTimestamp));
}

return valuesInRange;
Expand All @@ -177,7 +173,8 @@ public void clearRange(Instant minTimestamp, Instant limitTimestamp) {
// pre-existing iterables even after the sort key is cleared.
pendingAdds.subMap(minTimestamp, true, limitTimestamp, false).clear();
if (!isCleared)
pendingRemoves.add(Range.range(minTimestamp, BoundType.CLOSED, limitTimestamp, BoundType.OPEN));
pendingRemoves.add(
Range.range(minTimestamp, BoundType.CLOSED, limitTimestamp, BoundType.OPEN));
}

public void clear() {
Expand All @@ -197,23 +194,30 @@ public void clear() {
public void asyncClose() throws Exception {
isClosed = true;

OrderedListStateUpdateRequest.Builder updateRequestBuilder = OrderedListStateUpdateRequest.newBuilder();
OrderedListStateUpdateRequest.Builder updateRequestBuilder =
OrderedListStateUpdateRequest.newBuilder();
if (!pendingRemoves.isEmpty()) {
updateRequestBuilder
.addAllDeletes(Iterables.transform(pendingRemoves.asRanges(),
(r) -> OrderedListRange.newBuilder()
.setStart(r.lowerEndpoint().getMillis())
.setEnd(r.upperEndpoint().getMillis()).build()));
updateRequestBuilder.addAllDeletes(
Iterables.transform(
pendingRemoves.asRanges(),
(r) ->
OrderedListRange.newBuilder()
.setStart(r.lowerEndpoint().getMillis())
.setEnd(r.upperEndpoint().getMillis())
.build()));
pendingRemoves.clear();
}

if (!pendingAdds.isEmpty()) {
for (Entry<Instant, Collection<T>> entry : pendingAdds.entrySet()) {
updateRequestBuilder
.addAllInserts(Iterables.transform(entry.getValue(),
(v) -> OrderedListEntry.newBuilder()
.setSortKey(entry.getKey().getMillis())
.setData(encodeValue(v)).build()));
updateRequestBuilder.addAllInserts(
Iterables.transform(
entry.getValue(),
(v) ->
OrderedListEntry.newBuilder()
.setSortKey(entry.getKey().getMillis())
.setData(encodeValue(v))
.build()));
}
pendingAdds.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.apache.beam.fn.harness.state.StateFetchingIterators.CachingStateIterable.Blocks;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateGetRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest.RequestCase;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.WeightedList;
Expand Down Expand Up @@ -579,7 +578,8 @@ public CompletableFuture<StateResponse> loadPrefetchedResponse(ByteString contin
stateRequestForFirstChunk
.toBuilder()
.setOrderedListGet(
stateRequestForFirstChunk.getOrderedListGet()
stateRequestForFirstChunk
.getOrderedListGet()
.toBuilder()
.setContinuationToken(continuationToken)));
} else {
Expand Down Expand Up @@ -620,8 +620,7 @@ public ByteString next() {
ByteString tokenFromResponse;
if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE)
tokenFromResponse = stateResponse.getOrderedListGet().getContinuationToken();
else
tokenFromResponse = stateResponse.getGet().getContinuationToken();
else tokenFromResponse = stateResponse.getGet().getContinuationToken();

// If the continuation token is empty, that means we have reached EOF.
if (ByteString.EMPTY.equals(tokenFromResponse)) {
Expand All @@ -634,8 +633,7 @@ public ByteString next() {
ByteString ret;
if (stateRequestForFirstChunk.getStateKey().getTypeCase() == ORDERED_LIST_USER_STATE)
ret = stateResponse.getOrderedListGet().getData();
else
ret = stateResponse.getGet().getData();
else ret = stateResponse.getGet().getData();
return ret;
}
}
Expand Down
Loading

0 comments on commit ac40716

Please sign in to comment.