diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java index 87f4e0c6724a6..eef69184a6ab6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java @@ -87,7 +87,7 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore { @Nonnull private final Executor discardExecutor; /** Lock for synchronisation on the storage map and the discarded status. */ - @Nonnull private final Object lock; + @Nonnull private final Object lock = new Object(); /** Status flag if this store was already discarded. */ @GuardedBy("lock") @@ -106,36 +106,13 @@ public TaskLocalStateStoreImpl( @Nonnull LocalRecoveryConfig localRecoveryConfig, @Nonnull Executor discardExecutor) { - this( - jobID, - allocationID, - jobVertexID, - subtaskIndex, - localRecoveryConfig, - discardExecutor, - new TreeMap<>(), - new Object()); - } - - @VisibleForTesting - TaskLocalStateStoreImpl( - @Nonnull JobID jobID, - @Nonnull AllocationID allocationID, - @Nonnull JobVertexID jobVertexID, - @Nonnegative int subtaskIndex, - @Nonnull LocalRecoveryConfig localRecoveryConfig, - @Nonnull Executor discardExecutor, - @Nonnull SortedMap storedTaskStateByCheckpointID, - @Nonnull Object lock) { - this.jobID = jobID; this.allocationID = allocationID; this.jobVertexID = jobVertexID; this.subtaskIndex = subtaskIndex; this.discardExecutor = discardExecutor; this.localRecoveryConfig = localRecoveryConfig; - this.storedTaskStateByCheckpointID = storedTaskStateByCheckpointID; - this.lock = lock; + this.storedTaskStateByCheckpointID = new TreeMap<>(); this.disposed = false; }