Skip to content

Commit

Permalink
[FLINK-22340][runtime] Make ThresholdMeter thread safe.
Browse files Browse the repository at this point in the history
This closes apache#15662
  • Loading branch information
xintongsong committed Apr 19, 2021
1 parent 8df651d commit 6c58832
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,28 @@
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import javax.annotation.concurrent.GuardedBy;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;

/** A timestamp queue based threshold meter. */
/**
* A timestamp queue based threshold meter.
*
* <p>Note: This class is thread safe, at the price of synchronization overhead. Do not use this in
* performance sensitive scenarios, e.g., per-record updated metrics.
*/
public class ThresholdMeter implements Meter {
private static final double MILLISECONDS_PER_SECOND = 1000.0;
private final Clock clock;
private final double maxEventsPerInterval;
private final Duration interval;

@GuardedBy("this")
private final Queue<Long> eventTimestamps;

@GuardedBy("this")
private long eventCount = 0;

public ThresholdMeter(double maxEventsPerInterval, Duration interval) {
Expand All @@ -52,13 +63,13 @@ public ThresholdMeter(double maxEventsPerInterval, Duration interval, Clock cloc
}

@Override
public void markEvent() {
public synchronized void markEvent() {
eventTimestamps.add(clock.absoluteTimeMillis());
eventCount++;
}

@Override
public void markEvent(long n) {
public synchronized void markEvent(long n) {
long timestamp = clock.absoluteTimeMillis();
for (int i = 0; i < n; i++) {
eventTimestamps.add(timestamp);
Expand All @@ -72,7 +83,7 @@ public double getRate() {
}

@Override
public long getCount() {
public synchronized long getCount() {
return eventCount;
}

Expand All @@ -86,7 +97,7 @@ public void checkAgainstThreshold() throws ThresholdExceedException {
}
}

private int getEventCountsRecentInterval() {
private synchronized int getEventCountsRecentInterval() {
Long currentTimeStamp = clock.absoluteTimeMillis();
while (!eventTimestamps.isEmpty()
&& currentTimeStamp - eventTimestamps.peek() > interval.toMillis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.junit.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -121,6 +124,49 @@ public void testUpdateInterval() {
thresholdMeter.checkAgainstThreshold();
}

@Test
public void testConcurrentAccess() throws Exception {
final ThresholdMeter thresholdMeter = new ThresholdMeter(THRESHOLD_LARGE, INTERVAL);
final int repeatNum = 100;
final int concurrency = 2;

final List<Thread> threads = new ArrayList<>();

threads.addAll(
getConcurrentThreads(repeat(thresholdMeter::markEvent, repeatNum), concurrency));
threads.addAll(
getConcurrentThreads(repeat(thresholdMeter::getRate, repeatNum), concurrency));
threads.addAll(
getConcurrentThreads(
repeat(thresholdMeter::checkAgainstThreshold, repeatNum), concurrency));

for (Thread thread : threads) {
thread.start();
}

for (Thread thread : threads) {
thread.join();
}

assertEquals(repeatNum * concurrency, thresholdMeter.getCount());
}

private static Runnable repeat(Runnable task, int repeatNum) {
return () -> {
for (int i = 0; i < repeatNum; ++i) {
task.run();
}
};
}

private static List<Thread> getConcurrentThreads(Runnable task, int concurrency) {
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < concurrency; ++i) {
threads.add(new Thread(task));
}
return threads;
}

private static ThresholdMeter createLargeThresholdMeter() {
return new ThresholdMeter(THRESHOLD_LARGE, INTERVAL, clock);
}
Expand Down

0 comments on commit 6c58832

Please sign in to comment.