Skip to content

Commit

Permalink
IGNITE-18826 Java thin client: fixed the pending requests race on clo…
Browse files Browse the repository at this point in the history
…se (apache#10549)
  • Loading branch information
NSAmelchev authored Feb 20, 2023
1 parent 5d7e5d9 commit 075a76a
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
/** Pending requests. */
private final Map<Long, ClientRequestFuture> pendingReqs = new ConcurrentHashMap<>();

/** Lock to safely close pending requests. */
private final ReadWriteLock pendingReqsLock = new ReentrantReadWriteLock();

/** Topology change listeners. */
private final Collection<Consumer<ClientChannel>> topChangeLsnrs = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -273,8 +276,15 @@ private void close(Exception cause) {

U.closeQuiet(sock);

for (ClientRequestFuture pendingReq : pendingReqs.values())
pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
pendingReqsLock.writeLock().lock();

try {
for (ClientRequestFuture pendingReq : pendingReqs.values())
pendingReq.onDone(new ClientConnectionException("Channel is closed", cause));
}
finally {
pendingReqsLock.writeLock().unlock();
}

notificationLsnrsGuard.readLock().lock();

Expand Down Expand Up @@ -333,17 +343,26 @@ private ClientRequestFuture send(ClientOperation op, Consumer<PayloadOutputChann
PayloadOutputChannel payloadCh = new PayloadOutputChannel(this);

try {
if (closed()) {
ClientConnectionException err = new ClientConnectionException("Channel is closed");
ClientRequestFuture fut;

eventListener.onRequestFail(connDesc, id, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);
pendingReqsLock.readLock().lock();

throw err;
}
try {
if (closed()) {
ClientConnectionException err = new ClientConnectionException("Channel is closed");

eventListener.onRequestFail(connDesc, id, op.code(), op.name(), System.nanoTime() - startTimeNanos, err);

throw err;
}

ClientRequestFuture fut = new ClientRequestFuture(id, op, startTimeNanos);
fut = new ClientRequestFuture(id, op, startTimeNanos);

pendingReqs.put(id, fut);
pendingReqs.put(id, fut);
}
finally {
pendingReqsLock.readLock().unlock();
}

eventListener.onRequestStart(connDesc, id, op.code(), op.name());

Expand Down Expand Up @@ -691,9 +710,21 @@ private void handshake(ProtocolVersion ver, String user, String pwd, Map<String,
new ProtocolContext(ver).toString(), null));

while (true) {
ClientRequestFuture fut = new ClientRequestFuture(requestId, ClientOperation.HANDSHAKE);
ClientRequestFuture fut;

pendingReqsLock.readLock().lock();

pendingReqs.put(requestId, fut);
try {
if (closed())
throw new ClientConnectionException("Channel is closed");

fut = new ClientRequestFuture(requestId, ClientOperation.HANDSHAKE);

pendingReqs.put(requestId, fut);
}
finally {
pendingReqsLock.readLock().unlock();
}

handshakeReq(ver, user, pwd, userAttrs);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.net.ssl.SSLContext;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
Expand Down Expand Up @@ -156,6 +155,9 @@ public GridNioClientConnectionMultiplexer(ClientConfiguration cfg) {

GridNioFuture<GridNioSession> sesFut = srv.createSession(ch, meta, false, null);

if (sesFut.error() != null)
sesFut.get();

if (sslHandshakeFut != null)
sslHandshakeFut.get();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,84 @@

package org.apache.ignite.internal.client.thin;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignition;
import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientConnectionException;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.client.SslMode;
import org.apache.ignite.client.events.ConnectionEventListener;
import org.apache.ignite.client.events.HandshakeStartEvent;
import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.mxbean.ClientProcessorMXBean;
import org.apache.ignite.testframework.GridTestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;

/**
* Test partition awareness of thin client on unstable topology.
*/
@RunWith(Parameterized.class)
public class ThinClientPartitionAwarenessUnstableTopologyTest extends ThinClientAbstractPartitionAwarenessTest {
/** */
@Parameterized.Parameter
public boolean sslEnabled;

/** @return Test parameters. */
@Parameterized.Parameters(name = "sslEnabled={0}")
public static Collection<?> parameters() {
return Arrays.asList(new Object[][] {{false}, {true}});
}

/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();

stopAllGrids();
}

/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);

if (sslEnabled) {
cfg.setClientConnectorConfiguration(new ClientConnectorConfiguration()
.setSslEnabled(true)
.setSslClientAuth(true)
.setUseIgniteSslContextFactory(false)
.setSslContextFactory(GridTestUtils.sslFactory()));
}

return cfg;
}

/** {@inheritDoc} */
@Override protected ClientConfiguration getClientConfiguration(int... nodeIdxs) {
ClientConfiguration cfg = super.getClientConfiguration(nodeIdxs);

if (sslEnabled) {
cfg.setSslMode(SslMode.REQUIRED)
.setSslContextFactory(GridTestUtils.sslFactory());
}

return cfg;
}

/**
* Test that join of the new node is detected by the client and affects partition awareness.
*/
Expand Down Expand Up @@ -203,4 +263,61 @@ private void testPartitionAwareness(boolean partReq) {
assertOpOnChannel(opCh, ClientOperation.CACHE_PUT);
}
}

/** */
@Test
public void testSessionCloseBeforeHandshake() throws Exception {
startGrid(0);

ClientConfiguration cliCfg = getClientConfiguration(0)
.setEventListeners(new ConnectionEventListener() {
@Override public void onHandshakeStart(HandshakeStartEvent event) {
// Close connection.
stopAllGrids();
}
});

GridTestUtils.assertThrowsWithCause(() -> {
try (IgniteClient client = Ignition.startClient(cliCfg)) {
return client;
}
}, ClientConnectionException.class);
}

/** */
@Test
public void testCreateSessionAfterClose() throws Exception {
startGrids(2);

CountDownLatch srvStopped = new CountDownLatch(1);

AtomicBoolean dfltInited = new AtomicBoolean();

// The client should close pending requests on closing without waiting.
try (TcpIgniteClient client = new TcpIgniteClient((cfg, connMgr) -> {
// Skip default channel to successful client start.
if (!dfltInited.compareAndSet(false, true)) {
try {
// Connection manager should be stopped before opening a new connection.
srvStopped.await(getTestTimeout(), TimeUnit.MILLISECONDS);
}
catch (InterruptedException ignored) {
// No-op.
}
}

return new TcpClientChannel(cfg, connMgr);
}, getClientConfiguration(0))) {
GridNioServer<ByteBuffer> srv = getFieldValue(client.reliableChannel(), "connMgr", "srv");

// Make sure handshake data will not be recieved.
setFieldValue(srv, "skipRead", true);

GridTestUtils.runAsync(() -> {
assertTrue(waitForCondition(() -> getFieldValue(srv, "closed"), getTestTimeout()));

srvStopped.countDown();
});
}
}
}

0 comments on commit 075a76a

Please sign in to comment.