Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[grid] Fix flaky Distributor and GraphqlHandlerTest. Add queuer config to DistributedCdpTest #8859

Merged
merged 3 commits into from
Nov 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public class DistributorTest {
private Capabilities stereotype;
private Capabilities caps;
private final Secret registrationSecret = new Secret("hellim");
private final Wait<Object> wait = new FluentWait<>(new Object()).withTimeout(Duration.ofSeconds(5));

@Before
public void setUp() {
Expand Down Expand Up @@ -309,6 +310,8 @@ public void testDrainedNodeShutsDownOnceEmpty() throws URISyntaxException, Inter
queuer,
registrationSecret);
distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

distributor.drain(node.getId());

latch.await(5, TimeUnit.SECONDS);
Expand Down Expand Up @@ -352,6 +355,8 @@ public void drainedNodeDoesNotShutDownIfNotEmpty()
registrationSecret);
distributor.add(node);

wait.until(obj -> distributor.getStatus().hasCapacity());

NewSessionPayload payload = NewSessionPayload.create(caps);
distributor.newSession(createRequest(payload));

Expand Down Expand Up @@ -394,6 +399,8 @@ public void drainedNodeShutsDownAfterSessionsFinish()
registrationSecret);
distributor.add(node);

wait.until(obj -> distributor.getStatus().hasCapacity());

NewSessionPayload payload = NewSessionPayload.create(caps);
CreateSessionResponse firstResponse = distributor.newSession(createRequest(payload));
CreateSessionResponse secondResponse = distributor.newSession(createRequest(payload));
Expand Down Expand Up @@ -504,6 +511,8 @@ public void theMostLightlyLoadedNodeIsSelectedFirst() {
.add(lightest)
.add(massive);

wait.until(obj -> distributor.getStatus().hasCapacity());

try (NewSessionPayload payload = NewSessionPayload.create(caps)) {
Session session = distributor.newSession(createRequest(payload)).getSession();

Expand Down Expand Up @@ -633,7 +642,9 @@ public void shouldIncludeHostsThatAreUpInHostList() {
}

@Test
public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() throws URISyntaxException {
URI nodeUri = new URI("http://example:5678");
URI routableUri = new URI("http://localhost:1234");
SessionMap sessions = new LocalSessionMap(tracer, bus);
LocalNewSessionQueue localNewSessionQueue = new LocalNewSessionQueue(
tracer,
Expand All @@ -642,19 +653,20 @@ public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
Duration.ofSeconds(2));
LocalNewSessionQueuer queuer = new LocalNewSessionQueuer(tracer, bus, localNewSessionQueue);

CombinedHandler handler = new CombinedHandler();
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
.add(caps, new TestSessionFactory((id, c) -> new Session(
id, nodeUri, stereotype, c, Instant.now())))
.build();
Distributor distributor = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(handler),
new PassthroughHttpClient.Factory(node),
sessions,
queuer,
registrationSecret);
handler.addHandler(distributor);

Node node = createNode(caps, 1, 0);
handler.addHandler(node);
distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

// Use up the one slot available
try (NewSessionPayload payload = NewSessionPayload.create(caps)) {
Expand All @@ -669,7 +681,9 @@ public void shouldNotScheduleAJobIfAllSlotsAreBeingUsed() {
}

@Test
public void shouldReleaseSlotOnceSessionEnds() {
public void shouldReleaseSlotOnceSessionEnds() throws URISyntaxException {
URI nodeUri = new URI("http://example:5678");
URI routableUri = new URI("http://localhost:1234");
SessionMap sessions = new LocalSessionMap(tracer, bus);
LocalNewSessionQueue localNewSessionQueue = new LocalNewSessionQueue(
tracer,
Expand All @@ -678,20 +692,22 @@ public void shouldReleaseSlotOnceSessionEnds() {
Duration.ofSeconds(2));
LocalNewSessionQueuer queuer = new LocalNewSessionQueuer(tracer, bus, localNewSessionQueue);

CombinedHandler handler = new CombinedHandler();
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
.add(caps, new TestSessionFactory((id, c) -> new Session(
id, nodeUri, stereotype, c, Instant.now())))
.build();

Distributor distributor = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(handler),
new PassthroughHttpClient.Factory(node),
sessions,
queuer,
registrationSecret);
handler.addHandler(distributor);

Node node = createNode(caps, 1, 0);
handler.addHandler(node);
distributor.add(node);

wait.until(obj -> distributor.getStatus().hasCapacity());

// Use up the one slot available
Session session;
try (NewSessionPayload payload = NewSessionPayload.create(caps)) {
Expand All @@ -702,9 +718,7 @@ public void shouldReleaseSlotOnceSessionEnds() {
sessions.get(session.getId());

node.stop(session.getId());

// Now wait for the session map to say the session is gone.
Wait<Object> wait = new FluentWait<>(new Object()).withTimeout(Duration.ofSeconds(2));
wait.until(obj -> {
try {
sessions.get(session.getId());
Expand Down Expand Up @@ -756,8 +770,9 @@ public void shouldNotStartASessionIfTheCapabilitiesAreNotSupported() {
}

@Test
public void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() {
CombinedHandler handler = new CombinedHandler();
public void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() throws URISyntaxException {
URI nodeUri = new URI("http://example:5678");
URI routableUri = new URI("http://localhost:1234");

SessionMap sessions = new LocalSessionMap(tracer, bus);
LocalNewSessionQueue localNewSessionQueue = new LocalNewSessionQueue(
Expand All @@ -766,26 +781,24 @@ public void attemptingToStartASessionWhichFailsMarksAsTheSlotAsAvailable() {
Duration.ofSeconds(2),
Duration.ofSeconds(2));
LocalNewSessionQueuer queuer = new LocalNewSessionQueuer(tracer, bus, localNewSessionQueue);
handler.addHandler(sessions);

URI uri = createUri();
Node node = LocalNode.builder(tracer, bus, uri, uri, registrationSecret)
.add(caps, new TestSessionFactory((id, caps) -> {
throw new SessionNotCreatedException("OMG");
}))
.build();
handler.addHandler(node);
LocalNode node = LocalNode.builder(tracer, bus, routableUri, routableUri, registrationSecret)
.add(caps, new TestSessionFactory((id, caps) -> {
throw new SessionNotCreatedException("OMG");
}))
.build();

Distributor distributor = new LocalDistributor(
tracer,
bus,
new PassthroughHttpClient.Factory(handler),
new PassthroughHttpClient.Factory(node),
sessions,
queuer,
registrationSecret);
handler.addHandler(distributor);
distributor.add(node);

wait.until(obj -> distributor.getStatus().hasCapacity());

try (NewSessionPayload payload = NewSessionPayload.create(caps)) {
assertThatExceptionOfType(SessionNotCreatedException.class)
.isThrownBy(() -> distributor.newSession(createRequest(payload)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ java_test_suite(
deps = [
"//java/client/src/org/openqa/selenium/json",
"//java/client/src/org/openqa/selenium/remote",
"//java/client/src/org/openqa/selenium/support",
"//java/client/test/org/openqa/selenium/remote/tracing:tracing-support",
"//java/server/src/org/openqa/selenium/events",
"//java/server/src/org/openqa/selenium/events/local",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
import org.openqa.selenium.remote.http.HttpResponse;
import org.openqa.selenium.remote.tracing.DefaultTestTracer;
import org.openqa.selenium.remote.tracing.Tracer;
import org.openqa.selenium.support.ui.FluentWait;
import org.openqa.selenium.support.ui.Wait;

import java.io.IOException;
import java.io.UncheckedIOException;
Expand Down Expand Up @@ -77,6 +79,7 @@ public class GraphqlHandlerTest {
private ImmutableCapabilities caps;
private ImmutableCapabilities stereotype;
private NewSessionPayload payload;
private final Wait<Object> wait = new FluentWait<>(new Object()).withTimeout(Duration.ofSeconds(5));

public GraphqlHandlerTest() throws URISyntaxException {
}
Expand Down Expand Up @@ -146,6 +149,7 @@ public boolean test(Capabilities capabilities) {
})
.build();
distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

GraphqlHandler handler = new GraphqlHandler(distributor, publicUri);
Map<String, Object> topLevel = executeQuery(handler, "{ grid { nodes { uri } } }");
Expand All @@ -169,6 +173,8 @@ public void shouldBeAbleToGetSessionInfo() throws URISyntaxException {
Instant.now()))).build();

distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

Session session = distributor.newSession(createRequest(payload)).getSession();

assertThat(session).isNotNull();
Expand Down Expand Up @@ -215,6 +221,8 @@ public void shouldBeAbleToGetNodeInfoForSession() throws URISyntaxException {
Instant.now()))).build();

distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

Session session = distributor.newSession(createRequest(payload)).getSession();

assertThat(session).isNotNull();
Expand Down Expand Up @@ -260,6 +268,8 @@ public void shouldBeAbleToGetSlotInfoForSession() throws URISyntaxException {
Instant.now()))).build();

distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

Session session = distributor.newSession(createRequest(payload)).getSession();

assertThat(session).isNotNull();
Expand Down Expand Up @@ -311,6 +321,8 @@ public void shouldBeAbleToGetSessionDuration() throws URISyntaxException {
Instant.now()))).build();

distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

Session session = distributor.newSession(createRequest(payload)).getSession();

assertThat(session).isNotNull();
Expand Down Expand Up @@ -341,6 +353,7 @@ public void shouldThrowExceptionWhenSessionNotFound() throws URISyntaxException
Instant.now()))).build();

distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

String randomSessionId = UUID.randomUUID().toString();
String query = "{ session (id: \"" + randomSessionId + "\") { sessionDurationMillis } }";
Expand Down Expand Up @@ -376,6 +389,7 @@ public void shouldThrowExceptionWhenSessionIsEmpty() throws URISyntaxException {
Instant.now()))).build();

distributor.add(node);
wait.until(obj -> distributor.getStatus().hasCapacity());

String query = "{ session (id: \"\") { sessionDurationMillis } }";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.openqa.selenium.grid.server.BaseServerOptions;
import org.openqa.selenium.grid.server.Server;
import org.openqa.selenium.grid.sessionmap.httpd.SessionMapServer;
import org.openqa.selenium.grid.sessionqueue.httpd.NewSessionQueuerServer;
import org.openqa.selenium.grid.web.Values;
import org.openqa.selenium.net.PortProber;
import org.openqa.selenium.netty.server.NettyServer;
Expand Down Expand Up @@ -84,11 +85,21 @@ public void ensureBasicFunctionality() throws MalformedURLException, Interrupted
mergeArgs(eventBusFlags, "--bind-bus", "false", "--port", "" + sessionsPort)).run();
waitUntilUp(sessionsPort);

int queuerPort = PortProber.findFreePort();
new NewSessionQueuerServer().configure(
System.out,
System.err,
mergeArgs(eventBusFlags, "--bind-bus", "false", "--port", "" + queuerPort)).run();
waitUntilUp(sessionsPort);

int distributorPort = PortProber.findFreePort();
new DistributorServer().configure(
System.out,
System.err,
mergeArgs(eventBusFlags, "--bind-bus", "false", "--port", "" + distributorPort, "-s", "http://localhost:" + sessionsPort)).run();
mergeArgs(eventBusFlags,
"--bind-bus", "false", "--port", "" + distributorPort,
"-s", "http://localhost:" + sessionsPort,
"--sessionqueuer", "http://localhost:" + queuerPort)).run();
waitUntilUp(distributorPort);

int routerPort = PortProber.findFreePort();
Expand Down