Skip to content

Commit

Permalink
Merge branch 'remkop-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Barker committed Jul 15, 2016
2 parents 7b7e32a + 3975d23 commit 5961b7f
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package com.lmax.disruptor;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Variation of the {@link TimeoutBlockingWaitStrategy} that attempts to elide conditional wake-ups
* when the lock is uncontended.
*/
public class LiteTimeoutBlockingWaitStrategy implements WaitStrategy
{
private final Lock lock = new ReentrantLock();
private final Condition processorNotifyCondition = lock.newCondition();
private final AtomicBoolean signalNeeded = new AtomicBoolean(false);
private final long timeoutInNanos;

public LiteTimeoutBlockingWaitStrategy(final long timeout, final TimeUnit units)
{
timeoutInNanos = units.toNanos(timeout);
}

@Override
public long waitFor(
final long sequence,
final Sequence cursorSequence,
final Sequence dependentSequence,
final SequenceBarrier barrier)
throws AlertException, InterruptedException, TimeoutException
{
long nanos = timeoutInNanos;

long availableSequence;
if (cursorSequence.get() < sequence)
{
lock.lock();
try
{
while (cursorSequence.get() < sequence)
{
signalNeeded.getAndSet(true);

barrier.checkAlert();
nanos = processorNotifyCondition.awaitNanos(nanos);
if (nanos <= 0)
{
throw TimeoutException.INSTANCE;
}
}
}
finally
{
lock.unlock();
}
}

while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
}

return availableSequence;
}

@Override
public void signalAllWhenBlocking()
{
if (signalNeeded.getAndSet(false))
{
lock.lock();
try
{
processorNotifyCondition.signalAll();
}
finally
{
lock.unlock();
}
}
}

@Override
public String toString()
{
return "LiteTimeoutBlockingWaitStrategy{" +
"processorNotifyCondition=" + processorNotifyCondition +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.lmax.disruptor;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;

import java.util.concurrent.TimeUnit;

import org.jmock.Expectations;
import org.jmock.Mockery;
import org.jmock.integration.junit4.JMock;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(JMock.class)
public class LiteTimeoutBlockingWaitStrategyTest
{
private final Mockery mockery = new Mockery();

@Test
public void shouldTimeoutWaitFor() throws Exception
{
final SequenceBarrier sequenceBarrier = mockery.mock(SequenceBarrier.class);

long theTimeout = 500;
LiteTimeoutBlockingWaitStrategy waitStrategy = new LiteTimeoutBlockingWaitStrategy(theTimeout, TimeUnit.MILLISECONDS);
Sequence cursor = new Sequence(5);
Sequence dependent = cursor;

mockery.checking(
new Expectations()
{
{
allowing(sequenceBarrier).checkAlert();
}
});

long t0 = System.currentTimeMillis();

try
{
waitStrategy.waitFor(6, cursor, dependent, sequenceBarrier);
fail("TimeoutException should have been thrown");
}
catch (TimeoutException e)
{
}

long t1 = System.currentTimeMillis();

long timeWaiting = t1 - t0;

assertThat(timeWaiting, greaterThanOrEqualTo(theTimeout));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ private static class MyEvent
private Object d;
}

private static EventFactory<MyEvent> FACTORY = new EventFactory<MyEvent>()
private static EventFactory<MyEvent> factory = new EventFactory<MyEvent>()
{
@Override
public MyEvent newInstance()
Expand All @@ -24,7 +24,7 @@ public MyEvent newInstance()
}
};

private static EventHandler<MyEvent> HANDLER_1 = new EventHandler<MyEvent>()
private static EventHandler<MyEvent> handler1 = new EventHandler<MyEvent>()
{
@Override
public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception
Expand All @@ -33,7 +33,7 @@ public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exc
}
};

private static EventHandler<MyEvent> HANDLER_2 = new EventHandler<MyEvent>()
private static EventHandler<MyEvent> handler2 = new EventHandler<MyEvent>()
{
@Override
public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception
Expand All @@ -42,7 +42,7 @@ public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exc
}
};

private static EventHandler<MyEvent> HANDLER_3 = new EventHandler<MyEvent>()
private static EventHandler<MyEvent> handler3 = new EventHandler<MyEvent>()
{
@Override
public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception
Expand All @@ -53,9 +53,9 @@ public void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exc

public static void main(String[] args)
{
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(FACTORY, 1024, DaemonThreadFactory.INSTANCE);
Disruptor<MyEvent> disruptor = new Disruptor<MyEvent>(factory, 1024, DaemonThreadFactory.INSTANCE);

disruptor.handleEventsWith(HANDLER_1).then(HANDLER_2).then(HANDLER_3);
disruptor.handleEventsWith(handler1).then(handler2).then(handler3);

disruptor.start();
}
Expand Down

0 comments on commit 5961b7f

Please sign in to comment.