Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-22340][runtime] Make ThresholdMeter thread safe. #15662

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import javax.annotation.concurrent.GuardedBy;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
Expand All @@ -33,7 +35,11 @@ public class ThresholdMeter implements Meter {
private final Clock clock;
private final double maxEventsPerInterval;
private final Duration interval;

@GuardedBy("this")
private final Queue<Long> eventTimestamps;

@GuardedBy("this")
private long eventCount = 0;

public ThresholdMeter(double maxEventsPerInterval, Duration interval) {
xintongsong marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -52,13 +58,13 @@ public ThresholdMeter(double maxEventsPerInterval, Duration interval, Clock cloc
}

@Override
public void markEvent() {
public synchronized void markEvent() {
eventTimestamps.add(clock.absoluteTimeMillis());
eventCount++;
}

@Override
public void markEvent(long n) {
public synchronized void markEvent(long n) {
long timestamp = clock.absoluteTimeMillis();
for (int i = 0; i < n; i++) {
eventTimestamps.add(timestamp);
Expand All @@ -72,20 +78,21 @@ public double getRate() {
}

@Override
public long getCount() {
public synchronized long getCount() {
return eventCount;
}

public void checkAgainstThreshold() throws ThresholdExceedException {
if (getEventCountsRecentInterval() >= maxEventsPerInterval) {
int recentEvents = getEventCountsRecentInterval();
if (recentEvents >= maxEventsPerInterval) {
throw new ThresholdExceedException(
String.format(
"%d events detected in the recent interval, reaching the threshold %f.",
getEventCountsRecentInterval(), maxEventsPerInterval));
recentEvents, maxEventsPerInterval));
}
}

private int getEventCountsRecentInterval() {
private synchronized int getEventCountsRecentInterval() {
Long currentTimeStamp = clock.absoluteTimeMillis();
while (!eventTimestamps.isEmpty()
&& currentTimeStamp - eventTimestamps.peek() > interval.toMillis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@
import org.junit.Test;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -121,6 +124,49 @@ public void testUpdateInterval() {
thresholdMeter.checkAgainstThreshold();
}

@Test
public void testConcurrentAccess() throws Exception {
final ThresholdMeter thresholdMeter = new ThresholdMeter(THRESHOLD_LARGE, INTERVAL);
final int repeatNum = 100;
final int concurrency = 2;

final List<Thread> threads = new ArrayList<>();

threads.addAll(
getConcurrentThreads(repeat(thresholdMeter::markEvent, repeatNum), concurrency));
threads.addAll(
getConcurrentThreads(repeat(thresholdMeter::getRate, repeatNum), concurrency));
threads.addAll(
getConcurrentThreads(
repeat(thresholdMeter::checkAgainstThreshold, repeatNum), concurrency));

for (Thread thread : threads) {
thread.start();
}

for (Thread thread : threads) {
thread.join();
}

assertEquals(repeatNum * concurrency, thresholdMeter.getCount());
}

private static Runnable repeat(Runnable task, int repeateNum) {
xintongsong marked this conversation as resolved.
Show resolved Hide resolved
return () -> {
for (int i = 0; i < repeateNum; ++i) {
task.run();
}
};
}

private static List<Thread> getConcurrentThreads(Runnable task, int concurrency) {
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < concurrency; ++i) {
threads.add(new Thread(task));
}
return threads;
}

private static ThresholdMeter createLargeThresholdMeter() {
return new ThresholdMeter(THRESHOLD_LARGE, INTERVAL, clock);
}
Expand Down