From 6e49c36bda5eddcf448fde00cb38ad8ea9e78b41 Mon Sep 17 00:00:00 2001 From: Michael Barker Date: Fri, 12 Apr 2013 16:52:48 +1200 Subject: [PATCH] Move ensure available logic into ProcessingSequenceBarrier --- .../java/com/lmax/disruptor/AbstractSequencer.java | 2 +- .../com/lmax/disruptor/MultiProducerSequencer.java | 14 ++++++++++++++ .../lmax/disruptor/ProcessingSequenceBarrier.java | 14 ++++++++++++-- src/main/java/com/lmax/disruptor/RingBuffer.java | 2 +- src/main/java/com/lmax/disruptor/Sequencer.java | 2 ++ .../lmax/disruptor/SingleProducerSequencer.java | 6 ++++++ 6 files changed, 36 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/lmax/disruptor/AbstractSequencer.java b/src/main/java/com/lmax/disruptor/AbstractSequencer.java index 3e370fb27..f97ec84ee 100644 --- a/src/main/java/com/lmax/disruptor/AbstractSequencer.java +++ b/src/main/java/com/lmax/disruptor/AbstractSequencer.java @@ -106,6 +106,6 @@ public long getMinimumSequence() @Override public SequenceBarrier newBarrier(Sequence... sequencesToTrack) { - return new ProcessingSequenceBarrier(waitStrategy, cursor, sequencesToTrack); + return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack); } } \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java b/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java index b9bf8e46a..f07ce41c9 100644 --- a/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java +++ b/src/main/java/com/lmax/disruptor/MultiProducerSequencer.java @@ -278,6 +278,20 @@ public boolean isAvailable(long sequence) return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag; } + @Override + public long getHighestPublishedSequence(long lowerBound, long availableSequence) + { + for (long sequence = lowerBound; sequence <= availableSequence; sequence++) + { + if (!isAvailable(sequence)) + { + return sequence - 1; + } + } + + return availableSequence; + } + private int calculateAvailabilityFlag(final long sequence) { return (int) (sequence >>> indexShift); diff --git a/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java b/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java index e58bb651e..4f5b124c7 100644 --- a/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java +++ b/src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java @@ -26,11 +26,14 @@ final class ProcessingSequenceBarrier implements SequenceBarrier private final Sequence dependentSequence; private volatile boolean alerted = false; private Sequence cursorSequence; + private Sequencer sequencer; - public ProcessingSequenceBarrier(final WaitStrategy waitStrategy, + public ProcessingSequenceBarrier(final Sequencer sequencer, + final WaitStrategy waitStrategy, final Sequence cursorSequence, final Sequence[] dependentSequences) { + this.sequencer = sequencer; this.waitStrategy = waitStrategy; this.cursorSequence = cursorSequence; if (0 == dependentSequences.length) @@ -49,7 +52,14 @@ public long waitFor(final long sequence) { checkAlert(); - return waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); + long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this); + + if (availableSequence < sequence) + { + return availableSequence; + } + + return sequencer.getHighestPublishedSequence(sequence, availableSequence); } @Override diff --git a/src/main/java/com/lmax/disruptor/RingBuffer.java b/src/main/java/com/lmax/disruptor/RingBuffer.java index e0a86f8d1..67100a4bf 100644 --- a/src/main/java/com/lmax/disruptor/RingBuffer.java +++ b/src/main/java/com/lmax/disruptor/RingBuffer.java @@ -166,7 +166,7 @@ public static RingBuffer create(ProducerType producerType, @SuppressWarnings("unchecked") public E getPublished(long sequence) { - sequencer.ensureAvailable(sequence); +// sequencer.ensureAvailable(sequence); return (E)entries[(int)sequence & indexMask]; } diff --git a/src/main/java/com/lmax/disruptor/Sequencer.java b/src/main/java/com/lmax/disruptor/Sequencer.java index 37cf04b98..1cf732244 100644 --- a/src/main/java/com/lmax/disruptor/Sequencer.java +++ b/src/main/java/com/lmax/disruptor/Sequencer.java @@ -163,4 +163,6 @@ public interface Sequencer extends Cursored * no sequences have been added. */ long getMinimumSequence(); + + long getHighestPublishedSequence(long sequence, long availableSequence); } \ No newline at end of file diff --git a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java index 8402608d7..6c9723bcf 100644 --- a/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java +++ b/src/main/java/com/lmax/disruptor/SingleProducerSequencer.java @@ -196,4 +196,10 @@ public boolean isAvailable(long sequence) { return sequence <= cursor.get(); } + + @Override + public long getHighestPublishedSequence(long lowerBound, long availableSequence) + { + return availableSequence; + } }