Skip to content

Commit

Permalink
Move ensure available logic into ProcessingSequenceBarrier
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Apr 12, 2013
1 parent 9389458 commit 6e49c36
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 4 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/lmax/disruptor/AbstractSequencer.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,6 @@ public long getMinimumSequence()
@Override
public SequenceBarrier newBarrier(Sequence... sequencesToTrack)
{
return new ProcessingSequenceBarrier(waitStrategy, cursor, sequencesToTrack);
return new ProcessingSequenceBarrier(this, waitStrategy, cursor, sequencesToTrack);
}
}
14 changes: 14 additions & 0 deletions src/main/java/com/lmax/disruptor/MultiProducerSequencer.java
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,20 @@ public boolean isAvailable(long sequence)
return UNSAFE.getIntVolatile(availableBuffer, bufferAddress) == flag;
}

@Override
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
for (long sequence = lowerBound; sequence <= availableSequence; sequence++)
{
if (!isAvailable(sequence))
{
return sequence - 1;
}
}

return availableSequence;
}

private int calculateAvailabilityFlag(final long sequence)
{
return (int) (sequence >>> indexShift);
Expand Down
14 changes: 12 additions & 2 deletions src/main/java/com/lmax/disruptor/ProcessingSequenceBarrier.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@ final class ProcessingSequenceBarrier implements SequenceBarrier
private final Sequence dependentSequence;
private volatile boolean alerted = false;
private Sequence cursorSequence;
private Sequencer sequencer;

public ProcessingSequenceBarrier(final WaitStrategy waitStrategy,
public ProcessingSequenceBarrier(final Sequencer sequencer,
final WaitStrategy waitStrategy,
final Sequence cursorSequence,
final Sequence[] dependentSequences)
{
this.sequencer = sequencer;
this.waitStrategy = waitStrategy;
this.cursorSequence = cursorSequence;
if (0 == dependentSequences.length)
Expand All @@ -49,7 +52,14 @@ public long waitFor(final long sequence)
{
checkAlert();

return waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);
long availableSequence = waitStrategy.waitFor(sequence, cursorSequence, dependentSequence, this);

if (availableSequence < sequence)
{
return availableSequence;
}

return sequencer.getHighestPublishedSequence(sequence, availableSequence);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/lmax/disruptor/RingBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public static <E> RingBuffer<E> create(ProducerType producerType,
@SuppressWarnings("unchecked")
public E getPublished(long sequence)
{
sequencer.ensureAvailable(sequence);
// sequencer.ensureAvailable(sequence);
return (E)entries[(int)sequence & indexMask];
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/lmax/disruptor/Sequencer.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,6 @@ public interface Sequencer extends Cursored
* no sequences have been added.
*/
long getMinimumSequence();

long getHighestPublishedSequence(long sequence, long availableSequence);
}
6 changes: 6 additions & 0 deletions src/main/java/com/lmax/disruptor/SingleProducerSequencer.java
Original file line number Diff line number Diff line change
Expand Up @@ -196,4 +196,10 @@ public boolean isAvailable(long sequence)
{
return sequence <= cursor.get();
}

@Override
public long getHighestPublishedSequence(long lowerBound, long availableSequence)
{
return availableSequence;
}
}

0 comments on commit 6e49c36

Please sign in to comment.