Skip to content

Commit

Permalink
[grid] Make event bus listener for Queuer and LocalQueue thread-safe (S…
Browse files Browse the repository at this point in the history
…eleniumHQ#9161)

* [grid] Make event bus listener for Queuer and LocalQueue thread-safe

* [grid] Make LocalNewSessionQueuer use single instance of GetNewSessionResponse.

Co-authored-by: Diego Molina <diemol@users.noreply.github.com>
  • Loading branch information
pujagani and diemol authored Feb 10, 2021
1 parent 2e7f6e7 commit b54ff65
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -48,59 +51,60 @@ public class GetNewSessionResponse {
private final EventBus bus;
private final Tracer tracer;
private final NewSessionQueue sessionRequests;
private final Map<RequestId, NewSessionRequest> knownRequests = new ConcurrentHashMap<>();
private static final Map<RequestId, NewSessionRequest> knownRequests = new ConcurrentHashMap<>();
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);

public GetNewSessionResponse(Tracer tracer, EventBus bus,
NewSessionQueue sessionRequests) {
this.tracer = Require.nonNull("Tracer", tracer);
this.bus = Require.nonNull("Event bus", bus);
this.sessionRequests = Require.nonNull("New Session Request Queue", sessionRequests);

this.bus.addListener(NewSessionResponseEvent.listener(sessionResponse -> {
try {
this.setResponse(sessionResponse);
} catch (Exception ignore) {
// Ignore any exception. Do not want to block the eventbus thread.
}
}));
this.bus.addListener(NewSessionResponseEvent.listener(this::setResponse));

this.bus.addListener(NewSessionRejectedEvent.listener(sessionResponse -> {
try {
this.setErrorResponse(sessionResponse);
} catch (Exception ignore) {
// Ignore any exception. Do not want to block the eventbus thread.
}
}));
this.bus.addListener(NewSessionRejectedEvent.listener(this::setErrorResponse));
}

private void setResponse(NewSessionResponse sessionResponse) {
// Each thread will get its own CountDownLatch and it is stored in the Map using request id as the key.
// EventBus thread will retrieve the same request and set it's response and unblock waiting request thread.
RequestId id = sessionResponse.getRequestId();
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));

if (sessionRequest.isPresent()) {
NewSessionRequest request = sessionRequest.get();
request.setSessionResponse(
new HttpResponse().setContent(bytes(sessionResponse.getDownstreamEncodedResponse())));
request.getLatch().countDown();
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
// Each thread will get its own CountDownLatch and it is stored in the Map using request id as the key.
// EventBus thread will retrieve the same request and set it's response and unblock waiting request thread.
RequestId id = sessionResponse.getRequestId();
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));

if (sessionRequest.isPresent()) {
NewSessionRequest request = sessionRequest.get();
request.setSessionResponse(
new HttpResponse().setContent(bytes(sessionResponse.getDownstreamEncodedResponse())));
request.getLatch().countDown();
}
} finally {
writeLock.unlock();
}
}

private void setErrorResponse(NewSessionErrorResponse sessionResponse) {
RequestId id = sessionResponse.getRequestId();
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));

// There could be a situation where the session request in the queue is scheduled for retry.
// Meanwhile the request queue is cleared.
// This will fire a error response event and remove the request id from the knownRequests map.
// Another error response event will be fired by the Distributor when the request is retried.
// Since a response is already provided for the request, the event listener should not take any action.

if (sessionRequest.isPresent()) {
NewSessionRequest request = sessionRequest.get();
request.setSessionResponse(internalErrorResponse(sessionResponse.getMessage()));
request.getLatch().countDown();
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
RequestId id = sessionResponse.getRequestId();
Optional<NewSessionRequest> sessionRequest = Optional.ofNullable(knownRequests.get(id));

// There could be a situation where the session request in the queue is scheduled for retry.
// Meanwhile the request queue is cleared.
// This will fire a error response event and remove the request id from the knownRequests map.
// Another error response event will be fired by the Distributor when the request is retried.
// Since a response is already provided for the request, the event listener should not take any action.

if (sessionRequest.isPresent()) {
NewSessionRequest request = sessionRequest.get();
request.setSessionResponse(internalErrorResponse(sessionResponse.getMessage()));
request.getLatch().countDown();
}
} finally {
writeLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,6 @@ public boolean offerLast(HttpRequest request, RequestId requestId) {
Require.nonNull("New Session request", request);

Span span = tracer.getCurrentContext().createSpan("local_sessionqueue.add");
boolean added = false;
SessionRequest sessionRequest = new SessionRequest(requestId, request);

Lock writeLock = lock.writeLock();
writeLock.lock();
Expand All @@ -148,63 +146,63 @@ public boolean offerLast(HttpRequest request, RequestId requestId) {
attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(),
EventAttribute.setValue(getClass().getName()));

added = sessionRequests.offerLast(sessionRequest);
SessionRequest sessionRequest = new SessionRequest(requestId, request);
addRequestHeaders(request, requestId);
boolean added = sessionRequests.offerLast(sessionRequest);

attributeMap.put(
AttributeKey.REQUEST_ID.getKey(), EventAttribute.setValue(requestId.toString()));
attributeMap.put("request.added", EventAttribute.setValue(added));
span.addEvent("Add new session request to the queue", attributeMap);

if (added) {
bus.fire(new NewSessionRequestEvent(requestId));
}

return added;
} finally {
writeLock.unlock();
span.close();
if (added) {
bus.fire(new NewSessionRequestEvent(requestId));
}
}
}

@Override
public boolean offerFirst(HttpRequest request, RequestId requestId) {
Require.nonNull("New Session request", request);
boolean added = false;
SessionRequest sessionRequest = new SessionRequest(requestId, request);

Lock writeLock = lock.writeLock();
writeLock.lock();
try {
added = sessionRequests.offerFirst(sessionRequest);
return added;
} finally {
writeLock.unlock();
SessionRequest sessionRequest = new SessionRequest(requestId, request);
boolean added = sessionRequests.offerFirst(sessionRequest);
if (added) {
executorService.schedule(() -> retryRequest(sessionRequest),
super.retryInterval.getSeconds(), TimeUnit.SECONDS);
}
return added;
} finally {
writeLock.unlock();
}
}

private void retryRequest(SessionRequest sessionRequest) {
HttpRequest request = sessionRequest.getHttpRequest();
RequestId requestId = sessionRequest.getRequestId();
if (hasRequestTimedOut(request)) {
LOG.log(Level.INFO, "Request {0} timed out", requestId);
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
HttpRequest request = sessionRequest.getHttpRequest();
RequestId requestId = sessionRequest.getRequestId();
if (hasRequestTimedOut(request)) {
LOG.log(Level.INFO, "Request {0} timed out", requestId);
sessionRequests.remove(sessionRequest);
} finally {
writeLock.unlock();
bus.fire(new NewSessionRejectedEvent(
new NewSessionErrorResponse(requestId, "New session request timed out")));
} else {
LOG.log(Level.INFO,
"Adding request back to the queue. All slots are busy. Request: {0}",
requestId);
bus.fire(new NewSessionRequestEvent(requestId));
}
} else {
LOG.log(Level.INFO,
"Adding request back to the queue. All slots are busy. Request: {0}",
requestId);
bus.fire(new NewSessionRequestEvent(requestId));
} finally {
writeLock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class LocalNewSessionQueuer extends NewSessionQueuer {

public final NewSessionQueue sessionRequests;
private final EventBus bus;
private final GetNewSessionResponse getNewSessionResponse;

public LocalNewSessionQueuer(
Tracer tracer,
Expand All @@ -50,6 +51,8 @@ public LocalNewSessionQueuer(
super(tracer, registrationSecret);
this.bus = Require.nonNull("Event bus", bus);
this.sessionRequests = Require.nonNull("New Session Request Queue", sessionRequests);

this.getNewSessionResponse = new GetNewSessionResponse(tracer, bus, sessionRequests);
}

public static NewSessionQueuer create(Config config) {
Expand All @@ -72,8 +75,6 @@ public static NewSessionQueuer create(Config config) {
@Override
public HttpResponse addToQueue(HttpRequest request) {
validateSessionRequest(request);
GetNewSessionResponse getNewSessionResponse =
new GetNewSessionResponse(tracer, bus, sessionRequests);
return getNewSessionResponse.add(request);
}

Expand Down

0 comments on commit b54ff65

Please sign in to comment.