Skip to content

Commit

Permalink
fix: use sendThreadPool in sendEvents (#107)
Browse files Browse the repository at this point in the history
* fix: use sendThreadPool in sendEvents

* remove redundant thread pools in Amplitude.java

* chore: fix build

* Separate executors

* Separate executors

---------

Co-authored-by: izaaz.yunus <izaaz@amplitude.com>
  • Loading branch information
kevink-sq and izaaz authored Oct 10, 2024
1 parent 6dfc6bb commit 88a9589
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 5 deletions.
21 changes: 21 additions & 0 deletions src/main/java/com/amplitude/Amplitude.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.net.Proxy;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;

public class Amplitude {
private static Map<String, Amplitude> instances = new HashMap<>();
Expand Down Expand Up @@ -206,6 +207,26 @@ public Amplitude setFlushTimeout(long timeout) {
return this;
}

/**
* Set the thread pool for sending events via {@link HttpTransport}
*
* @param sendThreadPool the thread pool for sending events
*/
public Amplitude setSendThreadPool(ExecutorService sendThreadPool) {
this.httpTransport.setSendThreadPool(sendThreadPool);
return this;
}

/**
* Set the thread pool for retrying events via {@link HttpTransport}
*
* @param retryThreadPool the thread pool for retrying events
*/
public Amplitude setRetryThreadPool(ExecutorService retryThreadPool) {
this.httpTransport.setRetryThreadPool(retryThreadPool);
return this;
}

/** Add middleware to the middleware runner */
public synchronized void addEventMiddleware(Middleware middleware) {
middlewareRunner.add(middleware);
Expand Down
25 changes: 20 additions & 5 deletions src/main/java/com/amplitude/HttpTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,29 @@ class HttpTransport {
private int eventsInRetry = 0;
private Object bufferLock = new Object();
private Object counterLock = new Object();
private ExecutorService retryThreadPool;
private ExecutorService sendThreadPool;

private HttpCall httpCall;
private AmplitudeLog logger;
private AmplitudeCallbacks callbacks;
private long flushTimeout;

// Managed by setters
private ExecutorService retryThreadPool = Executors.newFixedThreadPool(10);

// The supplyAsyncPool is only used within the sendThreadPool so only when
// the sendThreadPool is increased will the supplyAsyncPool be more utilized.
// We are using the supplyAsyncPool rather than the default fork join common
// pool because the fork join common pool scales with cpu... and we do not
// want to perform network requests in that small pool.
private ExecutorService sendThreadPool = Executors.newFixedThreadPool(20);
private ExecutorService supplyAsyncPool = Executors.newCachedThreadPool();

HttpTransport(
HttpCall httpCall, AmplitudeCallbacks callbacks, AmplitudeLog logger, long flushTimeout) {
this.httpCall = httpCall;
this.callbacks = callbacks;
this.logger = logger;
this.flushTimeout = flushTimeout;
retryThreadPool = Executors.newFixedThreadPool(10);
sendThreadPool = Executors.newFixedThreadPool(20);
}

public void sendEventsWithRetry(List<Event> events) {
Expand Down Expand Up @@ -98,6 +105,14 @@ public void setFlushTimeout(long timeout) {
flushTimeout = timeout;
}

public void setSendThreadPool(ExecutorService sendThreadPool) {
this.sendThreadPool = sendThreadPool;
}

public void setRetryThreadPool(ExecutorService retryThreadPool) {
this.retryThreadPool = retryThreadPool;
}

public void setCallbacks(AmplitudeCallbacks callbacks) {
this.callbacks = callbacks;
}
Expand All @@ -118,7 +133,7 @@ private CompletableFuture<Response> sendEvents(List<Event> events) {
throw new CompletionException(e);
}
return response;
});
}, supplyAsyncPool);
}

// Call this function if event not in current Retry list.
Expand Down

0 comments on commit 88a9589

Please sign in to comment.