Skip to content

Commit

Permalink
Don't cache head/tail index in Consumer/Producer
Browse files Browse the repository at this point in the history
  • Loading branch information
mgeier committed Apr 25, 2021
1 parent cc4191f commit 2573eb7
Showing 1 changed file with 27 additions and 42 deletions.
69 changes: 27 additions & 42 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,13 +306,11 @@ impl<T> RingBuffer<T> {
let buffer = Arc::new(self);
let p = Producer {
buffer: buffer.clone(),
head: Cell::new(0),
tail: Cell::new(0),
cached_head: Cell::new(0),
};
let c = Consumer {
buffer,
head: Cell::new(0),
tail: Cell::new(0),
cached_tail: Cell::new(0),
};
(p, c)
}
Expand Down Expand Up @@ -369,10 +367,8 @@ impl<T> RingBuffer<T> {
producer.buffer.head.swap(0, Ordering::Relaxed),
producer.buffer.tail.swap(0, Ordering::Relaxed)
);
producer.head.set(0);
producer.tail.set(0);
consumer.head.set(0);
consumer.tail.set(0);
producer.cached_head.set(0);
consumer.cached_tail.set(0);
}

/// Returns the capacity of the queue.
Expand Down Expand Up @@ -512,12 +508,7 @@ pub struct Producer<T> {
/// A copy of `buffer.head` for quick access.
///
/// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
head: Cell<usize>,

/// A copy of `buffer.tail` for quick access.
///
/// This value is always in sync with `buffer.tail`.
tail: Cell<usize>,
cached_head: Cell<usize>,
}

unsafe impl<T: Send> Send for Producer<T> {}
Expand Down Expand Up @@ -546,7 +537,6 @@ impl<T> Producer<T> {
}
let tail = self.buffer.increment1(tail);
self.buffer.tail.store(tail, Ordering::Release);
self.tail.set(tail);
Ok(())
} else {
Err(PushError::Full(value))
Expand Down Expand Up @@ -605,13 +595,13 @@ impl<T> Producer<T> {
/// For a safe alternative that provides [`Default`]-initialized slots,
/// see [`Producer::write_chunk()`].
pub fn write_chunk_uninit(&mut self, n: usize) -> Result<WriteChunkUninit<'_, T>, ChunkError> {
let tail = self.tail.get();
let tail = self.buffer.tail.load(Ordering::Acquire);

// Check if the queue has *possibly* not enough slots.
if self.buffer.capacity - self.buffer.distance(self.head.get(), tail) < n {
if self.buffer.capacity - self.buffer.distance(self.cached_head.get(), tail) < n {
// Refresh the head ...
let head = self.buffer.head.load(Ordering::Acquire);
self.head.set(head);
self.cached_head.set(head);

// ... and check if there *really* are not enough slots.
let slots = self.buffer.capacity - self.buffer.distance(head, tail);
Expand Down Expand Up @@ -648,8 +638,9 @@ impl<T> Producer<T> {
/// ```
pub fn slots(&self) -> usize {
let head = self.buffer.head.load(Ordering::Acquire);
self.head.set(head);
self.buffer.capacity - self.buffer.distance(head, self.tail.get())
let tail = self.buffer.tail.load(Ordering::Acquire);
self.cached_head.set(head);
self.buffer.capacity - self.buffer.distance(head, tail)
}

/// Returns `true` if there are no slots available for writing.
Expand Down Expand Up @@ -677,13 +668,13 @@ impl<T> Producer<T> {
/// This is a strict subset of the functionality implemented in write_chunk_uninit().
/// For performance, this special case is implemented separately.
fn next_tail(&self) -> Option<usize> {
let tail = self.tail.get();
let tail = self.buffer.tail.load(Ordering::Acquire);

// Check if the queue is *possibly* full.
if self.buffer.distance(self.head.get(), tail) == self.buffer.capacity {
if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
// Refresh the head ...
let head = self.buffer.head.load(Ordering::Acquire);
self.head.set(head);
self.cached_head.set(head);

// ... and check if it's *really* full.
if self.buffer.distance(head, tail) == self.buffer.capacity {
Expand Down Expand Up @@ -715,15 +706,10 @@ pub struct Consumer<T> {
/// A reference to the ring buffer.
buffer: Arc<RingBuffer<T>>,

/// A copy of `buffer.head` for quick access.
///
/// This value is always in sync with `buffer.head`.
head: Cell<usize>,

/// A copy of `buffer.tail` for quick access.
///
/// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
tail: Cell<usize>,
cached_tail: Cell<usize>,
}

unsafe impl<T: Send> Send for Consumer<T> {}
Expand Down Expand Up @@ -760,7 +746,6 @@ impl<T> Consumer<T> {
let value = unsafe { self.buffer.slot_ptr(head).read() };
let head = self.buffer.increment1(head);
self.buffer.head.store(head, Ordering::Release);
self.head.set(head);
Ok(value)
} else {
Err(PopError::Empty)
Expand Down Expand Up @@ -856,13 +841,13 @@ impl<T> Consumer<T> {
///
/// See the [crate-level documentation](crate#examples) for more examples.
pub fn read_chunk(&mut self, n: usize) -> Result<ReadChunk<'_, T>, ChunkError> {
let head = self.head.get();
let head = self.buffer.head.load(Ordering::Acquire);

// Check if the queue has *possibly* not enough slots.
if self.buffer.distance(head, self.tail.get()) < n {
if self.buffer.distance(head, self.cached_tail.get()) < n {
// Refresh the tail ...
let tail = self.buffer.tail.load(Ordering::Acquire);
self.tail.set(tail);
self.cached_tail.set(tail);

// ... and check if there *really* are not enough slots.
let slots = self.buffer.distance(head, tail);
Expand Down Expand Up @@ -899,9 +884,10 @@ impl<T> Consumer<T> {
/// assert_eq!(c.slots(), 0);
/// ```
pub fn slots(&self) -> usize {
let head = self.buffer.head.load(Ordering::Acquire);
let tail = self.buffer.tail.load(Ordering::Acquire);
self.tail.set(tail);
self.buffer.distance(self.head.get(), tail)
self.cached_tail.set(tail);
self.buffer.distance(head, tail)
}

/// Returns `true` if there are no slots available for reading.
Expand Down Expand Up @@ -929,13 +915,13 @@ impl<T> Consumer<T> {
/// This is a strict subset of the functionality implemented in read_chunk().
/// For performance, this special case is implemented separately.
fn next_head(&self) -> Option<usize> {
let head = self.head.get();
let head = self.buffer.head.load(Ordering::Acquire);

// Check if the queue is *possibly* empty.
if head == self.tail.get() {
if head == self.cached_tail.get() {
// Refresh the tail ...
let tail = self.buffer.tail.load(Ordering::Acquire);
self.tail.set(tail);
self.cached_tail.set(tail);

// ... and check if it's *really* empty.
if head == tail {
Expand Down Expand Up @@ -1144,9 +1130,9 @@ impl<T> WriteChunkUninit<'_, T> {
}

unsafe fn commit_unchecked(self, n: usize) -> usize {
let tail = self.producer.buffer.increment(self.producer.tail.get(), n);
let tail = self.producer.buffer.tail.load(Ordering::Acquire);
let tail = self.producer.buffer.increment(tail, n);
self.producer.buffer.tail.store(tail, Ordering::Release);
self.producer.tail.set(tail);
n
}

Expand Down Expand Up @@ -1275,7 +1261,7 @@ impl<T> ReadChunk<'_, T> {
}

unsafe fn commit_unchecked(self, n: usize) -> usize {
let head = self.consumer.head.get();
let head = self.consumer.buffer.head.load(Ordering::Acquire);
// Safety: head has not yet been incremented
let ptr = self.consumer.buffer.slot_ptr(head);
let first_len = self.first_len.min(n);
Expand All @@ -1289,7 +1275,6 @@ impl<T> ReadChunk<'_, T> {
}
let head = self.consumer.buffer.increment(head, n);
self.consumer.buffer.head.store(head, Ordering::Release);
self.consumer.head.set(head);
n
}

Expand Down

0 comments on commit 2573eb7

Please sign in to comment.