From 6c58832852239cf7c04c72ac337dc12dbee0c178 Mon Sep 17 00:00:00 2001 From: Xintong Song Date: Mon, 19 Apr 2021 14:28:11 +0800 Subject: [PATCH] [FLINK-22340][runtime] Make ThresholdMeter thread safe. This closes #15662 --- .../flink/runtime/metrics/ThresholdMeter.java | 21 +++++++-- .../runtime/metrics/ThresholdMeterTest.java | 46 +++++++++++++++++++ 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ThresholdMeter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ThresholdMeter.java index 96f3da608535b..fbba4df703349 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ThresholdMeter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/ThresholdMeter.java @@ -23,17 +23,28 @@ import org.apache.flink.util.clock.Clock; import org.apache.flink.util.clock.SystemClock; +import javax.annotation.concurrent.GuardedBy; + import java.time.Duration; import java.util.ArrayDeque; import java.util.Queue; -/** A timestamp queue based threshold meter. */ +/** + * A timestamp queue based threshold meter. + * + *

Note: This class is thread safe, at the price of synchronization overhead. Do not use this in + * performance sensitive scenarios, e.g., per-record updated metrics. + */ public class ThresholdMeter implements Meter { private static final double MILLISECONDS_PER_SECOND = 1000.0; private final Clock clock; private final double maxEventsPerInterval; private final Duration interval; + + @GuardedBy("this") private final Queue eventTimestamps; + + @GuardedBy("this") private long eventCount = 0; public ThresholdMeter(double maxEventsPerInterval, Duration interval) { @@ -52,13 +63,13 @@ public ThresholdMeter(double maxEventsPerInterval, Duration interval, Clock cloc } @Override - public void markEvent() { + public synchronized void markEvent() { eventTimestamps.add(clock.absoluteTimeMillis()); eventCount++; } @Override - public void markEvent(long n) { + public synchronized void markEvent(long n) { long timestamp = clock.absoluteTimeMillis(); for (int i = 0; i < n; i++) { eventTimestamps.add(timestamp); @@ -72,7 +83,7 @@ public double getRate() { } @Override - public long getCount() { + public synchronized long getCount() { return eventCount; } @@ -86,7 +97,7 @@ public void checkAgainstThreshold() throws ThresholdExceedException { } } - private int getEventCountsRecentInterval() { + private synchronized int getEventCountsRecentInterval() { Long currentTimeStamp = clock.absoluteTimeMillis(); while (!eventTimestamps.isEmpty() && currentTimeStamp - eventTimestamps.peek() > interval.toMillis()) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ThresholdMeterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ThresholdMeterTest.java index 105809365fb5a..2a38541574477 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ThresholdMeterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/ThresholdMeterTest.java @@ -26,10 +26,13 @@ import org.junit.Test; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -121,6 +124,49 @@ public void testUpdateInterval() { thresholdMeter.checkAgainstThreshold(); } + @Test + public void testConcurrentAccess() throws Exception { + final ThresholdMeter thresholdMeter = new ThresholdMeter(THRESHOLD_LARGE, INTERVAL); + final int repeatNum = 100; + final int concurrency = 2; + + final List threads = new ArrayList<>(); + + threads.addAll( + getConcurrentThreads(repeat(thresholdMeter::markEvent, repeatNum), concurrency)); + threads.addAll( + getConcurrentThreads(repeat(thresholdMeter::getRate, repeatNum), concurrency)); + threads.addAll( + getConcurrentThreads( + repeat(thresholdMeter::checkAgainstThreshold, repeatNum), concurrency)); + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + assertEquals(repeatNum * concurrency, thresholdMeter.getCount()); + } + + private static Runnable repeat(Runnable task, int repeatNum) { + return () -> { + for (int i = 0; i < repeatNum; ++i) { + task.run(); + } + }; + } + + private static List getConcurrentThreads(Runnable task, int concurrency) { + List threads = new ArrayList<>(); + for (int i = 0; i < concurrency; ++i) { + threads.add(new Thread(task)); + } + return threads; + } + private static ThresholdMeter createLargeThresholdMeter() { return new ThresholdMeter(THRESHOLD_LARGE, INTERVAL, clock); }