Skip to content

Commit

Permalink
[FLINK-21151] Pass whole resources instead of fields in RocksFullSnap…
Browse files Browse the repository at this point in the history
…shotStrategy

We also move the fillMetaData() method and hide it in the resources.
  • Loading branch information
aljoscha committed Jan 29, 2021
1 parent 033e3e3 commit dfb0275
Showing 1 changed file with 34 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,7 @@ public SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(
checkpointId, checkpointStreamFactory, checkpointOptions);

return new SnapshotAsynchronousPartCallable(
checkpointStreamSupplier,
fullRocksDBSnapshotResources.snapshot,
fullRocksDBSnapshotResources.stateMetaInfoSnapshots,
fullRocksDBSnapshotResources.metaDataCopy);
checkpointStreamSupplier, fullRocksDBSnapshotResources);
}

@Override
Expand Down Expand Up @@ -198,24 +195,16 @@ private class SnapshotAsynchronousPartCallable
checkpointStreamSupplier;

/** RocksDB snapshot. */
@Nonnull private final Snapshot snapshot;

@Nonnull private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

@Nonnull private final List<MetaData> metaData;
@Nonnull private final FullRocksDBSnapshotResources snapshotResources;

SnapshotAsynchronousPartCallable(
@Nonnull
SupplierWithException<CheckpointStreamWithResultProvider, Exception>
checkpointStreamSupplier,
@Nonnull Snapshot snapshot,
@Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots,
@Nonnull List<RocksDbKvStateInfo> metaDataCopy) {
@Nonnull FullRocksDBSnapshotResources snapshotResources) {

this.checkpointStreamSupplier = checkpointStreamSupplier;
this.snapshot = snapshot;
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
this.metaData = fillMetaData(metaDataCopy);
this.snapshotResources = snapshotResources;
}

@Override
Expand Down Expand Up @@ -253,7 +242,7 @@ private void writeSnapshotToOutputStream(
final ReadOptions readOptions = new ReadOptions();

try {
readOptions.setSnapshot(snapshot);
readOptions.setSnapshot(snapshotResources.snapshot);
kvStateIterators = createKVStateIterators(readOptions);

writeKVStateData(
Expand All @@ -272,6 +261,8 @@ private void writeSnapshotToOutputStream(

private List<Tuple2<RocksIteratorWrapper, Integer>> createKVStateIterators(
ReadOptions readOptions) {

List<MetaData> metaData = snapshotResources.getMetaData();
final List<Tuple2<RocksIteratorWrapper, Integer>> kvStateIterators =
new ArrayList<>(metaData.size());

Expand Down Expand Up @@ -300,7 +291,7 @@ private void writeKVStateMetaData(final DataOutputView outputView) throws IOExce
// get a serialized form already at state registration time in the
// future
keySerializer,
stateMetaInfoSnapshots,
snapshotResources.stateMetaInfoSnapshots,
!Objects.equals(
UncompressedStreamCompressionDecorator.INSTANCE,
keyGroupCompressionDecorator));
Expand Down Expand Up @@ -429,22 +420,6 @@ private void checkInterrupted() throws InterruptedException {
}
}

private static List<MetaData> fillMetaData(List<RocksDbKvStateInfo> metaDataCopy) {
List<MetaData> metaData = new ArrayList<>(metaDataCopy.size());
for (RocksDbKvStateInfo rocksDbKvStateInfo : metaDataCopy) {
StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
if (rocksDbKvStateInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
stateSnapshotTransformer =
((RegisteredKeyValueStateBackendMetaInfo<?, ?>) rocksDbKvStateInfo.metaInfo)
.getStateSnapshotTransformFactory()
.createForSerializedState()
.orElse(null);
}
metaData.add(new MetaData(rocksDbKvStateInfo, stateSnapshotTransformer));
}
return metaData;
}

private static RocksIteratorWrapper getRocksIterator(
RocksDB db,
ColumnFamilyHandle columnFamilyHandle,
Expand All @@ -471,10 +446,10 @@ private MetaData(

static class FullRocksDBSnapshotResources implements SnapshotResources {
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;
private final List<RocksDbKvStateInfo> metaDataCopy;
private final ResourceGuard.Lease lease;
private final Snapshot snapshot;
private final RocksDB db;
private final List<MetaData> metaData;

public FullRocksDBSnapshotResources(
ResourceGuard.Lease lease,
Expand All @@ -484,9 +459,33 @@ public FullRocksDBSnapshotResources(
RocksDB db) {
this.lease = lease;
this.snapshot = snapshot;
this.metaDataCopy = metaDataCopy;
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
this.db = db;

// we need to to this in the constructor, i.e. in the synchronous part of the snapshot
// TODO: better yet, we can do it outside the constructor
this.metaData = fillMetaData(metaDataCopy);
}

private List<MetaData> fillMetaData(List<RocksDbKvStateInfo> metaDataCopy) {
List<MetaData> metaData = new ArrayList<>(metaDataCopy.size());
for (RocksDbKvStateInfo rocksDbKvStateInfo : metaDataCopy) {
StateSnapshotTransformer<byte[]> stateSnapshotTransformer = null;
if (rocksDbKvStateInfo.metaInfo instanceof RegisteredKeyValueStateBackendMetaInfo) {
stateSnapshotTransformer =
((RegisteredKeyValueStateBackendMetaInfo<?, ?>)
rocksDbKvStateInfo.metaInfo)
.getStateSnapshotTransformFactory()
.createForSerializedState()
.orElse(null);
}
metaData.add(new MetaData(rocksDbKvStateInfo, stateSnapshotTransformer));
}
return metaData;
}

private List<MetaData> getMetaData() {
return metaData;
}

@Override
Expand Down

0 comments on commit dfb0275

Please sign in to comment.