Skip to content

Commit

Permalink
[refactor][state-processor-api] Remove usages of deprecated class
Browse files Browse the repository at this point in the history
  • Loading branch information
sjwiesman committed Feb 2, 2022
1 parent 77adfb9 commit cefdfc1
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.state.api.input.operator.window.ProcessEvictingWindowReader;
import org.apache.flink.state.api.input.operator.window.ReduceEvictingWindowReaderFunction;
import org.apache.flink.state.api.runtime.MutableConfig;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.Window;
Expand All @@ -58,7 +58,7 @@ public class EvictingWindowSavepointReader<W extends Window> {
* The savepoint metadata, which maintains the current set of existing / newly added operator
* states.
*/
private final SavepointMetadata metadata;
private final SavepointMetadataV2 metadata;

/**
* The state backend that was previously used to write existing operator states in this
Expand All @@ -72,7 +72,7 @@ public class EvictingWindowSavepointReader<W extends Window> {

EvictingWindowSavepointReader(
StreamExecutionEnvironment env,
SavepointMetadata metadata,
SavepointMetadataV2 metadata,
StateBackend stateBackend,
TypeSerializer<W> windowSerializer) {
Preconditions.checkNotNull(env, "The execution environment must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.flink.state.api.input.operator.KeyedStateReaderOperator;
import org.apache.flink.state.api.runtime.MutableConfig;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
Expand Down Expand Up @@ -77,8 +77,8 @@ public static SavepointReader read(
new RuntimeException(
"Savepoint must contain at least one operator state."));

SavepointMetadata savepointMetadata =
new SavepointMetadata(
SavepointMetadataV2 savepointMetadata =
new SavepointMetadataV2(
maxParallelism, metadata.getMasterStates(), metadata.getOperatorStates());
return new SavepointReader(env, savepointMetadata, stateBackend);
}
Expand All @@ -90,7 +90,7 @@ public static SavepointReader read(
* The savepoint metadata, which maintains the current set of existing / newly added operator
* states.
*/
private final SavepointMetadata metadata;
private final SavepointMetadataV2 metadata;

/**
* The state backend that was previously used to write existing operator states in this
Expand All @@ -100,7 +100,7 @@ public static SavepointReader read(
private final StateBackend stateBackend;

SavepointReader(
StreamExecutionEnvironment env, SavepointMetadata metadata, StateBackend stateBackend) {
StreamExecutionEnvironment env, SavepointMetadataV2 metadata, StateBackend stateBackend) {
Preconditions.checkNotNull(env, "The execution environment must not be null");
Preconditions.checkNotNull(metadata, "The savepoint metadata must not be null");
Preconditions.checkNotNull(stateBackend, "The state backend must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.state.api.input.operator.WindowReaderOperator;
import org.apache.flink.state.api.input.operator.window.PassThroughReader;
import org.apache.flink.state.api.runtime.MutableConfig;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadata;
import org.apache.flink.state.api.runtime.metadata.SavepointMetadataV2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.Window;
Expand All @@ -54,7 +54,7 @@ public class WindowSavepointReader<W extends Window> {
* The savepoint metadata, which maintains the current set of existing / newly added operator
* states.
*/
private final SavepointMetadata metadata;
private final SavepointMetadataV2 metadata;

/**
* The state backend that was previously used to write existing operator states in this
Expand All @@ -68,7 +68,7 @@ public class WindowSavepointReader<W extends Window> {

WindowSavepointReader(
StreamExecutionEnvironment env,
SavepointMetadata metadata,
SavepointMetadataV2 metadata,
StateBackend stateBackend,
TypeSerializer<W> windowSerializer) {
Preconditions.checkNotNull(env, "The execution environment must not be null");
Expand Down

0 comments on commit cefdfc1

Please sign in to comment.