Skip to content

Commit

Permalink
Phase 1 of decoupling socket and queue in critical section - TESTED
Browse files Browse the repository at this point in the history
This is a fairly substantial change to the pipeline queueRequest()
method, introducing a temporary connection queue to reduce interference
between clients and the response handler thread, and, to prepare for
stripping the critical zone into finer sections to reduce overall
contention.

Performance is somewhat (measurable but marginal) better than the
previous versionof the chunked pipeline.  Jitter seems to be lower
but that needs to be actually measured.
  • Loading branch information
Joubin Houshyar committed Jan 21, 2012
1 parent 3c3b93c commit 5a45980
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 60 deletions.
8 changes: 7 additions & 1 deletion core/api/src/main/java/org/jredis/JRedisFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,13 @@ public interface JRedisFuture {
* @Redis QUIT
*/
public <K extends Object> Future<ResponseStatus> quit ();
public <K extends Object> Future<ResponseStatus> flush ();

/**
* Optional connection control command.
* @param <K>
* @return
*/
public Future<ResponseStatus> flush ();

// ------------------------------------------------------------------------
// "Commands operating on string values"
Expand Down
2 changes: 1 addition & 1 deletion core/api/src/main/java/org/jredis/protocol/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public enum Command {
// connection handling
PING (RequestType.NO_ARG, ResponseType.STATUS),
QUIT (RequestType.NO_ARG, ResponseType.VIRTUAL),
CONN_FLUSH (RequestType.NO_ARG, ResponseType.NOP),
CONN_FLUSH (RequestType.NO_ARG, ResponseType.NOP),

// String values operations
SET (RequestType.KEY_VALUE, ResponseType.STATUS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void run() {
}
long queued = System.nanoTime() - start;
try {
jredis.flush();
long counter = frCounter.get(); // NOTE: excellent place to put implicit flush()
long delta_ns = System.nanoTime() - start;
long delta_ms = TimeUnit.MILLISECONDS.convert(delta_ns, TimeUnit.NANOSECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ public <K extends Object> FutureStatus quit() {
return new FutureStatus(this.queueRequest(Command.QUIT));
}
@Override
public <K extends Object> FutureStatus flush() {
public FutureStatus flush() {
return new FutureStatus(this.queueRequest(Command.CONN_FLUSH));
}
/* ------------------------------- commands that don't get a response END --------- */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,32 @@ public class ChunkedPipelineConnection
/** counted down on notifyConnect */
private CountDownLatch connectionEstablished;

// chunk buffer size
static final int MTU_SIZE = 2884; // assumes TCP MTU of 1500. pick some power of 8 less than MTU*x.
// chunk buffer offset
/** MTU multiples to use as upper bound of the size of the chunk buffer */
static final int MTU_FACTOR = 1; // TODO: ConnectionSpec me.

static final int MTU_SIZE = 1488;

/** chunk buffer size */
static final int CHUNK_BUFF_SIZE = MTU_SIZE * MTU_FACTOR;

/** minimum request size in bytes -- using PING e.g. 14 b */
static final int MIN_REQ_SIZE = 14;

/** Chunk Queue size (slots) */
static final int CHUNK_Q_SIZE = CHUNK_BUFF_SIZE / MIN_REQ_SIZE;

/** chunk buffer offset */
private int off = 0;

/** chunk buffer */
private byte[] chunkbuff;


/** Chunk Queue available slot index */
private int idx = 0;

/** Chunk Queue of requests in Chunk buffer */
private PendingCPRequest[] chunkqueue;


// ------------------------------------------------------------------------
// Constructor(s)
Expand Down Expand Up @@ -139,7 +159,8 @@ protected void initializeComponents () {
spec.setConnectionFlag(Flag.RELIABLE, true);
spec.setConnectionFlag(Flag.SHARED, true);

chunkbuff = new byte[MTU_SIZE];
chunkbuff = new byte[CHUNK_BUFF_SIZE];
chunkqueue = new PendingCPRequest[CHUNK_Q_SIZE];

super.initializeComponents();

Expand Down Expand Up @@ -182,10 +203,10 @@ protected Protocol newProtocolHandler () {
}

// TODO: write chunking + mod ProtocolBase.Stream...Request + Command.FLUSH_BUFFERS. // DONE
// @Override
// protected OutputStream newOutputStream(OutputStream socketOutputStream) {
// return new BufferedOutputStream(socketOutputStream);
// }
// @Override
// protected OutputStream newOutputStream(OutputStream socketOutputStream) {
// return new BufferedOutputStream(socketOutputStream, MTU_SIZE);
// }

/**
* Just make sure its a {@link FastBufferedInputStream}.
Expand All @@ -208,9 +229,10 @@ protected final InputStream newInputStream (InputStream socketInputStream) throw


/**
* This is a pseudo asynchronous method. The actual write to server does
* occur in this method, so when this method returns, your request has been
* sent. This simply defers the response read to the response handler.
* This is a true asynchronous method. The actual request write to server
* possibly does occur in this method if the connection determines it is
* optimal to flush the pipeline buffer, or, if you explicitly had requested
* flush via {@link Command#CONN_FLUSH}.
* <p>
* Other item of note is that once a QUIT request has been queued, no further
* requests are accepted and a ClientRuntimeException is thrown.
Expand Down Expand Up @@ -242,21 +264,25 @@ public final Future<Response> queueRequest (Command cmd, byte[]... args)
}

/* PendingCPRequest provides transparent hook to force flush on future get(..) */
final PendingRequest queuedRequest = new PendingCPRequest(this, cmd);
final PendingCPRequest queuedRequest = new PendingCPRequest(this, cmd);

/* possibly silly optimization, pulled out of sync block */
final OutputStream out = getOutputStream();
final boolean isflush = cmd == Command.CONN_FLUSH;
final boolean exceeds = reqbyteslen > MTU_SIZE;
final boolean exceeds = reqbyteslen > CHUNK_BUFF_SIZE;
final boolean isquit = cmd == Command.QUIT;

/* auth is used on connector initialization and must be sent asap */
final boolean doflush =
cmd == Command.AUTH ||
cmd == Command.SELECT ||
cmd == Command.AUTH ||
cmd == Command.SELECT ||
isquit ||
isflush;

/* NOTES:
* This hacked implementation is zero-copy on direct writes and will also copy
/* ------------
* NOTES:
*
* This ~hacked implementation is zero-copy on direct writes and will also copy
* directly to the chunk buffer. Performance for single threaded is a
* bit improved but for n threaded loading, it is really negligible.
*
Expand All @@ -274,48 +300,62 @@ public final Future<Response> queueRequest (Command cmd, byte[]... args)
* - equally annoying is the fact that if a client thread is flushing/writing
* to the socket, other threads could be writing to a mem buffer. (This would
* require double buffering ala graphics.)
*
*/

try {
synchronized (serviceLock) {
/* don't move -- off is contended */
boolean overflows = exceeds || off + reqbyteslen > MTU_SIZE ? true : false;
boolean overflows = exceeds || off + reqbyteslen > CHUNK_BUFF_SIZE ? true : false;

if(cmd != Command.QUIT) { // WATCH THIS SPACE -- BUG PRONE
if(overflows) {
out.write(chunkbuff, 0, off);
if(overflows) {
out.write(chunkbuff, 0, off);
out.flush();
off = 0;
for(int i=0; i<idx; i++) {
PendingCPRequest item = chunkqueue[i];
pendingResponseQueue.add(item);
}
idx = 0;
}

if(sendreq){
if(exceeds) {
/* can optimize and dispense with new byte[] -- only for large payloads */
/* chunkqueue should be empty and idx 0 : assert for now */
out.write(protocol.createRequestBuffer(cmd, args));
out.flush();
off = 0;
pendingResponseQueue.add(queuedRequest);
}
else {
// NOTE: this 'new' here is not necessary and is only because of copy/paste from
// ProtocolBase (see ProtocolHelper.writeReq..().
ProtocolHelper.writeRequestToBuffer(new ProtocolHelper.Buffer(chunkbuff, off), cmd, args);
off+=reqbyteslen;

chunkqueue[idx] = queuedRequest;
idx++;
}
}

if(sendreq){
if(exceeds) {
/* can optimize and dispense with new byte[] -- only for large payloads */
out.write(protocol.createRequestBuffer(cmd, args));
if(doflush) {
if(!isquit){
if(off>0){
out.write(chunkbuff, 0, off);
out.flush();
}
else {
// NOTE: this 'new' here is not necessary and is only because of copy/paste from
// ProtocolBase (see ProtocolHelper.writeReq..().
ProtocolHelper.writeRequestToBuffer(new ProtocolHelper.Buffer(chunkbuff, off), cmd, args);
off+=reqbyteslen;
off = 0;
for(int i=0; i<idx; i++) {
PendingCPRequest item = chunkqueue[i];
pendingResponseQueue.add(item);
}
idx = 0;
}
}

if(doflush) {
out.write(chunkbuff, 0, off);
out.flush();
off = 0;
else {
pendingQuit = true;
isActive.set(false);
pendingResponseQueue.add(queuedRequest);
}
}
else if(cmd == Command.QUIT) { // BUG ALERT: see above (ditto)
pendingQuit = true;
isActive.set(false);
}

/* NOTE: this is the bottleneck now and quite possibly the cause of the jitter */
/* cond here inside sync block is 1:1 consumer producer */
pendingResponseQueue.add(queuedRequest);
}
} catch (IOException e) {
Log.error("on %s", cmd.code);
Expand Down Expand Up @@ -487,9 +527,15 @@ public Response get(long timeout, TimeUnit unit)
}
final
private void requestFlush() {
// Log.log("[%s] requesting flush ..", cmd.code);
if(cmd != Command.QUIT) {
// Log.log("[%s] requesting flush .. INSIDE", cmd.code);
pipeline.queueRequest(Command.CONN_FLUSH);
}
else {
new Exception().printStackTrace();
}
// Log.log("[%s] requesting flush .. DONE", cmd.code);
}
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,19 +191,21 @@ public final Future<Response> queueRequest (Command cmd, byte[]... args)
if(!isConnected())
throw new NotConnectedException ("Not connected!");

if(pendingQuit)
throw new ClientRuntimeException("Pipeline shutting down: Quit in progess; no further requests are accepted.");

Protocol protocol = Assert.notNull(getProtocolHandler(), "thread protocol handler", ProviderException.class);
Request request = Assert.notNull(protocol.createRequest (cmd, args), "request object from handler", ProviderException.class);
// PendingRequest pendingResponse = new PendingRequest(request, cmd);
PendingRequest pendingResponse = new PendingRequest(cmd);

if(pendingQuit)
throw new ClientRuntimeException("Pipeline shutting down: Quit in progess; no further requests are accepted.");
if(cmd == Command.CONN_FLUSH) {
Log.log("%s not supported -- ignored", cmd.code);
return pendingResponse;
}

synchronized (serviceLock) {

if(cmd != Command.QUIT) {
// System.out.format("out is %s\n", getOutputStream().getClass().getName()); // TODO: REMOVE -- testing
// System.out.format("req is %s\n", request.getClass().getName()); // TODO: REMOVE -- testing
request.write(getOutputStream());
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@

@SuppressWarnings("unused")
public class AdHocTestChunkPipeline {
static final int MAX_ITER = Integer.MAX_VALUE;
public static void main(String[] args) throws Throwable {
try {
AdHocTestChunkPipeline tester = new AdHocTestChunkPipeline();
while(true){
int iter = 0;
while(iter < MAX_ITER){
tester.run();
iter++;
}
tester.stop();
} catch (Exception e) {
e.printStackTrace();
// Log.error("fault", e); // TODO:
Expand All @@ -33,6 +37,9 @@ public AdHocTestChunkPipeline() {
static final int wcnt = 2;
static final int reqnums = 150000;

private void stop() {
jredis.quit();
}
private void run() {
final Thread[] workers = new Thread[wcnt];
for(int i=0;i< workers.length; i++){
Expand Down Expand Up @@ -73,7 +80,7 @@ private Runnable task (final JRedisFuture jredis) {
// conn.ping();
fCntr = conn.incr(cntr);
}
conn.flush();
// conn.flush();

// try {
// Long counter = fCntr.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.jredis.ClientRuntimeException;
import org.jredis.JRedisFuture;
import org.jredis.connector.ConnectionSpec;
import org.jredis.protocol.Command;
import org.jredis.ri.alphazero.connection.DefaultConnectionSpec;
import org.jredis.ri.alphazero.support.Log;
import org.testng.annotations.AfterTest;
Expand All @@ -34,7 +35,7 @@
*/
//@Test(invocationCount=20, threadPoolSize=5, sequential=false)
@Test(sequential = true, suiteName="JRedisChunkedPipeline-tests")
public class JRedisChunkedPIpelineClientTest extends JRedisFutureProviderTestsBase {
public class JRedisChunkedPipelineClientTest extends JRedisFutureProviderTestsBase {

// ------------------------------------------------------------------------
// TEST SETUP
Expand Down Expand Up @@ -77,8 +78,7 @@ protected JRedisFuture newProviderInstance () {
public void testQuit() {
try {
JRedisFuture pipeline = getProviderInstance();
pipeline.ping().get();
pipeline.quit().get();
pipeline.quit();
}
catch (Exception e) {
fail("QUIT" + e);
Expand Down

0 comments on commit 5a45980

Please sign in to comment.