diff --git a/java/client/src/org/openqa/selenium/remote/tracing/AttributeKey.java b/java/client/src/org/openqa/selenium/remote/tracing/AttributeKey.java index 7d9e358347425..b91952115adfc 100644 --- a/java/client/src/org/openqa/selenium/remote/tracing/AttributeKey.java +++ b/java/client/src/org/openqa/selenium/remote/tracing/AttributeKey.java @@ -48,7 +48,9 @@ public enum AttributeKey { DATABASE_OPERATION ("db.operation"), DATABASE_USER ("db.user"), DATABASE_CONNECTION_STRING ("db.connection_string"), - DATABASE_SYSTEM("db.system"); + DATABASE_SYSTEM("db.system"), + + REQUEST_ID ("request.id"); private final String key; diff --git a/java/server/src/org/openqa/selenium/grid/BUILD.bazel b/java/server/src/org/openqa/selenium/grid/BUILD.bazel index 8a3a9a5f0f124..7bc84e265e39b 100644 --- a/java/server/src/org/openqa/selenium/grid/BUILD.bazel +++ b/java/server/src/org/openqa/selenium/grid/BUILD.bazel @@ -72,6 +72,7 @@ java_export( "//java/server/src/org/openqa/selenium/grid/node/httpd", "//java/server/src/org/openqa/selenium/grid/router/httpd", "//java/server/src/org/openqa/selenium/grid/sessionmap/httpd", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/httpd", ], deps = [ ":base-command", diff --git a/java/server/src/org/openqa/selenium/grid/config/StandardGridRoles.java b/java/server/src/org/openqa/selenium/grid/config/StandardGridRoles.java index e4f68e8f71b37..4d27bef638c11 100644 --- a/java/server/src/org/openqa/selenium/grid/config/StandardGridRoles.java +++ b/java/server/src/org/openqa/selenium/grid/config/StandardGridRoles.java @@ -33,7 +33,10 @@ private StandardGridRoles() { public static final Role NODE_ROLE = Role.of("grid-node"); public static final Role ROUTER_ROLE = Role.of("grid-router"); public static final Role SESSION_MAP_ROLE = Role.of("grid-session-map"); + public static final Role SESSION_QUEUE_ROLE = Role.of("grid-new-session-queue"); + public static final Role SESSION_QUEUER_ROLE = Role.of("grid-new-session-queuer"); public static final Set ALL_ROLES = Collections.unmodifiableSet( - new TreeSet<>(Arrays.asList(DISTRIBUTOR_ROLE, EVENT_BUS_ROLE, NODE_ROLE, ROUTER_ROLE, SESSION_MAP_ROLE))); + new TreeSet<>( + Arrays.asList(DISTRIBUTOR_ROLE, EVENT_BUS_ROLE, NODE_ROLE, ROUTER_ROLE, SESSION_MAP_ROLE, SESSION_QUEUER_ROLE, SESSION_QUEUE_ROLE))); } diff --git a/java/server/src/org/openqa/selenium/grid/data/NewSessionErrorResponse.java b/java/server/src/org/openqa/selenium/grid/data/NewSessionErrorResponse.java new file mode 100644 index 0000000000000..01b3831690329 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/NewSessionErrorResponse.java @@ -0,0 +1,76 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.data; + +import com.google.common.collect.ImmutableMap; + +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.json.JsonInput; + +import java.util.Map; + +public class NewSessionErrorResponse { + + private final String message; + private final RequestId requestId; + + public NewSessionErrorResponse(RequestId requestId, String message) { + this.requestId = Require.nonNull("Request Id", requestId); + this.message = Require.nonNull("Message", message); + } + + public String getMessage() { + return message; + } + + public RequestId getRequestId() { + return requestId; + } + + private Map toJson() { + return ImmutableMap.of( + "message", message, + "requestId", requestId); + } + + private static NewSessionErrorResponse fromJson(JsonInput input) { + String message = null; + RequestId requestId = null; + + input.beginObject(); + while (input.hasNext()) { + switch (input.nextName()) { + case "message": + message = input.read(String.class); + break; + + case "requestId": + requestId = input.read(RequestId.class); + break; + + default: + input.skipValue(); + break; + } + } + input.endObject(); + + return new NewSessionErrorResponse(requestId, message); + } + +} diff --git a/java/server/src/org/openqa/selenium/grid/data/NewSessionRejectedEvent.java b/java/server/src/org/openqa/selenium/grid/data/NewSessionRejectedEvent.java new file mode 100644 index 0000000000000..c05c3a5cc441d --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/NewSessionRejectedEvent.java @@ -0,0 +1,32 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.data; + +import org.openqa.selenium.events.Event; +import org.openqa.selenium.events.Type; + +import java.util.UUID; + +public class NewSessionRejectedEvent extends Event { + + public static final Type NEW_SESSION_REJECTED = new Type("new-session-rejected"); + + public NewSessionRejectedEvent(NewSessionErrorResponse response) { + super(NEW_SESSION_REJECTED, response); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/data/NewSessionRequest.java b/java/server/src/org/openqa/selenium/grid/data/NewSessionRequest.java new file mode 100644 index 0000000000000..6252e30c09c49 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/NewSessionRequest.java @@ -0,0 +1,31 @@ +package org.openqa.selenium.grid.data; + +import org.openqa.selenium.remote.http.HttpResponse; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; + +public class NewSessionRequest { + + private final RequestId requestId; + private final CountDownLatch latch; + private HttpResponse sessionResponse; + + public NewSessionRequest(RequestId requestId, CountDownLatch latch) { + this.requestId = requestId; + this.latch = latch; + } + + public CountDownLatch getLatch() { + return latch; + } + + public void setSessionResponse(HttpResponse sessionResponse) { + this.sessionResponse = sessionResponse; + } + + public HttpResponse getSessionResponse() { + return sessionResponse; + } + +} diff --git a/java/server/src/org/openqa/selenium/grid/data/NewSessionRequestEvent.java b/java/server/src/org/openqa/selenium/grid/data/NewSessionRequestEvent.java new file mode 100644 index 0000000000000..83a1063383fe8 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/NewSessionRequestEvent.java @@ -0,0 +1,33 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.data; + +import org.openqa.selenium.events.Event; +import org.openqa.selenium.events.Type; +import org.openqa.selenium.remote.SessionId; + +import java.util.UUID; + +public class NewSessionRequestEvent extends Event { + + public static final Type NEW_SESSION_REQUEST = new Type("new-session-request"); + + public NewSessionRequestEvent(RequestId requestId) { + super(NEW_SESSION_REQUEST, requestId); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/data/NewSessionResponse.java b/java/server/src/org/openqa/selenium/grid/data/NewSessionResponse.java new file mode 100644 index 0000000000000..be57f67f99389 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/NewSessionResponse.java @@ -0,0 +1,92 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.data; + +import com.google.common.collect.ImmutableMap; + +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.json.JsonInput; + +import java.util.Base64; +import java.util.Map; + +public class NewSessionResponse { + + private final RequestId requestId; + private final Session session; + private final byte[] downstreamEncodedResponse; + + public NewSessionResponse(RequestId requestId, Session session, + byte[] downstreamEncodedResponse) { + this.requestId = Require.nonNull("Request Id", requestId); + this.session = Require.nonNull("Session", session); + this.downstreamEncodedResponse = Require.nonNull + ("Downstream encoded response", downstreamEncodedResponse); + } + + public RequestId getRequestId() { + return requestId; + } + + public Session getSession() { + return session; + } + + public byte[] getDownstreamEncodedResponse() { + return downstreamEncodedResponse; + } + + private Map toJson() { + return ImmutableMap.of( + "requestId", requestId, + "session", session, + "downstreamEncodedResponse", Base64.getEncoder().encodeToString(downstreamEncodedResponse) + ); + } + + private static NewSessionResponse fromJson(JsonInput input) { + RequestId requestId = null; + Session session = null; + byte[] downstreamResponse = null; + + input.beginObject(); + while (input.hasNext()) { + switch (input.nextName()) { + case "requestId": + requestId = input.read(RequestId.class); + break; + + case "session": + session = input.read(Session.class); + break; + + case "downstreamEncodedResponse": + downstreamResponse = Base64.getDecoder().decode(input.nextString()); + break; + + default: + input.skipValue(); + break; + } + } + input.endObject(); + + return new NewSessionResponse(requestId, session, downstreamResponse); + } + +} diff --git a/java/server/src/org/openqa/selenium/grid/data/NewSessionResponseEvent.java b/java/server/src/org/openqa/selenium/grid/data/NewSessionResponseEvent.java new file mode 100644 index 0000000000000..d7bcf8552b589 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/NewSessionResponseEvent.java @@ -0,0 +1,32 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.data; + +import org.openqa.selenium.events.Event; +import org.openqa.selenium.events.Type; + +import java.util.UUID; + +public class NewSessionResponseEvent extends Event { + + public static final Type NEW_SESSION_RESPONSE = new Type("new-session-response"); + + public NewSessionResponseEvent(NewSessionResponse sessionResponse) { + super(NEW_SESSION_RESPONSE, sessionResponse); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/data/RequestId.java b/java/server/src/org/openqa/selenium/grid/data/RequestId.java new file mode 100644 index 0000000000000..de14b2df1929f --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/data/RequestId.java @@ -0,0 +1,65 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.data; + +import org.openqa.selenium.internal.Require; + +import java.util.Objects; +import java.util.UUID; + +public class RequestId { + + private final UUID uuid; + + public RequestId(UUID uuid) { + this.uuid = Require.nonNull("Request id", uuid); + } + + public UUID toUuid() { + return uuid; + } + + @Override + public String toString() { + return uuid.toString(); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof RequestId)) { + return false; + } + + RequestId that = (RequestId) o; + return Objects.equals(this.uuid, that.uuid); + } + + @Override + public int hashCode() { + return Objects.hash(uuid); + } + + private Object toJson() { + return uuid; + } + + private static RequestId fromJson(UUID id) { + return new RequestId(id); + } + +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/AddBackToSessionQueue.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/AddBackToSessionQueue.java new file mode 100644 index 0000000000000..ef7667d6274c7 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/AddBackToSessionQueue.java @@ -0,0 +1,69 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue; + +import static org.openqa.selenium.remote.http.Contents.asJson; +import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; + +import com.google.common.collect.ImmutableMap; + +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpHandler; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.AttributeKey; +import org.openqa.selenium.remote.tracing.Span; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.util.UUID; + +class AddBackToSessionQueue implements HttpHandler { + + private final Tracer tracer; + private final NewSessionQueuer newSessionQueuer; + private final RequestId id; + + AddBackToSessionQueue(Tracer tracer, NewSessionQueuer newSessionQueuer, RequestId id) { + this.tracer = Require.nonNull("Tracer", tracer); + this.newSessionQueuer = Require.nonNull("New Session Queuer", newSessionQueuer); + this.id = id; + } + + @Override + public HttpResponse execute(HttpRequest req) { + try (Span span = newSpanAsChildOf(tracer, req, "sessionqueuer.retry")) { + HTTP_REQUEST.accept(span, req); + span.setAttribute(AttributeKey.REQUEST_ID.getKey(), id.toString()); + + boolean value = newSessionQueuer.retryAddToQueue(req, id); + + span.setAttribute("request.retry", value); + + HttpResponse response = new HttpResponse().setContent( + asJson(ImmutableMap.of("value", value))); + + HTTP_RESPONSE.accept(span, response); + + return response; + } + } +} + diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/AddToSessionQueue.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/AddToSessionQueue.java new file mode 100644 index 0000000000000..cc3e02394f8e6 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/AddToSessionQueue.java @@ -0,0 +1,54 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue; + +import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; + +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpHandler; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.Span; +import org.openqa.selenium.remote.tracing.Tracer; + +class AddToSessionQueue implements HttpHandler { + + private final Tracer tracer; + private final NewSessionQueuer newSessionQueuer; + + AddToSessionQueue(Tracer tracer, NewSessionQueuer newSessionQueuer) { + this.tracer = Require.nonNull("Tracer", tracer); + this.newSessionQueuer = Require.nonNull("New Session Queuer", newSessionQueuer); + } + + @Override + public HttpResponse execute(HttpRequest req) { + try (Span span = newSpanAsChildOf(tracer, req, "sessionqueuer.add")) { + HTTP_REQUEST.accept(span, req); + + HttpResponse response = newSessionQueuer.addToQueue(req); + + HTTP_RESPONSE.accept(span, response); + + return response; + } + } +} + diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/BUILD.bazel b/java/server/src/org/openqa/selenium/grid/sessionqueue/BUILD.bazel new file mode 100644 index 0000000000000..19dc555e5b036 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/BUILD.bazel @@ -0,0 +1,24 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") + +java_library( + name = "sessionqueue", + srcs = glob(["*.java"]), + visibility = [ + "//java/server/src/org/openqa/selenium/grid:__subpackages__", + "//java/server/test/org/openqa/selenium/grid:__subpackages__", + ], + exports = [ + "//java/server/src/org/openqa/selenium/status", + ], + deps = [ + "//java/client/src/org/openqa/selenium:core", + "//java/client/src/org/openqa/selenium/json", + "//java/client/src/org/openqa/selenium/remote", + "//java/client/src/org/openqa/selenium/remote/http", + "//java/server/src/org/openqa/selenium/grid/data", + "//java/server/src/org/openqa/selenium/status", + "//java/server/src/org/openqa/selenium/events", + "//java/server/src/org/openqa/selenium/grid/log", + artifact("com.google.guava:guava"), + ], +) \ No newline at end of file diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/ClearSessionQueue.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/ClearSessionQueue.java new file mode 100644 index 0000000000000..dcf83283dfe90 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/ClearSessionQueue.java @@ -0,0 +1,65 @@ +package org.openqa.selenium.grid.sessionqueue; + +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; +import static org.openqa.selenium.remote.http.Contents.asJson; +import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; + +import com.google.common.collect.ImmutableMap; + +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpHandler; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.Span; +import org.openqa.selenium.remote.tracing.Tracer; + +public class ClearSessionQueue implements HttpHandler { + + private final Tracer tracer; + private final NewSessionQueuer newSessionQueuer; + + ClearSessionQueue(Tracer tracer, NewSessionQueuer newSessionQueuer) { + this.tracer = Require.nonNull("Tracer", tracer); + this.newSessionQueuer = Require.nonNull("New Session Queuer", newSessionQueuer); + } + + @Override + public HttpResponse execute(HttpRequest req) { + Span span = newSpanAsChildOf(tracer, req, "sessionqueuer.clear"); + HTTP_REQUEST.accept(span, req); + + try { + int value = newSessionQueuer.clearQueue(); + span.setAttribute("cleared", value); + + HttpResponse response = new HttpResponse(); + if (value != 0) { + response.setContent( + asJson(ImmutableMap.of("value", value, + "message", "Cleared the new session request queue", + "cleared_requests", value))); + } else { + response.setContent( + asJson(ImmutableMap.of("value", value, + "message", + "New session request queue empty. Nothing to clear."))); + } + + span.setAttribute("requests.cleared", value); + HTTP_RESPONSE.accept(span, response); + return response; + } catch (Exception e) { + HttpResponse response = new HttpResponse().setStatus((HTTP_INTERNAL_ERROR)).setContent( + asJson(ImmutableMap.of("value", 0, + "message", + "Error while clearing the queue. Full queue may not have been cleared."))); + + HTTP_RESPONSE.accept(span, response); + return response; + } finally { + span.close(); + } + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/GetNewSessionResponse.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/GetNewSessionResponse.java new file mode 100644 index 0000000000000..eb1cdf4abdd1d --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/GetNewSessionResponse.java @@ -0,0 +1,153 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue; + +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; +import static org.openqa.selenium.grid.data.NewSessionRejectedEvent.NEW_SESSION_REJECTED; +import static org.openqa.selenium.grid.data.NewSessionResponseEvent.NEW_SESSION_RESPONSE; +import static org.openqa.selenium.remote.http.Contents.asJson; +import static org.openqa.selenium.remote.http.Contents.bytes; + +import com.google.common.collect.ImmutableMap; + +import org.openqa.selenium.events.EventBus; +import org.openqa.selenium.grid.data.NewSessionErrorResponse; +import org.openqa.selenium.grid.data.NewSessionRequest; +import org.openqa.selenium.grid.data.NewSessionResponse; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class GetNewSessionResponse { + + private static final Logger LOG = Logger.getLogger(GetNewSessionResponse.class.getName()); + private final EventBus bus; + private final Tracer tracer; + private final NewSessionQueue sessionRequests; + private final Map knownRequests = new ConcurrentHashMap<>(); + + public GetNewSessionResponse(Tracer tracer, EventBus bus, + NewSessionQueue sessionRequests) { + this.tracer = Require.nonNull("Tracer", tracer); + this.bus = Require.nonNull("Event bus", bus); + this.sessionRequests = Require.nonNull("New Session Request Queue", sessionRequests); + + this.bus.addListener(NEW_SESSION_RESPONSE, event -> { + try { + NewSessionResponse sessionResponse = event.getData(NewSessionResponse.class); + this.setResponse(sessionResponse); + } catch (Exception ignore) { + // Ignore any exception. Do not want to block the eventbus thread. + } + }); + + this.bus.addListener(NEW_SESSION_REJECTED, event -> { + try { + NewSessionErrorResponse sessionResponse = event.getData(NewSessionErrorResponse.class); + this.setErrorResponse(sessionResponse); + } catch (Exception ignore) { + // Ignore any exception. Do not want to block the eventbus thread. + } + }); + } + + private void setResponse(NewSessionResponse sessionResponse) { + // Each thread will get its own CountDownLatch and it is stored in the Map using request id as the key. + // EventBus thread will retrieve the same request and set it's response and unblock waiting request thread. + RequestId id = sessionResponse.getRequestId(); + Optional sessionRequest = Optional.ofNullable(knownRequests.get(id)); + + if (sessionRequest.isPresent()) { + NewSessionRequest request = sessionRequest.get(); + request.setSessionResponse( + new HttpResponse().setContent(bytes(sessionResponse.getDownstreamEncodedResponse()))); + request.getLatch().countDown(); + } + } + + private void setErrorResponse(NewSessionErrorResponse sessionResponse) { + RequestId id = sessionResponse.getRequestId(); + Optional sessionRequest = Optional.ofNullable(knownRequests.get(id)); + + // There could be a situation where the session request in the queue is scheduled for retry. + // Meanwhile the request queue is cleared. + // This will fire a error response event and remove the request id from the knownRequests map. + // Another error response event will be fired by the Distributor when the request is retried. + // Since a response is already provided for the request, the event listener should not take any action. + + if (sessionRequest.isPresent()) { + NewSessionRequest request = sessionRequest.get(); + request + .setSessionResponse(new HttpResponse() + .setStatus(HTTP_INTERNAL_ERROR) + .setContent(asJson( + ImmutableMap.of("message", sessionResponse.getMessage())))); + request.getLatch().countDown(); + } + } + + public HttpResponse add(HttpRequest request) { + Require.nonNull("New Session request", request); + + CountDownLatch latch = new CountDownLatch(1); + UUID uuid = UUID.randomUUID(); + RequestId requestId = new RequestId(uuid); + NewSessionRequest requestIdentifier = new NewSessionRequest(requestId, latch); + knownRequests.put(requestId, requestIdentifier); + + if (!sessionRequests.offerLast(request, requestId)) { + return new HttpResponse() + .setStatus(HTTP_INTERNAL_ERROR) + .setContent(asJson(ImmutableMap.of("message", + "Session request could not be created. Error while adding to the session queue."))); + } + + try { + // Block until response is received. + // This will not wait indefinitely due to request timeout handled by the LocalDistributor. + latch.await(); + HttpResponse res = requestIdentifier.getSessionResponse(); + return res; + } catch (InterruptedException e) { + LOG.log(Level.WARNING, "The thread waiting for new session response interrupted. {0}", + e.getMessage()); + Thread.currentThread().interrupt(); + + return new HttpResponse() + .setStatus(HTTP_INTERNAL_ERROR) + .setContent(asJson(ImmutableMap.of("message", + "Session request could not be created. Error while processing the session request."))); + } finally { + removeRequest(requestId); + } + } + + private void removeRequest(RequestId id) { + knownRequests.remove(id); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueue.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueue.java new file mode 100644 index 0000000000000..ebe13bf887d61 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueue.java @@ -0,0 +1,61 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue; + +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.internal.Require; + +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.tracing.Tracer; +import org.openqa.selenium.status.HasReadyState; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; + +public abstract class NewSessionQueue implements HasReadyState { + + protected final Tracer tracer; + + protected final Duration retryInterval; + + public static final String SESSIONREQUEST_TIMESTAMP_HEADER = "new-session-request-timestamp"; + + public static final String SESSIONREQUEST_ID_HEADER = "request-id"; + + public abstract boolean offerLast(HttpRequest request, RequestId requestId); + + public abstract boolean offerFirst(HttpRequest request, RequestId requestId); + + public abstract Optional poll(); + + public abstract int clear(); + + public void addRequestHeaders(HttpRequest request, RequestId reqId) { + long timestamp = Instant.now().getEpochSecond(); + request.addHeader(SESSIONREQUEST_TIMESTAMP_HEADER, Long.toString(timestamp)); + + request.addHeader(SESSIONREQUEST_ID_HEADER, reqId.toString()); + } + + public NewSessionQueue(Tracer tracer, Duration retryInterval) { + this.tracer = Require.nonNull("Tracer", tracer); + this.retryInterval = Require.nonNull("Session request retry interval", retryInterval); + } + +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueuer.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueuer.java new file mode 100644 index 0000000000000..de9276101e727 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/NewSessionQueuer.java @@ -0,0 +1,128 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue; + +import static org.openqa.selenium.remote.http.Contents.reader; +import static org.openqa.selenium.remote.http.Route.combine; +import static org.openqa.selenium.remote.http.Route.delete; +import static org.openqa.selenium.remote.http.Route.post; +import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION; + +import org.openqa.selenium.Capabilities; +import org.openqa.selenium.SessionNotCreatedException; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.NewSessionPayload; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.http.Routable; +import org.openqa.selenium.remote.http.Route; +import org.openqa.selenium.remote.tracing.AttributeKey; +import org.openqa.selenium.remote.tracing.EventAttribute; +import org.openqa.selenium.remote.tracing.EventAttributeValue; +import org.openqa.selenium.remote.tracing.Span; +import org.openqa.selenium.remote.tracing.Tracer; +import org.openqa.selenium.status.HasReadyState; + +import java.io.IOException; +import java.io.Reader; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.UUID; +import java.util.logging.Logger; + +public abstract class NewSessionQueuer implements HasReadyState, Routable { + + private static final Logger LOG = Logger.getLogger(NewSessionQueuer.class.getName()); + private final Route routes; + protected final Tracer tracer; + + protected NewSessionQueuer(Tracer tracer) { + this.tracer = Require.nonNull("Tracer", tracer); + + routes = combine( + post("/session") + .to(() -> this::addToQueue), + post("/se/grid/newsessionqueuer/session") + .to(() -> new AddToSessionQueue(tracer, this)), + post("/se/grid/newsessionqueuer/session/retry/{requestId}") + .to(params -> new AddBackToSessionQueue(tracer, this, + new RequestId( + UUID.fromString(params.get("requestId"))))), + Route.get("/se/grid/newsessionqueuer/session") + .to(() -> new RemoveFromSessionQueue(tracer, this)), + delete("/se/grid/newsessionqueuer/queue") + .to(() -> new ClearSessionQueue(tracer, this))); + } + + public void validateSessionRequest(HttpRequest request) { + try (Span span = tracer.getCurrentContext().createSpan("newsession_queuer.validate")) { + Map attributeMap = new HashMap<>(); + try ( + Reader reader = reader(request); + NewSessionPayload payload = NewSessionPayload.create(reader)) { + Objects.requireNonNull(payload, "Requests to process must be set."); + attributeMap.put("request.payload", EventAttribute.setValue(payload.toString())); + + Iterator iterator = payload.stream().iterator(); + if (!iterator.hasNext()) { + SessionNotCreatedException + exception = + new SessionNotCreatedException("No capabilities found"); + EXCEPTION.accept(attributeMap, exception); + attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(), + EventAttribute.setValue(exception.getMessage())); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + throw exception; + } + } catch (IOException e) { + SessionNotCreatedException exception = new SessionNotCreatedException(e.getMessage(), e); + EXCEPTION.accept(attributeMap, exception); + attributeMap.put(AttributeKey.EXCEPTION_MESSAGE.getKey(), + EventAttribute.setValue( + "IOException while reading the request payload. " + exception + .getMessage())); + span.addEvent(AttributeKey.EXCEPTION_EVENT.getKey(), attributeMap); + throw exception; + } + } + } + + public abstract HttpResponse addToQueue(HttpRequest request); + + public abstract boolean retryAddToQueue(HttpRequest request, RequestId reqId); + + public abstract Optional remove(); + + public abstract int clearQueue(); + + @Override + public boolean matches(HttpRequest req) { + return routes.matches(req); + } + + @Override + public HttpResponse execute(HttpRequest req) { + return routes.execute(req); + } + +} + diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/RemoveFromSessionQueue.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/RemoveFromSessionQueue.java new file mode 100644 index 0000000000000..22344fa63946f --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/RemoveFromSessionQueue.java @@ -0,0 +1,66 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue; + +import static java.net.HttpURLConnection.HTTP_NO_CONTENT; +import static org.openqa.selenium.remote.tracing.HttpTracing.newSpanAsChildOf; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; +import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; + +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpHandler; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.Span; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.util.Optional; + +public class RemoveFromSessionQueue implements HttpHandler { + + private final Tracer tracer; + private final NewSessionQueuer newSessionQueuer; + + RemoveFromSessionQueue(Tracer tracer, NewSessionQueuer newSessionQueuer) { + this.tracer = Require.nonNull("Tracer", tracer); + this.newSessionQueuer = Require.nonNull("New Session Queuer", newSessionQueuer); + } + + @Override + public HttpResponse execute(HttpRequest req) { + try (Span span = newSpanAsChildOf(tracer, req, "sessionqueuer.remove")) { + HTTP_REQUEST.accept(span, req); + + Optional sessionRequest = newSessionQueuer.remove(); + HttpResponse response = new HttpResponse(); + + if (sessionRequest.isPresent()) { + HttpRequest request = sessionRequest.get(); + response.setContent(request.getContent()); + response.setHeader(NewSessionQueue.SESSIONREQUEST_TIMESTAMP_HEADER, + request.getHeader(NewSessionQueue.SESSIONREQUEST_TIMESTAMP_HEADER)); + } else { + response.setStatus(HTTP_NO_CONTENT); + } + + HTTP_RESPONSE.accept(span, response); + return response; + } + } +} + diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/config/BUILD.bazel b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/BUILD.bazel new file mode 100644 index 0000000000000..be08e0167d3d0 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/BUILD.bazel @@ -0,0 +1,16 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") + +java_library( + name = "config", + srcs = glob(["*.java"]), + visibility = [ + "//java/server/src/org/openqa/selenium/grid:__subpackages__", + ], + deps = [ + "//java:auto-service", + "//java/server/src/org/openqa/selenium/grid/config", + "//java/server/src/org/openqa/selenium/grid/sessionqueue", + artifact("com.beust:jcommander"), + artifact("io.opentelemetry:opentelemetry-api"), + ], +) diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueFlags.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueFlags.java new file mode 100644 index 0000000000000..9060903b53239 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueFlags.java @@ -0,0 +1,54 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.config; + +import static org.openqa.selenium.grid.config.StandardGridRoles.SESSION_QUEUE_ROLE; + +import com.google.auto.service.AutoService; + +import com.beust.jcommander.Parameter; + +import org.openqa.selenium.grid.config.ConfigValue; +import org.openqa.selenium.grid.config.HasRoles; +import org.openqa.selenium.grid.config.Role; + +import java.util.Collections; +import java.util.Set; + +@AutoService(HasRoles.class) +public class NewSessionQueueFlags implements HasRoles { + + @Parameter( + names = { "--session-request-timeout" }, + description = "Timeout in seconds. New incoming session request is added to the queue. " + + "Requests sitting in the queue for longer than the configured time will timeout.") + @ConfigValue(section = "sessionqueue", name = "session-request-timeout", example = "5") + private int sessionRequestTimeout = 300; + + @Parameter( + names = { "--session-retry-interval" }, + description = "Retry interval in seconds. If all slots are busy, new session request " + + "will be retried after the given interval.") + @ConfigValue(section = "sessionqueue", name = "session-retry-interval", example = "5") + private int sessionRetryInterval = 5; + + @Override + public Set getRoles() { + return Collections.singleton(SESSION_QUEUE_ROLE); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java new file mode 100644 index 0000000000000..3e64ac6f73159 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueueOptions.java @@ -0,0 +1,64 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.config; + +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueue; + +import java.time.Duration; + +public class NewSessionQueueOptions { + + private static final String SESSIONS_QUEUE_SECTION = "sessionqueue"; + private static final String + DEFAULT_NEWSESSION_QUEUE = "org.openqa.selenium.grid.sessionmap.remote.LocalNewSessionQueue"; + + private final Config config; + private final int DEFAULT_REQUEST_TIMEOUT = 300; + private final int DEFAULT_RETRY_INTERVAL = 5; + + public NewSessionQueueOptions(Config config) { + this.config = config; + } + + public Duration getSessionRequestTimeout() { + int timeout = config.getInt(SESSIONS_QUEUE_SECTION, "session-request-timeout") + .orElse(DEFAULT_REQUEST_TIMEOUT); + + if (timeout <= 0) { + return Duration.ofSeconds(DEFAULT_REQUEST_TIMEOUT); + } + return Duration.ofSeconds(timeout); + } + + public Duration getSessionRequestRetryInterval() { + int interval = config.getInt(SESSIONS_QUEUE_SECTION, "session-retry-interval") + .orElse(DEFAULT_RETRY_INTERVAL); + + if (interval <= 0) { + return Duration.ofSeconds(DEFAULT_RETRY_INTERVAL); + } + return Duration.ofSeconds(interval); + } + + public NewSessionQueue getSessionQueue() { + return config + .getClass(SESSIONS_QUEUE_SECTION, "implementation", NewSessionQueue.class, + DEFAULT_NEWSESSION_QUEUE); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueuerFlags.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueuerFlags.java new file mode 100644 index 0000000000000..05d2ca1cdfbd0 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueuerFlags.java @@ -0,0 +1,59 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.config; + +import static org.openqa.selenium.grid.config.StandardGridRoles.SESSION_QUEUER_ROLE; + +import com.google.auto.service.AutoService; + +import com.beust.jcommander.Parameter; + +import org.openqa.selenium.grid.config.ConfigValue; +import org.openqa.selenium.grid.config.HasRoles; +import org.openqa.selenium.grid.config.Role; + +import java.net.URI; +import java.util.Collections; +import java.util.Set; + +@AutoService(HasRoles.class) +public class NewSessionQueuerFlags implements HasRoles { + + @Parameter( + names = { "-sq", "--sessionqueuer" }, + description = "Address of the session queue server.") + @ConfigValue(section = "sessionqueuer", name = "host", example = "\"http://localhost:1237\"") + private URI sessionQueueServer; + + @Parameter( + names = "--sessionqueuer-port", + description = "Port on which the session queue server is listening.") + @ConfigValue(section = "sessionqueuer", name = "port", example = "1234") + private int sessionQueueServerPort; + + @Parameter( + names = "--sessionqueuer-host", + description = "Host on which the session queue server is listening.") + @ConfigValue(section = "sessionqueuer", name = "hostname", example = "\"localhost\"") + private String sessionQueueServerHost; + + @Override + public Set getRoles() { + return Collections.singleton(SESSION_QUEUER_ROLE); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueuerOptions.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueuerOptions.java new file mode 100644 index 0000000000000..5f0a528ca559e --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/config/NewSessionQueuerOptions.java @@ -0,0 +1,80 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.config; + +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.config.ConfigException; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueuer; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Optional; + +public class NewSessionQueuerOptions { + + private static final String SESSION_QUEUER_SECTION = "sessionqueuer"; + + private final Config config; + + public NewSessionQueuerOptions(Config config) { + this.config = config; + } + + public URI getSessionQueuerUri() { + + Optional host = config.get(SESSION_QUEUER_SECTION, "host").map(str -> { + try { + return new URI(str); + } catch (URISyntaxException e) { + throw new ConfigException("Session queuer server URI is not a valid URI: " + str); + } + }); + + if (host.isPresent()) { + return host.get(); + } + + Optional port = config.getInt(SESSION_QUEUER_SECTION, "port"); + Optional hostname = config.get(SESSION_QUEUER_SECTION, "hostname"); + + if (!(port.isPresent() && hostname.isPresent())) { + throw new ConfigException("Unable to determine host and port for the session queuer server"); + } + + try { + return new URI( + "http", + null, + hostname.get(), + port.get(), + "", + null, + null); + } catch (URISyntaxException e) { + throw new ConfigException( + "Session queuer server uri configured through host (%s) and port (%d) is not a valid URI", + hostname.get(), + port.get()); + } + } + + public NewSessionQueuer getSessionQueuer(String implementation) { + return config + .getClass(SESSION_QUEUER_SECTION, "implementation", NewSessionQueuer.class, implementation); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/BUILD.bazel b/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/BUILD.bazel new file mode 100644 index 0000000000000..4bf4d70298e95 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/BUILD.bazel @@ -0,0 +1,25 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") + +java_library( + name = "httpd", + srcs = glob(["*.java"]), + visibility = [ + "//java/server/src/org/openqa/selenium/grid:__pkg__", + "//java/server/test/org/openqa/selenium/grid/router:__pkg__", + ], + deps = [ + "//java:auto-service", + "//java/client/src/org/openqa/selenium:core", + "//java/client/src/org/openqa/selenium/json", + "//java/server/src/org/openqa/selenium/cli", + "//java/server/src/org/openqa/selenium/grid:base-command", + "//java/server/src/org/openqa/selenium/grid/config", + "//java/server/src/org/openqa/selenium/grid/server", + "//java/server/src/org/openqa/selenium/grid/sessionqueue", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/local", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/config", + "//java/server/src/org/openqa/selenium/netty/server", + artifact("com.beust:jcommander"), + artifact("com.google.guava:guava"), + ], +) diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/DefaultNewSessionQueuerConfig.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/DefaultNewSessionQueuerConfig.java new file mode 100644 index 0000000000000..85c9b1ad1f294 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/DefaultNewSessionQueuerConfig.java @@ -0,0 +1,34 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.httpd; + + import com.google.common.collect.ImmutableMap; + + import org.openqa.selenium.grid.config.MapConfig; + +class DefaultNewSessionQueuerConfig extends MapConfig { + + DefaultNewSessionQueuerConfig() { + super(ImmutableMap.of( + "events", ImmutableMap.of( + "publish", "tcp://*:4442", + "subscribe", "tcp://*:4443"), + "server", ImmutableMap.of( + "port", 5559))); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueuerServer.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueuerServer.java new file mode 100644 index 0000000000000..55d139278816e --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/httpd/NewSessionQueuerServer.java @@ -0,0 +1,114 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.httpd; + +import static java.net.HttpURLConnection.HTTP_NO_CONTENT; +import static org.openqa.selenium.grid.config.StandardGridRoles.EVENT_BUS_ROLE; +import static org.openqa.selenium.grid.config.StandardGridRoles.HTTPD_ROLE; +import static org.openqa.selenium.grid.config.StandardGridRoles.SESSION_QUEUER_ROLE; +import static org.openqa.selenium.grid.config.StandardGridRoles.SESSION_QUEUE_ROLE; +import static org.openqa.selenium.json.Json.JSON_UTF_8; +import static org.openqa.selenium.remote.http.Contents.asJson; +import static org.openqa.selenium.remote.http.Route.get; + +import com.google.auto.service.AutoService; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +import org.openqa.selenium.BuildInfo; +import org.openqa.selenium.cli.CliCommand; +import org.openqa.selenium.grid.TemplateGridCommand; +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.config.Role; + +import org.openqa.selenium.grid.server.BaseServerOptions; +import org.openqa.selenium.grid.server.Server; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueuer; +import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueuerOptions; +import org.openqa.selenium.netty.server.NettyServer; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.http.Route; + +import java.util.Collections; +import java.util.Set; +import java.util.logging.Logger; + +@AutoService(CliCommand.class) +public class NewSessionQueuerServer extends TemplateGridCommand { + + private static final Logger LOG = Logger.getLogger(NewSessionQueuerServer.class.getName()); + private static final String + LOCAL_NEWSESSION_QUEUER = "org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueuer"; + + @Override + public String getName() { + return "sessionqueuer"; + } + + @Override + public String getDescription() { + return "Adds this server as the new session queue in a selenium grid."; + } + + @Override + public Set getConfigurableRoles() { + return ImmutableSet.of(EVENT_BUS_ROLE, HTTPD_ROLE, SESSION_QUEUER_ROLE, SESSION_QUEUE_ROLE); + } + + @Override + public Set getFlagObjects() { + return Collections.emptySet(); + } + + @Override + protected String getSystemPropertiesConfigPrefix() { + return "sessionqueuer"; + } + + @Override + protected Config getDefaultConfig() { + return new DefaultNewSessionQueuerConfig(); + } + + @Override + protected void execute(Config config) { + BaseServerOptions serverOptions = new BaseServerOptions(config); + NewSessionQueuerOptions queuerOptions = new NewSessionQueuerOptions(config); + + NewSessionQueuer sessionQueuer = queuerOptions.getSessionQueuer(LOCAL_NEWSESSION_QUEUER); + + Server server = new NettyServer(serverOptions, Route.combine( + sessionQueuer, + get("/status").to(() -> req -> + new HttpResponse() + .addHeader("Content-Type", JSON_UTF_8) + .setContent(asJson( + ImmutableMap.of("value", ImmutableMap.of( + "ready", true, + "message", "New Session Queuer is ready."))))), + get("/readyz").to(() -> req -> new HttpResponse().setStatus(HTTP_NO_CONTENT)))); + server.start(); + + BuildInfo info = new BuildInfo(); + LOG.info(String.format( + "Started Selenium New Session Queuer %s (revision %s): %s", + info.getReleaseLabel(), + info.getBuildRevision(), + server.getUrl())); + } +} diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/local/BUILD.bazel b/java/server/src/org/openqa/selenium/grid/sessionqueue/local/BUILD.bazel new file mode 100644 index 0000000000000..7d6533361bb4b --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/local/BUILD.bazel @@ -0,0 +1,21 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +java_library( + name = "local", + srcs = glob(["*.java"]), + visibility = [ + "//java/server/src/org/openqa/selenium/grid/commands:__pkg__", + "//java/server/src/org/openqa/selenium/grid:__subpackages__", + "//java/server/test/org/openqa/selenium/grid:__subpackages__", + ], + deps = [ + "//java/client/src/org/openqa/selenium/remote", + "//java/server/src/org/openqa/selenium/events", + "//java/server/src/org/openqa/selenium/grid/config", + "//java/server/src/org/openqa/selenium/grid/data", + "//java/server/src/org/openqa/selenium/grid/log", + "//java/server/src/org/openqa/selenium/grid/server", + "//java/server/src/org/openqa/selenium/grid/sessionqueue", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/config", + artifact("com.google.guava:guava"), + ], +) diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java new file mode 100644 index 0000000000000..8a5cdf1a0a59e --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java @@ -0,0 +1,171 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.local; + +import org.openqa.selenium.events.EventBus; +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.data.NewSessionErrorResponse; +import org.openqa.selenium.grid.data.NewSessionRejectedEvent; +import org.openqa.selenium.grid.data.NewSessionRequestEvent; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.grid.log.LoggingOptions; +import org.openqa.selenium.grid.server.EventBusOptions; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueue; +import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.tracing.AttributeKey; +import org.openqa.selenium.remote.tracing.EventAttribute; +import org.openqa.selenium.remote.tracing.EventAttributeValue; +import org.openqa.selenium.remote.tracing.Span; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.time.Duration; +import java.util.Deque; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class LocalNewSessionQueue extends NewSessionQueue { + + private static final Logger LOG = Logger.getLogger(LocalNewSessionQueue.class.getName()); + private final EventBus bus; + private final Deque sessionRequests = new ConcurrentLinkedDeque<>(); + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); + private final ScheduledExecutorService executorService = Executors + .newSingleThreadScheduledExecutor(); + private final Thread shutdownHook = new Thread(this::callExecutorShutdown); + + public LocalNewSessionQueue(Tracer tracer, EventBus bus, Duration retryInterval) { + super(tracer, retryInterval); + this.bus = Require.nonNull("Event bus", bus); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + public static NewSessionQueue create(Config config) { + Tracer tracer = new LoggingOptions(config).getTracer(); + EventBus bus = new EventBusOptions(config).getEventBus(); + Duration retryInterval = new NewSessionQueueOptions(config).getSessionRequestRetryInterval(); + return new LocalNewSessionQueue(tracer, bus, retryInterval); + } + + @Override + public boolean isReady() { + return bus.isReady(); + } + + @Override + public boolean offerLast(HttpRequest request, RequestId requestId) { + Require.nonNull("New Session request", request); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + + Span span = tracer.getCurrentContext().createSpan("local_sessionqueue.add"); + Map attributeMap = new HashMap<>(); + attributeMap.put(AttributeKey.LOGGER_CLASS.getKey(), + EventAttribute.setValue(getClass().getName())); + boolean added = false; + try { + added = sessionRequests.offerLast(request); + addRequestHeaders(request, requestId); + + attributeMap + .put(AttributeKey.REQUEST_ID.getKey(), EventAttribute.setValue(requestId.toString())); + attributeMap.put("request.added", EventAttribute.setValue(added)); + span.addEvent("Add new session request to the queue", attributeMap); + + return added; + } finally { + writeLock.unlock(); + span.close(); + if (added) { + bus.fire(new NewSessionRequestEvent(requestId)); + } + } + } + + @Override + public boolean offerFirst(HttpRequest request, RequestId requestId) { + Require.nonNull("New Session request", request); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + boolean added = false; + try { + added = sessionRequests.offerFirst(request); + return added; + } finally { + writeLock.unlock(); + if (added) { + executorService.schedule(() -> { + LOG.log(Level.INFO, "Adding request back to the queue. All slots are busy. Request: {0}", + requestId); + bus.fire(new NewSessionRequestEvent(requestId)); + }, super.retryInterval.getSeconds(), TimeUnit.SECONDS); + } + } + } + + @Override + public Optional poll() { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + return Optional.ofNullable(sessionRequests.poll()); + } finally { + writeLock.unlock(); + } + } + + @Override + public int clear() { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + int count = 0; + LOG.info("Clearing new session request queue"); + for (HttpRequest request = sessionRequests.poll(); request != null; + request = sessionRequests.poll()) { + count++; + UUID reqId = UUID.fromString(request.getHeader(SESSIONREQUEST_ID_HEADER)); + NewSessionErrorResponse errorResponse = + new NewSessionErrorResponse(new RequestId(reqId), "New session request cancelled."); + + bus.fire(new NewSessionRejectedEvent(errorResponse)); + } + return count; + } finally { + writeLock.unlock(); + } + } + + public void callExecutorShutdown() { + LOG.info("Shutting down session queue executor service"); + executorService.shutdown(); + } +} + diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueuer.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueuer.java new file mode 100644 index 0000000000000..d0c36c07fb7a4 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueuer.java @@ -0,0 +1,88 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.local; + +import org.openqa.selenium.events.EventBus; +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.grid.log.LoggingOptions; +import org.openqa.selenium.grid.server.EventBusOptions; +import org.openqa.selenium.grid.sessionqueue.GetNewSessionResponse; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueue; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueuer; +import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.time.Duration; +import java.util.Optional; +import java.util.logging.Logger; + +public class LocalNewSessionQueuer extends NewSessionQueuer { + + private static final Logger LOG = Logger.getLogger(LocalNewSessionQueuer.class.getName()); + private final EventBus bus; + public final NewSessionQueue sessionRequests; + + public LocalNewSessionQueuer(Tracer tracer, EventBus bus, + NewSessionQueue sessionRequests) { + super(tracer); + this.bus = Require.nonNull("Event bus", bus); + this.sessionRequests = Require.nonNull("New Session Request Queue", sessionRequests); + } + + public static NewSessionQueuer create(Config config) { + Tracer tracer = new LoggingOptions(config).getTracer(); + EventBus bus = new EventBusOptions(config).getEventBus(); + Duration retryInterval = new NewSessionQueueOptions(config).getSessionRequestRetryInterval(); + NewSessionQueue sessionRequests = new LocalNewSessionQueue(tracer, bus, retryInterval); + return new LocalNewSessionQueuer(tracer, bus, sessionRequests); + } + + @Override + public HttpResponse addToQueue(HttpRequest request) { + validateSessionRequest(request); + GetNewSessionResponse + getNewSessionResponse = new GetNewSessionResponse(tracer, bus, sessionRequests); + return getNewSessionResponse.add(request); + } + + @Override + public boolean retryAddToQueue(HttpRequest request, RequestId reqId) { + return sessionRequests.offerFirst(request, reqId); + } + + @Override + public Optional remove() { + return sessionRequests.poll(); + } + + @Override + public int clearQueue() { + return sessionRequests.clear(); + } + + @Override + public boolean isReady() { + return bus.isReady(); + } + +} + diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/remote/BUILD.bazel b/java/server/src/org/openqa/selenium/grid/sessionqueue/remote/BUILD.bazel new file mode 100644 index 0000000000000..818508ac36974 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/remote/BUILD.bazel @@ -0,0 +1,22 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +java_library( + name = "remote", + srcs = glob(["*.java"]), + visibility = [ + "//java/server/src/org/openqa/selenium/grid:__subpackages__", + "//java/server/test/org/openqa/selenium/grid/sessionqueue:__pkg__", + "//java/server/test/org/openqa/selenium/grid/router:__pkg__", + ], + deps = [ + "//java/client/src/org/openqa/selenium/remote", + "//java/server/src/org/openqa/selenium/grid/config", + "//java/server/src/org/openqa/selenium/grid/data", + "//java/server/src/org/openqa/selenium/grid/log", + "//java/server/src/org/openqa/selenium/grid/server", + "//java/server/src/org/openqa/selenium/grid/sessionqueue", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/config", + "//java/server/src/org/openqa/selenium/grid/web", + artifact("com.google.guava:guava"), + ], +) + diff --git a/java/server/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueuer.java b/java/server/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueuer.java new file mode 100644 index 0000000000000..293dda712b078 --- /dev/null +++ b/java/server/src/org/openqa/selenium/grid/sessionqueue/remote/RemoteNewSessionQueuer.java @@ -0,0 +1,126 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.remote; + +import static java.net.HttpURLConnection.HTTP_OK; +import static org.openqa.selenium.grid.sessionqueue.NewSessionQueue.*; +import static org.openqa.selenium.remote.http.HttpMethod.DELETE; +import static org.openqa.selenium.remote.http.HttpMethod.GET; +import static org.openqa.selenium.remote.http.HttpMethod.POST; + +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.grid.log.LoggingOptions; +import org.openqa.selenium.grid.server.NetworkOptions; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueuer; +import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueuerOptions; +import org.openqa.selenium.grid.web.Values; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.http.HttpClient; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.HttpTracing; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.io.UncheckedIOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.Optional; +import java.util.UUID; +import java.util.logging.Logger; + +public class RemoteNewSessionQueuer extends NewSessionQueuer { + + private static final Logger LOG = Logger.getLogger(RemoteNewSessionQueuer.class.getName()); + private final HttpClient client; + private static final String timestampHeader= SESSIONREQUEST_TIMESTAMP_HEADER; + private static final String reqIdHeader= SESSIONREQUEST_ID_HEADER; + + public RemoteNewSessionQueuer(Tracer tracer, HttpClient client) { + super(tracer); + this.client = Require.nonNull("HTTP client", client); + } + + public static NewSessionQueuer create(Config config) { + Tracer tracer = new LoggingOptions(config).getTracer(); + URI uri = new NewSessionQueuerOptions(config).getSessionQueuerUri(); + HttpClient.Factory clientFactory = new NetworkOptions(config).getHttpClientFactory(tracer); + + try { + return new RemoteNewSessionQueuer(tracer, clientFactory.createClient(uri.toURL())); + } catch (MalformedURLException e) { + throw new UncheckedIOException(e); + } + } + + @Override + public HttpResponse addToQueue(HttpRequest request) { + HttpRequest upstream = new HttpRequest(POST, "/se/grid/newsessionqueuer/session"); + HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream); + upstream.setContent(request.getContent()); + return client.execute(upstream); + } + + @Override + public boolean retryAddToQueue(HttpRequest request, RequestId reqId) { + HttpRequest upstream = + new HttpRequest(POST, "/se/grid/newsessionqueuer/session/retry/" + reqId.toString()); + HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream); + upstream.setContent(request.getContent()); + upstream.setHeader(timestampHeader, request.getHeader(timestampHeader)); + upstream.setHeader(reqIdHeader, reqId.toString()); + HttpResponse response = client.execute(upstream); + return Values.get(response, Boolean.class); + } + + @Override + public Optional remove() { + HttpRequest upstream = new HttpRequest(GET, "/se/grid/newsessionqueuer/session"); + HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream); + HttpResponse response = client.execute(upstream); + + if(response.getStatus()==HTTP_OK) { + HttpRequest httpRequest = new HttpRequest(POST, "/session"); + httpRequest.setContent(response.getContent()); + httpRequest.setHeader(timestampHeader, response.getHeader(timestampHeader)); + httpRequest.setHeader(reqIdHeader, response.getHeader(reqIdHeader)); + return Optional.ofNullable(httpRequest); + } + + return Optional.empty(); + } + + @Override + public int clearQueue() { + HttpRequest upstream = new HttpRequest(DELETE, "/se/grid/newsessionqueuer/queue"); + HttpTracing.inject(tracer, tracer.getCurrentContext(), upstream); + HttpResponse response = client.execute(upstream); + + return Values.get(response, Integer.class); + } + + @Override + public boolean isReady() { + try { + return client.execute(new HttpRequest(GET, "/readyz")).isSuccessful(); + } catch (RuntimeException e) { + return false; + } + } + +} diff --git a/java/server/test/org/openqa/selenium/grid/sessionqueue/BUILD.bazel b/java/server/test/org/openqa/selenium/grid/sessionqueue/BUILD.bazel new file mode 100644 index 0000000000000..8da6d23833bbe --- /dev/null +++ b/java/server/test/org/openqa/selenium/grid/sessionqueue/BUILD.bazel @@ -0,0 +1,25 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +load("//java:defs.bzl", "java_test_suite") + +java_test_suite( + name = "SmallTests", + size = "small", + srcs = glob(["*Test.java"]), + deps = [ + "//java/client/src/org/openqa/selenium/remote", + "//java/client/src/org/openqa/selenium/support", + "//java/client/test/org/openqa/selenium/remote/tracing:tracing-support", + "//java/server/src/org/openqa/selenium/events", + "//java/server/src/org/openqa/selenium/events/local", + "//java/server/src/org/openqa/selenium/grid/data", + "//java/server/src/org/openqa/selenium/grid/sessionqueue", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/local", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/remote", + "//java/server/test/org/openqa/selenium/grid/testing", + "//java/client/src/org/openqa/selenium/json", + artifact("junit:junit"), + artifact("io.opentelemetry:opentelemetry-api"), + artifact("org.assertj:assertj-core"), + artifact("com.google.guava:guava"), + ], +) diff --git a/java/server/test/org/openqa/selenium/grid/sessionqueue/NewSessionQueuerTest.java b/java/server/test/org/openqa/selenium/grid/sessionqueue/NewSessionQueuerTest.java new file mode 100644 index 0000000000000..2af85cf94d261 --- /dev/null +++ b/java/server/test/org/openqa/selenium/grid/sessionqueue/NewSessionQueuerTest.java @@ -0,0 +1,430 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue; + +import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR; +import static java.net.HttpURLConnection.HTTP_OK; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.assertj.core.api.Assertions.fail; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.openqa.selenium.grid.data.NewSessionRejectedEvent.NEW_SESSION_REJECTED; +import static org.openqa.selenium.grid.data.NewSessionRequestEvent.NEW_SESSION_REQUEST; +import static org.openqa.selenium.remote.http.Contents.utf8String; +import static org.openqa.selenium.remote.http.HttpMethod.POST; + +import com.google.common.collect.ImmutableMap; + +import org.junit.Before; +import org.junit.Test; +import org.openqa.selenium.ImmutableCapabilities; +import org.openqa.selenium.events.EventBus; +import org.openqa.selenium.events.local.GuavaEventBus; +import org.openqa.selenium.grid.data.CreateSessionResponse; +import org.openqa.selenium.grid.data.NewSessionErrorResponse; +import org.openqa.selenium.grid.data.NewSessionRejectedEvent; +import org.openqa.selenium.grid.data.NewSessionResponse; +import org.openqa.selenium.grid.data.NewSessionResponseEvent; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.grid.data.Session; +import org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue; +import org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueuer; +import org.openqa.selenium.grid.sessionqueue.remote.RemoteNewSessionQueuer; +import org.openqa.selenium.grid.testing.PassthroughHttpClient; +import org.openqa.selenium.json.Json; +import org.openqa.selenium.remote.NewSessionPayload; +import org.openqa.selenium.remote.SessionId; +import org.openqa.selenium.remote.http.Contents; +import org.openqa.selenium.remote.http.HttpClient; +import org.openqa.selenium.remote.http.HttpMethod; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; +import org.openqa.selenium.remote.tracing.DefaultTestTracer; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +public class NewSessionQueuerTest { + + private LocalNewSessionQueuer local; + private RemoteNewSessionQueuer remote; + private EventBus bus; + private ImmutableCapabilities caps; + private NewSessionPayload payload; + private HttpRequest request; + private static int count = 0; + private static final Json JSON = new Json(); + private static int sessionTimeout = 5; + private NewSessionQueue sessionQueue; + + + @Before + public void setUp() { + Tracer tracer = DefaultTestTracer.createTracer(); + caps = new ImmutableCapabilities("browserName", "chrome"); + bus = new GuavaEventBus(); + + sessionQueue = new LocalNewSessionQueue(tracer, bus, Duration.ofSeconds(1)); + local = new LocalNewSessionQueuer(tracer, bus, sessionQueue); + + HttpClient client = new PassthroughHttpClient(local); + remote = new RemoteNewSessionQueuer(tracer, client); + + payload = NewSessionPayload.create(caps); + request = createRequest(payload, POST, "/session"); + } + + @Test + public void shouldBeAbleToAddToQueueAndGetValidResponse() { + + bus.addListener(NEW_SESSION_REQUEST, event -> { + Optional sessionRequest = this.local.remove(); + assertTrue(sessionRequest.isPresent()); + RequestId reqId = event.getData(RequestId.class); + ImmutableCapabilities capabilities = new ImmutableCapabilities("browserName", "chrome"); + try { + SessionId sessionId = new SessionId("123"); + Session session = new Session(sessionId, new URI("http://example.com"), capabilities); + CreateSessionResponse sessionResponse = new CreateSessionResponse(session, + JSON.toJson( + ImmutableMap.of( + "value", + ImmutableMap.of( + "sessionId", + sessionId, + "capabilities", + capabilities))) + .getBytes(UTF_8)); + NewSessionResponse newSessionResponse = + new NewSessionResponse(reqId, sessionResponse.getSession(), + sessionResponse.getDownstreamEncodedResponse()); + bus.fire(new NewSessionResponseEvent(newSessionResponse)); + } catch (URISyntaxException e) { + bus.fire( + new NewSessionRejectedEvent( + new NewSessionErrorResponse(new RequestId(UUID.randomUUID()), "Error"))); + } + }); + + HttpResponse httpResponse = local.addToQueue(request); + + assertEquals(httpResponse.getStatus(), HTTP_OK); + } + + @Test + public void shouldBeAbleToAddToQueueAndGetErrorResponse() { + + bus.addListener(NEW_SESSION_REQUEST, event -> { + Optional sessionRequest = this.local.remove(); + assertTrue(sessionRequest.isPresent()); + RequestId reqId = event.getData(RequestId.class); + bus.fire( + new NewSessionRejectedEvent( + new NewSessionErrorResponse(reqId, "Error"))); + + }); + + HttpResponse httpResponse = local.addToQueue(request); + + assertEquals(httpResponse.getStatus(), HTTP_INTERNAL_ERROR); + } + + @Test + public void shouldBeAbleToAddToQueueRemotelyAndGetErrorResponse() { + + bus.addListener(NEW_SESSION_REQUEST, event -> { + Optional sessionRequest = this.remote.remove(); + assertTrue(sessionRequest.isPresent()); + RequestId reqId = event.getData(RequestId.class); + bus.fire( + new NewSessionRejectedEvent( + new NewSessionErrorResponse(reqId, "Could not poll the queue"))); + + }); + + HttpResponse httpResponse = remote.addToQueue(request); + + assertEquals(httpResponse.getStatus(), HTTP_INTERNAL_ERROR); + } + + + @Test + public void shouldBeAbleToRemoveFromQueue() { + Optional httpRequest = local.remove(); + + assertFalse(httpRequest.isPresent()); + } + + @Test + public void shouldBeClearQueue() { + + RequestId requestId = new RequestId(UUID.randomUUID()); + sessionQueue.offerLast(request, requestId); + + int count = local.clearQueue(); + + assertEquals(count, 1); + assertFalse(local.remove().isPresent()); + } + + @Test + public void shouldBeClearQueueRemotely() { + + RequestId requestId = new RequestId(UUID.randomUUID()); + sessionQueue.offerLast(request, requestId); + + int count = remote.clearQueue(); + + assertEquals(count, 1); + assertFalse(remote.remove().isPresent()); + } + + @Test + public void shouldBeClearQueueAndFireRejectedEvent() { + + RequestId requestId = new RequestId(UUID.randomUUID()); + bus.addListener(NEW_SESSION_REJECTED, event -> { + assertEquals(event.getData(UUID.class), requestId); + }); + + sessionQueue.offerLast(request, requestId); + + int count = remote.clearQueue(); + + assertEquals(count, 1); + assertFalse(remote.remove().isPresent()); + } + + @Test + public void shouldBeAbleToRemoveFromQueueRemotely() { + Optional httpRequest = remote.remove(); + + assertFalse(httpRequest.isPresent()); + } + + @Test + public void shouldBeAbleToAddAgainToQueue() { + boolean added = local.retryAddToQueue(request, new RequestId(UUID.randomUUID())); + assertTrue(added); + } + + @Test + public void shouldBeAbleToAddAgainToQueueRemotely() { + HttpRequest request = createRequest(payload, POST, "/se/grid/newsessionqueuer/session"); + boolean added = remote.retryAddToQueue(request, new RequestId(UUID.randomUUID())); + + assertTrue(added); + } + + @Test + public void shouldBeAbleToRetryRequest() { + + bus.addListener(NEW_SESSION_REQUEST, event -> { + // Keep a count of event fired + count++; + Optional sessionRequest = this.remote.remove(); + assertTrue(sessionRequest.isPresent()); + RequestId reqId = event.getData(RequestId.class); + + if (count == 1) { + assertTrue(remote.retryAddToQueue(sessionRequest.get(), reqId)); + } + + // Only if it was retried after an interval, the count is 2 + if (count == 2) { + ImmutableCapabilities capabilities = new ImmutableCapabilities("browserName", "chrome"); + try { + SessionId sessionId = new SessionId("123"); + Session session = new Session(sessionId, new URI("http://example.com"), capabilities); + CreateSessionResponse sessionResponse = new CreateSessionResponse(session, + JSON.toJson( + ImmutableMap.of( + "value", + ImmutableMap.of( + "sessionId", + sessionId, + "capabilities", + capabilities))) + .getBytes(UTF_8)); + NewSessionResponse newSessionResponse = + new NewSessionResponse(reqId, sessionResponse.getSession(), + sessionResponse.getDownstreamEncodedResponse()); + bus.fire(new NewSessionResponseEvent(newSessionResponse)); + } catch (URISyntaxException e) { + bus.fire( + new NewSessionRejectedEvent( + new NewSessionErrorResponse(new RequestId(UUID.randomUUID()), "Error"))); + } + } + }); + + HttpResponse httpResponse = remote.addToQueue(request); + + assertEquals(httpResponse.getStatus(), HTTP_OK); + } + + @Test + public void shouldBeAbleToHandleMultipleSessionRequestsAtTheSameTime() { + + bus.addListener(NEW_SESSION_REQUEST, event -> { + Optional sessionRequest = this.local.remove(); + assertTrue(sessionRequest.isPresent()); + RequestId reqId = event.getData(RequestId.class); + ImmutableCapabilities capabilities = new ImmutableCapabilities("browserName", "chrome"); + try { + SessionId sessionId = new SessionId(UUID.randomUUID()); + Session session = new Session(sessionId, new URI("http://example.com"), capabilities); + CreateSessionResponse sessionResponse = new CreateSessionResponse(session, + JSON.toJson( + ImmutableMap.of( + "value", + ImmutableMap.of( + "sessionId", + sessionId, + "capabilities", + capabilities))) + .getBytes(UTF_8)); + NewSessionResponse newSessionResponse = + new NewSessionResponse(reqId, sessionResponse.getSession(), + sessionResponse.getDownstreamEncodedResponse()); + bus.fire(new NewSessionResponseEvent(newSessionResponse)); + } catch (URISyntaxException e) { + bus.fire( + new NewSessionRejectedEvent( + new NewSessionErrorResponse(new RequestId(UUID.randomUUID()), "Error"))); + } + }); + + ExecutorService executor = Executors.newFixedThreadPool(2); + + Callable callable = () -> remote.addToQueue(request); + + Future firstRequest = executor.submit(callable); + Future secondRequest = executor.submit(callable); + + try { + HttpResponse firstResponse = firstRequest.get(30, TimeUnit.SECONDS); + HttpResponse secondResponse = secondRequest.get(30, TimeUnit.SECONDS); + + String firstResponseContents = Contents.string(firstResponse); + String secondResponseContents = Contents.string(secondResponse); + + assertEquals(firstResponse.getStatus(), HTTP_OK); + assertEquals(secondResponse.getStatus(), HTTP_OK); + + assertNotEquals(firstResponseContents, secondResponseContents); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + fail("Could not create session"); + } + + executor.shutdown(); + } + + @Test + public void shouldBeAbleToTimeoutARequest() { + + bus.addListener(NEW_SESSION_REQUEST, event -> { + Optional sessionRequest = this.remote.remove(); + assertTrue(sessionRequest.isPresent()); + RequestId reqId = event.getData(RequestId.class); + + // Ensures that timestamp header is present + if (hasRequestTimedOut(sessionRequest.get())) { + // Reject the request once timeout occurs. + bus.fire( + new NewSessionRejectedEvent(new NewSessionErrorResponse(reqId, "Error"))); + } else { + // Keep adding to front of queue till the request times out + assertTrue(remote.retryAddToQueue(sessionRequest.get(), reqId)); + } + }); + + HttpResponse httpResponse = remote.addToQueue(request); + + assertEquals(httpResponse.getStatus(), HTTP_INTERNAL_ERROR); + } + + @Test + public void shouldBeAbleToClearQueueAndRejectMultipleRequests() { + + ExecutorService executor = Executors.newFixedThreadPool(2); + + Callable callable = () -> remote.addToQueue(request); + + Future firstRequest = executor.submit(callable); + Future secondRequest = executor.submit(callable); + + int count = 0; + + while (count < 2) { + count += remote.clearQueue(); + } + + try { + HttpResponse firstResponse = firstRequest.get(30, TimeUnit.SECONDS); + HttpResponse secondResponse = secondRequest.get(30, TimeUnit.SECONDS); + + assertEquals(firstResponse.getStatus(), HTTP_INTERNAL_ERROR); + assertEquals(secondResponse.getStatus(), HTTP_INTERNAL_ERROR); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + fail("Could not create session"); + } + + executor.shutdown(); + } + + private HttpRequest createRequest(NewSessionPayload payload, HttpMethod httpMethod, String uri) { + StringBuilder builder = new StringBuilder(); + try { + payload.writeTo(builder); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + HttpRequest request = new HttpRequest(httpMethod, uri); + request.setContent(utf8String(builder.toString())); + + return request; + } + + private boolean hasRequestTimedOut(HttpRequest request) { + String enqueTimestampStr = request.getHeader(NewSessionQueue.SESSIONREQUEST_TIMESTAMP_HEADER); + Instant enque = Instant.ofEpochSecond(Long.parseLong(enqueTimestampStr)); + Instant deque = Instant.now(); + Duration duration = Duration.between(enque, deque); + + return duration.getSeconds() > sessionTimeout; + } +} diff --git a/java/server/test/org/openqa/selenium/grid/sessionqueue/local/BUILD.bazel b/java/server/test/org/openqa/selenium/grid/sessionqueue/local/BUILD.bazel new file mode 100644 index 0000000000000..81b20fabc0a87 --- /dev/null +++ b/java/server/test/org/openqa/selenium/grid/sessionqueue/local/BUILD.bazel @@ -0,0 +1,22 @@ +load("@rules_jvm_external//:defs.bzl", "artifact") +load("//java:defs.bzl", "java_test_suite") + +java_test_suite( + name = "SmallTests", + size = "small", + srcs = glob(["*.java"]), + deps = [ + "//java/client/src/org/openqa/selenium/remote", + "//java/client/test/org/openqa/selenium/remote/tracing:tracing-support", + "//java/server/src/org/openqa/selenium/events", + "//java/server/src/org/openqa/selenium/events/local", + "//java/server/src/org/openqa/selenium/grid/data", + "//java/server/src/org/openqa/selenium/grid/sessionqueue", + "//java/server/src/org/openqa/selenium/grid/sessionqueue/local", + "//java/server/test/org/openqa/selenium/grid/testing", + artifact("com.google.guava:guava"), + artifact("io.opentelemetry:opentelemetry-api"), + artifact("junit:junit"), + artifact("org.assertj:assertj-core"), + ], +) diff --git a/java/server/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java b/java/server/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java new file mode 100644 index 0000000000000..f6a31e105f291 --- /dev/null +++ b/java/server/test/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueueTest.java @@ -0,0 +1,169 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.sessionqueue.local; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.openqa.selenium.grid.data.NewSessionRequestEvent.NEW_SESSION_REQUEST; +import static org.openqa.selenium.remote.http.Contents.utf8String; +import static org.openqa.selenium.remote.http.HttpMethod.POST; + +import org.junit.Before; +import org.junit.Test; +import org.openqa.selenium.ImmutableCapabilities; +import org.openqa.selenium.events.EventBus; +import org.openqa.selenium.events.local.GuavaEventBus; +import org.openqa.selenium.grid.data.RequestId; +import org.openqa.selenium.grid.sessionqueue.NewSessionQueue; +import org.openqa.selenium.remote.NewSessionPayload; +import org.openqa.selenium.remote.http.HttpMethod; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.tracing.DefaultTestTracer; +import org.openqa.selenium.remote.tracing.Tracer; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; + +public class LocalNewSessionQueueTest { + + private EventBus bus; + private ImmutableCapabilities caps; + private NewSessionQueue sessionQueue; + private HttpRequest expectedSessionRequest; + private RequestId requestId; + + @Before + public void setUp() { + Tracer tracer = DefaultTestTracer.createTracer(); + caps = new ImmutableCapabilities("browserName", "chrome"); + bus = new GuavaEventBus(); + requestId = new RequestId(UUID.randomUUID()); + sessionQueue = new LocalNewSessionQueue(tracer, bus, Duration.ofSeconds(1)); + + NewSessionPayload payload = NewSessionPayload.create(caps); + expectedSessionRequest = createRequest(payload, POST, "/session"); + } + + @Test + public void shouldBeAbleToAddToEndOfQueue() { + boolean added = sessionQueue.offerLast(expectedSessionRequest, requestId); + assertTrue(added); + + bus.addListener(NEW_SESSION_REQUEST, event -> { + assertEquals(requestId, event.getData(UUID.class)); + }); + } + + @Test + public void shouldBeAbleToRemoveFromFrontOfQueue() { + boolean added = sessionQueue.offerLast(expectedSessionRequest, requestId); + assertTrue(added); + + Optional receivedRequest = sessionQueue.poll(); + + assertTrue(receivedRequest.isPresent()); + assertEquals(expectedSessionRequest, receivedRequest.get()); + } + + @Test + public void shouldAddTimestampHeader() { + boolean added = sessionQueue.offerLast(expectedSessionRequest, requestId); + assertTrue(added); + + Optional receivedRequest = sessionQueue.poll(); + + assertTrue(receivedRequest.isPresent()); + HttpRequest request = receivedRequest.get(); + assertEquals(expectedSessionRequest, request); + assertTrue(request.getHeader(NewSessionQueue.SESSIONREQUEST_TIMESTAMP_HEADER) != null); + } + + @Test + public void shouldAddRequestIdHeader() { + boolean added = sessionQueue.offerLast(expectedSessionRequest, requestId); + assertTrue(added); + + Optional receivedRequest = sessionQueue.poll(); + + assertTrue(receivedRequest.isPresent()); + HttpRequest request = receivedRequest.get(); + assertEquals(expectedSessionRequest, request); + String polledRequestId = request.getHeader(NewSessionQueue.SESSIONREQUEST_ID_HEADER); + assertTrue(polledRequestId != null); + assertEquals(requestId, new RequestId(UUID.fromString(polledRequestId))); + } + + @Test + public void shouldBeAbleToAddToFrontOfQueue() { + ImmutableCapabilities chromeCaps = new ImmutableCapabilities("browserName", "chrome"); + NewSessionPayload chromePayload = NewSessionPayload.create(chromeCaps); + HttpRequest chromeRequest = createRequest(chromePayload, POST, "/session"); + RequestId chromeRequestId = new RequestId(UUID.randomUUID()); + + ImmutableCapabilities firefoxCaps = new ImmutableCapabilities("browserName", "firefox"); + NewSessionPayload firefoxpayload = NewSessionPayload.create(firefoxCaps); + HttpRequest firefoxRequest = createRequest(firefoxpayload, POST, "/session"); + RequestId firefoxRequestId = new RequestId(UUID.randomUUID()); + + boolean addedChromeRequest = sessionQueue.offerFirst(chromeRequest, chromeRequestId); + assertTrue(addedChromeRequest); + + boolean addFirefoxRequest = sessionQueue.offerFirst(firefoxRequest, firefoxRequestId); + assertTrue(addFirefoxRequest); + + Optional polledFirefoxRequest = sessionQueue.poll(); + assertTrue(polledFirefoxRequest.isPresent()); + assertEquals(firefoxRequest, polledFirefoxRequest.get()); + + Optional polledChromeRequest = sessionQueue.poll(); + assertTrue(polledChromeRequest.isPresent()); + assertEquals(chromeRequest, polledChromeRequest.get()); + } + + @Test + public void shouldBeClearAPopulatedQueue() { + sessionQueue.offerLast(expectedSessionRequest, requestId); + sessionQueue.offerLast(expectedSessionRequest, requestId); + + int count = sessionQueue.clear(); + assertEquals(count, 2); + } + + @Test + public void shouldBeClearAEmptyQueue() { + int count = sessionQueue.clear(); + assertEquals(count, 0); + } + + private HttpRequest createRequest(NewSessionPayload payload, HttpMethod httpMethod, String uri) { + StringBuilder builder = new StringBuilder(); + try { + payload.writeTo(builder); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + HttpRequest request = new HttpRequest(httpMethod, uri); + request.setContent(utf8String(builder.toString())); + + return request; + } +}