From 0875fbbdfceee39d33753dd7b80960f61b9e6416 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 11:44:23 -0700 Subject: [PATCH 01/25] add `WaitCell::subscribe` future --- maitake/src/sync/wait_cell.rs | 72 ++++++++++++++++++++++++++++++++++- 1 file changed, 71 insertions(+), 1 deletion(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 63234a3c..8e8e360f 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -73,6 +73,18 @@ pub enum RegisterError { pub struct Wait<'a> { /// The [`WaitCell`] being waited on. cell: &'a WaitCell, + + /// True if the `WaitCell` was closed while pre-registering this `Wait` + /// future during a call to [`WaitCell::subscribe()`]. + already_closed: bool, +} + +/// Future returned from [`WaitCell::subscribe()`]. +#[derive(Debug)] +#[must_use = "futures do nothing unless `.await`ed or `poll`ed"] +pub struct Subscribe<'a> { + /// The [`WaitCell`] being waited on. + cell: &'a WaitCell, } #[derive(Eq, PartialEq, Copy, Clone)] @@ -185,7 +197,14 @@ impl WaitCell { /// **Note**: The calling task's [`Waker`] is not registered until AFTER the /// first time the returned [`Wait`] future is polled. pub fn wait(&self) -> Wait<'_> { - Wait { cell: self } + Wait { + cell: self, + already_closed: false, + } + } + + pub fn subscribe(&self) -> Subscribe<'_> { + Subscribe { cell: self } } /// Wake the [`Waker`] stored in this cell. @@ -328,6 +347,26 @@ impl Future for Wait<'_> { } } +// === impl Subscribe === + +impl<'cell> Future for Subscribe<'cell> { + type Output = Wait<'cell>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let already_closed = loop { + match test_dbg!(self.cell.register_wait(cx.waker())) { + Ok(_) => break true, + Err(RegisterError::Closed) => break false, + _ => {} + } + }; + Poll::Ready(Wait { + cell: self.cell, + already_closed, + }) + } +} + // === impl State === impl State { @@ -426,6 +465,37 @@ mod tests { cell.wake(); + assert_ready_ok!(task.poll(), "should have been woken"); + } + /// Tests behavior when a `Wait` future is created and the `WaitCell` is + /// woken *between* the call to `wait()` and the first time the `Wait` future + /// is polled. + #[test] + fn wake_before_poll() { + let _trace = crate::util::test::trace_init(); + + let mut task = task::spawn(async move { + let cell = WaitCell::new(); + let wait = cell.wait(); + cell.wake(); + wait.await + }); + + assert_ready_ok!(task.poll(), "should have been woken"); + } + + /// Like `wake_before_poll` but with `close()` rather than `wait()`. + #[test] + fn close_before_poll() { + let _trace = crate::util::test::trace_init(); + + let mut task = task::spawn(async move { + let cell = WaitCell::new(); + let wait = cell.wait(); + cell.wake(); + wait.await + }); + assert_ready_ok!(task.poll(), "should have been woken"); } } From 5b76061c3cf0ede74639772ba7cbb31f17e607c8 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 11:45:13 -0700 Subject: [PATCH 02/25] whoops forgot to actually use `already_closed` --- maitake/src/sync/wait_cell.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 8e8e360f..81f9e695 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -324,6 +324,10 @@ impl Future for Wait<'_> { type Output = Result<(), super::Closed>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.already_closed { + return super::closed(); + } + // Try to take the cell's `WOKEN` bit to see if we were previously // waiting and then received a notification. if test_dbg!(self.cell.fetch_and(!State::WOKEN, AcqRel)).is(State::WOKEN) { From f65ff5abfc3a59eb40f649a1b0f56a38bca52b98 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 11:54:28 -0700 Subject: [PATCH 03/25] HAHAHAHA I DID IT BACKWARDS --- maitake/src/sync/wait_cell.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 81f9e695..bf1b868f 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -359,8 +359,8 @@ impl<'cell> Future for Subscribe<'cell> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let already_closed = loop { match test_dbg!(self.cell.register_wait(cx.waker())) { - Ok(_) => break true, - Err(RegisterError::Closed) => break false, + Ok(_) => break false, + Err(RegisterError::Closed) => break true, _ => {} } }; From fdfd27d0633338acc474ba0be64032d56fc4c3d9 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 12:21:56 -0700 Subject: [PATCH 04/25] add loom test for subscribe --- maitake/src/sync/wait_cell.rs | 53 ++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index bf1b868f..999807a7 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -471,36 +471,16 @@ mod tests { assert_ready_ok!(task.poll(), "should have been woken"); } - /// Tests behavior when a `Wait` future is created and the `WaitCell` is - /// woken *between* the call to `wait()` and the first time the `Wait` future - /// is polled. - #[test] - fn wake_before_poll() { - let _trace = crate::util::test::trace_init(); - - let mut task = task::spawn(async move { - let cell = WaitCell::new(); - let wait = cell.wait(); - cell.wake(); - wait.await - }); - assert_ready_ok!(task.poll(), "should have been woken"); - } - - /// Like `wake_before_poll` but with `close()` rather than `wait()`. #[test] - fn close_before_poll() { + fn subscribe() { let _trace = crate::util::test::trace_init(); - - let mut task = task::spawn(async move { + futures::executor::block_on(async { let cell = WaitCell::new(); - let wait = cell.wait(); + let wait = cell.subscribe().await; cell.wake(); - wait.await - }); - - assert_ready_ok!(task.poll(), "should have been woken"); + wait.await.unwrap(); + }) } } @@ -593,4 +573,27 @@ mod loom { info!("wait'd"); }); } + + #[test] + fn subscribe() { + crate::loom::model(|| { + future::block_on(async move { + let cell = Arc::new(WaitCell::new()); + let wait = cell.subscribe().await; + + thread::spawn({ + let waker = cell.clone(); + move || { + info!("waking"); + waker.wake(); + info!("woken"); + } + }); + + info!("waiting"); + wait.await.expect("wait should be woken, not closed"); + info!("wait'd"); + }); + }); + } } From 8dae68a49ac2a2746690e00fc9ad9551832542b8 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 14:11:13 -0700 Subject: [PATCH 05/25] fix busy loop --- maitake/src/sync/wait_cell.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 999807a7..7bf8c403 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -74,9 +74,7 @@ pub struct Wait<'a> { /// The [`WaitCell`] being waited on. cell: &'a WaitCell, - /// True if the `WaitCell` was closed while pre-registering this `Wait` - /// future during a call to [`WaitCell::subscribe()`]. - already_closed: bool, + presubscribe: Poll>, } /// Future returned from [`WaitCell::subscribe()`]. @@ -324,8 +322,9 @@ impl Future for Wait<'_> { type Output = Result<(), super::Closed>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - if self.already_closed { - return super::closed(); + // Did a wakeup occur while we were pre-registering the future? + if self.presubscribe.is_ready() { + return self.presubscribe; } // Try to take the cell's `WOKEN` bit to see if we were previously @@ -357,16 +356,22 @@ impl<'cell> Future for Subscribe<'cell> { type Output = Wait<'cell>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let already_closed = loop { - match test_dbg!(self.cell.register_wait(cx.waker())) { - Ok(_) => break false, - Err(RegisterError::Closed) => break true, - _ => {} + let presubscribe = match test_dbg!(self.cell.register_wait(cx.waker())) { + Ok(_) => Poll::Pending, + Err(RegisterError::Closed) => super::closed(), + Err(RegisterError::Registering) => { + // yield and try again + cx.waker().wake_by_ref(); + return Poll::Pending; + } + Err(RegisterError::Waking) => { + // we are also woken + Poll::Ready(Ok(())) } }; Poll::Ready(Wait { cell: self.cell, - already_closed, + presubscribe, }) } } From 7721aa18d6645997b92db867866b326bd8d3d48a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 14:13:36 -0700 Subject: [PATCH 06/25] WHOOPS --- maitake/src/sync/wait_cell.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 7bf8c403..9f872f8e 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -197,7 +197,7 @@ impl WaitCell { pub fn wait(&self) -> Wait<'_> { Wait { cell: self, - already_closed: false, + presubscribe: Poll::Pending, } } From bbaf301ffbdfa2aa851ed79979c292b1a0eaecb7 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 14:19:10 -0700 Subject: [PATCH 07/25] lol --- maitake/src/trace.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/maitake/src/trace.rs b/maitake/src/trace.rs index 783f0067..a1a163a3 100644 --- a/maitake/src/trace.rs +++ b/maitake/src/trace.rs @@ -76,14 +76,14 @@ macro_rules! debug_span { }; } -#[cfg(not(test))] +#[cfg(all(not(test), not(maitake_ultraverbose)))] macro_rules! test_dbg { ($e:expr) => { $e }; } -#[cfg(test)] +#[cfg(any(test, maitake_ultraverbose))] macro_rules! test_dbg { ($e:expr) => { match $e { @@ -100,24 +100,24 @@ macro_rules! test_dbg { }; } -#[cfg(not(test))] +#[cfg(all(not(test), not(maitake_ultraverbose)))] macro_rules! test_debug { ($($args:tt)+) => {}; } -#[cfg(test)] +#[cfg(any(test, maitake_ultraverbose))] macro_rules! test_debug { ($($args:tt)+) => { debug!($($args)+); }; } -#[cfg(not(test))] +#[cfg(all(not(test), not(maitake_ultraverbose)))] macro_rules! test_trace { ($($args:tt)+) => {}; } -#[cfg(test)] +#[cfg(any(test, maitake_ultraverbose))] macro_rules! test_trace { ($($args:tt)+) => { trace!($($args)+); From 739649fe2ef05c7243ca54c1ebf8fe0bc166a64b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 14:20:53 -0700 Subject: [PATCH 08/25] gah --- maitake/src/sync/wait_queue.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maitake/src/sync/wait_queue.rs b/maitake/src/sync/wait_queue.rs index 40dd07c9..cacf3135 100644 --- a/maitake/src/sync/wait_queue.rs +++ b/maitake/src/sync/wait_queue.rs @@ -26,7 +26,7 @@ use core::{ task::{Context, Poll, Waker}, }; use mycelium_bitfield::{bitfield, enum_from_bits, FromBits}; -#[cfg(test)] +#[cfg(any(test, maitake_ultraverbose))] use mycelium_util::fmt; use mycelium_util::sync::CachePadded; use pin_project::{pin_project, pinned_drop}; From 9d91d8a4f11804768bf1790a772c07e571923c20 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 14:33:54 -0700 Subject: [PATCH 09/25] consume prev woken bit? --- maitake/src/sync/wait_cell.rs | 31 +++++++++++++++++++------------ 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 9f872f8e..fa6ad645 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -127,18 +127,25 @@ impl WaitCell { trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker"); // this is based on tokio's AtomicWaker synchronization strategy - match test_dbg!(self.compare_exchange(State::WAITING, State::REGISTERING, Acquire)) { - // someone else is notifying, so don't wait! - Err(actual) if test_dbg!(actual.is(State::CLOSED)) => { - return Err(RegisterError::Closed); - } - Err(actual) - if test_dbg!(actual.is(State::WAKING)) || test_dbg!(actual.is(State::WOKEN)) => - { - return Err(RegisterError::Waking); + let mut cur = State::WAITING; + loop { + match test_dbg!(self.compare_exchange(cur, State::REGISTERING, Acquire)) { + // someone else is notifying, so don't wait! + Err(actual) if test_dbg!(actual.is(State::CLOSED)) => { + return Err(RegisterError::Closed); + } + Err(actual) if actual == State::WOKEN => { + cur = actual; + } + + Err(actual) if test_dbg!(actual.is(State::WAKING)) => { + return Err(RegisterError::Waking); + } + Err(_) => return Err(RegisterError::Registering), + Ok(_) => { + break; + } } - Err(_) => return Err(RegisterError::Registering), - Ok(_) => {} } test_debug!("-> wait cell locked!"); @@ -323,7 +330,7 @@ impl Future for Wait<'_> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Did a wakeup occur while we were pre-registering the future? - if self.presubscribe.is_ready() { + if test_dbg!(self.presubscribe.is_ready()) { return self.presubscribe; } From a5abed61f6d3426162e630b887dbee2d72e11a7e Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 14:38:54 -0700 Subject: [PATCH 10/25] always take woken bit at the top of poll --- maitake/src/sync/wait_cell.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index fa6ad645..25cb8bbb 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -329,17 +329,17 @@ impl Future for Wait<'_> { type Output = Result<(), super::Closed>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Did a wakeup occur while we were pre-registering the future? - if test_dbg!(self.presubscribe.is_ready()) { - return self.presubscribe; - } - // Try to take the cell's `WOKEN` bit to see if we were previously // waiting and then received a notification. if test_dbg!(self.cell.fetch_and(!State::WOKEN, AcqRel)).is(State::WOKEN) { return Poll::Ready(Ok(())); } + // Did a wakeup occur while we were pre-registering the future? + if test_dbg!(self.presubscribe.is_ready()) { + return self.presubscribe; + } + match test_dbg!(self.cell.register_wait(cx.waker())) { Ok(_) => Poll::Pending, Err(RegisterError::Registering) => { From 944f0aae15b155159d65c2ddbc65a42e70a8edd2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Sun, 23 Jul 2023 14:44:29 -0700 Subject: [PATCH 11/25] ghghghghghghghg --- maitake/src/sync/wait_cell.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 25cb8bbb..da88d5b6 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -134,7 +134,7 @@ impl WaitCell { Err(actual) if test_dbg!(actual.is(State::CLOSED)) => { return Err(RegisterError::Closed); } - Err(actual) if actual == State::WOKEN => { + Err(actual) if actual.is(State::WOKEN) => { cur = actual; } @@ -260,7 +260,7 @@ impl WaitCell { if close { bits.0 |= State::CLOSED.0; } - if test_dbg!(self.fetch_or(bits, AcqRel)) == State::WAITING { + if test_dbg!(self.fetch_or(bits, AcqRel)).is(State::WAITING) { // we have the lock! let waker = self.waker.with_mut(|thread| unsafe { (*thread).take() }); @@ -386,11 +386,11 @@ impl<'cell> Future for Subscribe<'cell> { // === impl State === impl State { - const WAITING: Self = Self(0b00); - const REGISTERING: Self = Self(0b01); - const WAKING: Self = Self(0b10); - const CLOSED: Self = Self(0b100); - const WOKEN: Self = Self(0b1000); + const WAITING: Self = Self(1 << 1); + const REGISTERING: Self = Self(1 << 2); + const WAKING: Self = Self(1 << 3); + const CLOSED: Self = Self(1 << 4); + const WOKEN: Self = Self(1 << 5); fn is(self, Self(state): Self) -> bool { self.0 & state == state From 60b84f4cb48f10e1ce5e86e1e3ebb90d2eb97922 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 10:02:33 -0700 Subject: [PATCH 12/25] fix wrong access --- maitake/src/sync/wait_cell.rs | 63 +++++++++++++++++------------------ maitake/src/util.rs | 2 +- 2 files changed, 31 insertions(+), 34 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index da88d5b6..4906f147 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -40,7 +40,7 @@ use mycelium_util::{fmt, sync::CachePadded}; /// [`wait`]: Self::wait /// [`wake`]: Self::wake pub struct WaitCell { - lock: CachePadded, + state: CachePadded, waker: UnsafeCell>, } @@ -96,7 +96,7 @@ impl WaitCell { #[must_use] pub fn new() -> Self { Self { - lock: CachePadded::new(AtomicUsize::new(State::WAITING.0)), + state: CachePadded::new(AtomicUsize::new(State::WAITING.0)), waker: UnsafeCell::new(None), } } @@ -127,26 +127,21 @@ impl WaitCell { trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker"); // this is based on tokio's AtomicWaker synchronization strategy - let mut cur = State::WAITING; - loop { - match test_dbg!(self.compare_exchange(cur, State::REGISTERING, Acquire)) { - // someone else is notifying, so don't wait! - Err(actual) if test_dbg!(actual.is(State::CLOSED)) => { - return Err(RegisterError::Closed); - } - Err(actual) if actual.is(State::WOKEN) => { - cur = actual; - } + match test_dbg!(self.compare_exchange(State::WAITING, State::REGISTERING, Acquire)) { + // someone else is notifying, so don't wait! + Err(actual) if test_dbg!(actual.contains(State::CLOSED)) => { + return Err(RegisterError::Closed); + } - Err(actual) if test_dbg!(actual.is(State::WAKING)) => { - return Err(RegisterError::Waking); - } - Err(_) => return Err(RegisterError::Registering), - Ok(_) => { - break; - } + Err(actual) + if test_dbg!(actual.contains(State::WAKING)) + || test_dbg!(actual.contains(State::WOKEN)) => + { + return Err(RegisterError::Waking); } - } + Err(_) => return Err(RegisterError::Registering), + Ok(_) => {} + }; test_debug!("-> wait cell locked!"); let prev_waker = self.waker.with_mut(|old_waker| unsafe { @@ -188,7 +183,7 @@ impl WaitCell { // Was the `CLOSED` bit set while we were clearing other bits? // If so, the cell is closed. Otherwise, we must have been notified. - if state.is(State::CLOSED) { + if state.contains(State::CLOSED) { Err(RegisterError::Closed) } else { Err(RegisterError::Waking) @@ -208,6 +203,7 @@ impl WaitCell { } } + /// Pre-subscribe to notifications from this `WaitCell`. pub fn subscribe(&self) -> Subscribe<'_> { Subscribe { cell: self } } @@ -260,7 +256,7 @@ impl WaitCell { if close { bits.0 |= State::CLOSED.0; } - if test_dbg!(self.fetch_or(bits, AcqRel)).is(State::WAITING) { + if test_dbg!(self.fetch_or(bits, AcqRel)) == State::WAITING { // we have the lock! let waker = self.waker.with_mut(|thread| unsafe { (*thread).take() }); @@ -283,7 +279,7 @@ impl WaitCell { State(new): State, success: Ordering, ) -> Result { - self.lock + self.state .compare_exchange(curr, new, success, Acquire) .map(State) .map_err(State) @@ -291,17 +287,17 @@ impl WaitCell { #[inline(always)] fn fetch_and(&self, State(state): State, order: Ordering) -> State { - State(self.lock.fetch_and(state, order)) + State(self.state.fetch_and(state, order)) } #[inline(always)] fn fetch_or(&self, State(state): State, order: Ordering) -> State { - State(self.lock.fetch_or(state, order)) + State(self.state.fetch_or(state, order)) } #[inline(always)] fn current_state(&self) -> State { - State(self.lock.load(Acquire)) + State(self.state.load(Acquire)) } } @@ -331,7 +327,7 @@ impl Future for Wait<'_> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { // Try to take the cell's `WOKEN` bit to see if we were previously // waiting and then received a notification. - if test_dbg!(self.cell.fetch_and(!State::WOKEN, AcqRel)).is(State::WOKEN) { + if test_dbg!(self.cell.fetch_and(!State::WOKEN, AcqRel)).contains(State::WOKEN) { return Poll::Ready(Ok(())); } @@ -386,14 +382,15 @@ impl<'cell> Future for Subscribe<'cell> { // === impl State === impl State { - const WAITING: Self = Self(1 << 1); - const REGISTERING: Self = Self(1 << 2); - const WAKING: Self = Self(1 << 3); + const WAITING: Self = Self(1 << 0); + const REGISTERING: Self = Self(1 << 1); + const WAKING: Self = Self(1 << 2); + const WOKEN: Self = Self(1 << 3); const CLOSED: Self = Self(1 << 4); - const WOKEN: Self = Self(1 << 5); - fn is(self, Self(state): Self) -> bool { - self.0 & state == state + fn contains(self, Self(state): Self) -> bool { + test_trace!("{:#?}.contains({state:#b})", self.0); + test_dbg!(self.0) & test_dbg!(state) == state } } diff --git a/maitake/src/util.rs b/maitake/src/util.rs index a93c2775..025c95e7 100644 --- a/maitake/src/util.rs +++ b/maitake/src/util.rs @@ -6,7 +6,7 @@ pub(crate) use self::wake_batch::WakeBatch; macro_rules! fmt_bits { ($self: expr, $f: expr, $has_states: ident, $($name: ident),+) => { $( - if $self.is(Self::$name) { + if $self.contains(Self::$name) { if $has_states { $f.write_str(" | ")?; } From c10a0d60316ce2fe49b64195716f624ab7dbce20 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 11:19:08 -0700 Subject: [PATCH 13/25] ok i think its good now --- maitake/src/sync/wait_cell.rs | 127 +++++++++++++++++++++++----------- maitake/src/trace.rs | 12 ++++ 2 files changed, 98 insertions(+), 41 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 4906f147..7379c93d 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -1,6 +1,7 @@ //! An atomically registered [`Waker`], for waking a single task. //! //! See the documentation for the [`WaitCell`] type for details. +use super::Closed; use crate::loom::{ cell::UnsafeCell, sync::atomic::{ @@ -104,6 +105,19 @@ impl WaitCell { } impl WaitCell { + /// Poll to wait on this `WaitCell`. + pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> { + match test_dbg!(self.register_wait(cx.waker())) { + Ok(()) => Poll::Pending, + Err(RegisterError::Closed) => super::closed(), + Err(RegisterError::Waking) => Poll::Ready(Ok(())), + Err(RegisterError::Registering) => { + cx.waker().wake_by_ref(); + Poll::Pending + } + } + } + /// Register `waker` with this `WaitCell`. /// /// Once a [`Waker`] has been registered, a subsequent call to [`wake`] will @@ -124,24 +138,26 @@ impl WaitCell { /// /// [`wake`]: Self::wake pub fn register_wait(&self, waker: &Waker) -> Result<(), RegisterError> { + enter_test_debug_span!("WaitCell::register_wait"); trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker"); // this is based on tokio's AtomicWaker synchronization strategy match test_dbg!(self.compare_exchange(State::WAITING, State::REGISTERING, Acquire)) { - // someone else is notifying, so don't wait! Err(actual) if test_dbg!(actual.contains(State::CLOSED)) => { return Err(RegisterError::Closed); } - - Err(actual) - if test_dbg!(actual.contains(State::WAKING)) - || test_dbg!(actual.contains(State::WOKEN)) => - { + Err(actual) if test_dbg!(actual.contains(State::WOKEN)) => { + // take the wakeup + self.fetch_and(!State::WOKEN, Release); + return Err(RegisterError::Waking); + } + // someone else is notifying, so don't wait! + Err(actual) if test_dbg!(actual.contains(State::WAKING)) => { return Err(RegisterError::Waking); } Err(_) => return Err(RegisterError::Registering), Ok(_) => {} - }; + } test_debug!("-> wait cell locked!"); let prev_waker = self.waker.with_mut(|old_waker| unsafe { @@ -215,6 +231,7 @@ impl WaitCell { /// - `true` if a waiting task was woken. /// - `false` if no task was woken (no [`Waker`] was stored in the cell) pub fn wake(&self) -> bool { + enter_test_debug_span!("WaitCell::wake"); if let Some(waker) = self.take_waker(false) { waker.wake(); true @@ -232,6 +249,7 @@ impl WaitCell { /// [`wait`]: Self::wait /// [`register_wait`]: Self::register_wait pub fn close(&self) -> bool { + enter_test_debug_span!("WaitCell::close"); if let Some(waker) = self.take_waker(true) { waker.wake(); true @@ -252,21 +270,32 @@ impl WaitCell { // TODO(eliza): could probably be made a public API... pub(crate) fn take_waker(&self, close: bool) -> Option { trace!(wait_cell = ?fmt::ptr(self), ?close, "notifying"); - let mut bits = State::WAKING | State::WOKEN; + let mut bits = State::WAKING; if close { bits.0 |= State::CLOSED.0; } + if test_dbg!(self.fetch_or(bits, AcqRel)) == State::WAITING { - // we have the lock! + // Ladies and gentlemen...we got him (the lock)! let waker = self.waker.with_mut(|thread| unsafe { (*thread).take() }); - test_dbg!(self.fetch_and(!State::WAKING, AcqRel)); + // Release the lock and set the WOKEN bit. + let mut state = bits; + loop { + let next_state = (state & !State::WAKING) | State::WOKEN; + match test_dbg!(self.compare_exchange(state, next_state, AcqRel)) { + Ok(_) => break, + Err(actual) => state = actual, + } + } + self.fetch_and(!State::WAKING, Release); if let Some(waker) = test_dbg!(waker) { trace!(wait_cell = ?fmt::ptr(self), ?close, ?waker, "notified"); return Some(waker); } } + None } } @@ -322,34 +351,16 @@ impl Drop for WaitCell { // === impl Wait === impl Future for Wait<'_> { - type Output = Result<(), super::Closed>; + type Output = Result<(), Closed>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Try to take the cell's `WOKEN` bit to see if we were previously - // waiting and then received a notification. - if test_dbg!(self.cell.fetch_and(!State::WOKEN, AcqRel)).contains(State::WOKEN) { - return Poll::Ready(Ok(())); - } - + enter_test_debug_span!("Wait::poll"); // Did a wakeup occur while we were pre-registering the future? if test_dbg!(self.presubscribe.is_ready()) { return self.presubscribe; } - match test_dbg!(self.cell.register_wait(cx.waker())) { - Ok(_) => Poll::Pending, - Err(RegisterError::Registering) => { - // Cell was busy parking some other task, all we can do is try again later - cx.waker().wake_by_ref(); - Poll::Pending - } - Err(RegisterError::Waking) => { - // Cell is waking another task RIGHT NOW, so let's ride that high all the - // way to the READY state. - Poll::Ready(Ok(())) - } - Err(RegisterError::Closed) => super::closed(), - } + self.cell.poll_wait(cx) } } @@ -359,18 +370,15 @@ impl<'cell> Future for Subscribe<'cell> { type Output = Wait<'cell>; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + enter_test_debug_span!("Subscribe::poll"); let presubscribe = match test_dbg!(self.cell.register_wait(cx.waker())) { - Ok(_) => Poll::Pending, - Err(RegisterError::Closed) => super::closed(), + Ok(()) => Poll::Pending, + Err(RegisterError::Closed) => Poll::Ready(Err(Closed(()))), + Err(RegisterError::Waking) => Poll::Ready(Ok(())), Err(RegisterError::Registering) => { - // yield and try again cx.waker().wake_by_ref(); return Poll::Pending; } - Err(RegisterError::Waking) => { - // we are also woken - Poll::Ready(Ok(())) - } }; Poll::Ready(Wait { cell: self.cell, @@ -389,8 +397,7 @@ impl State { const CLOSED: Self = Self(1 << 4); fn contains(self, Self(state): Self) -> bool { - test_trace!("{:#?}.contains({state:#b})", self.0); - test_dbg!(self.0) & test_dbg!(state) == state + self.0 & state == state } } @@ -402,6 +409,14 @@ impl ops::BitOr for State { } } +impl ops::BitAnd for State { + type Output = Self; + + fn bitand(self, Self(rhs): Self) -> Self::Output { + Self(self.0 & rhs) + } +} + impl ops::Not for State { type Output = Self; @@ -436,7 +451,7 @@ mod tests { use crate::scheduler::Scheduler; use alloc::sync::Arc; - use tokio_test::{assert_pending, assert_ready_ok, task}; + use tokio_test::{assert_pending, assert_ready, assert_ready_ok, task}; #[test] fn wait_smoke() { @@ -491,6 +506,36 @@ mod tests { wait.await.unwrap(); }) } + + #[test] + fn wake_before_subscribe() { + let _trace = crate::util::test::trace_init(); + let cell = Arc::new(WaitCell::new()); + cell.wake(); + + let mut task = task::spawn({ + let cell = cell.clone(); + async move { + let wait = cell.subscribe().await; + wait.await.unwrap(); + } + }); + + assert_ready!(task.poll(), "woken task should complete"); + + let mut task = task::spawn({ + let cell = cell.clone(); + async move { + let wait = cell.subscribe().await; + wait.await.unwrap(); + } + }); + + assert_pending!(task.poll(), "wait cell hasn't been woken yet"); + cell.wake(); + assert!(task.is_woken()); + assert_ready!(task.poll()); + } } #[cfg(test)] diff --git a/maitake/src/trace.rs b/maitake/src/trace.rs index a1a163a3..f2a32271 100644 --- a/maitake/src/trace.rs +++ b/maitake/src/trace.rs @@ -100,6 +100,18 @@ macro_rules! test_dbg { }; } +#[cfg(all(not(test), not(maitake_ultraverbose)))] +macro_rules! enter_test_debug_span { + ($($args:tt)+) => {}; +} + +#[cfg(any(test, maitake_ultraverbose))] +macro_rules! enter_test_debug_span { + ($($args:tt)+) => { + let _span = debug_span!($($args)+).entered(); + }; +} + #[cfg(all(not(test), not(maitake_ultraverbose)))] macro_rules! test_debug { ($($args:tt)+) => {}; From 2506fe1edd7fa51fc1afd584cc93f78728c55c6c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 11:26:09 -0700 Subject: [PATCH 14/25] for real this time --- maitake/src/sync/wait_cell.rs | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 7379c93d..976e3480 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -270,7 +270,7 @@ impl WaitCell { // TODO(eliza): could probably be made a public API... pub(crate) fn take_waker(&self, close: bool) -> Option { trace!(wait_cell = ?fmt::ptr(self), ?close, "notifying"); - let mut bits = State::WAKING; + let mut bits = State::WAKING | State::WOKEN; if close { bits.0 |= State::CLOSED.0; } @@ -279,15 +279,7 @@ impl WaitCell { // Ladies and gentlemen...we got him (the lock)! let waker = self.waker.with_mut(|thread| unsafe { (*thread).take() }); - // Release the lock and set the WOKEN bit. - let mut state = bits; - loop { - let next_state = (state & !State::WAKING) | State::WOKEN; - match test_dbg!(self.compare_exchange(state, next_state, AcqRel)) { - Ok(_) => break, - Err(actual) => state = actual, - } - } + // Release the lock. self.fetch_and(!State::WAKING, Release); if let Some(waker) = test_dbg!(waker) { From 7370e6cd5c147c9022f1efbd09bf673e596880f1 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 11:33:11 -0700 Subject: [PATCH 15/25] b;b;blblblblblbjj;lbbj --- maitake/src/sync/wait_cell.rs | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 976e3480..92c006be 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -107,6 +107,7 @@ impl WaitCell { impl WaitCell { /// Poll to wait on this `WaitCell`. pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> { + enter_test_debug_span!("WaitCell::poll_wait", cell = ?fmt::ptr(self)); match test_dbg!(self.register_wait(cx.waker())) { Ok(()) => Poll::Pending, Err(RegisterError::Closed) => super::closed(), @@ -138,7 +139,7 @@ impl WaitCell { /// /// [`wake`]: Self::wake pub fn register_wait(&self, waker: &Waker) -> Result<(), RegisterError> { - enter_test_debug_span!("WaitCell::register_wait"); + enter_test_debug_span!("WaitCell::register_wait", cell = ?fmt::ptr(self)); trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker"); // this is based on tokio's AtomicWaker synchronization strategy @@ -231,7 +232,7 @@ impl WaitCell { /// - `true` if a waiting task was woken. /// - `false` if no task was woken (no [`Waker`] was stored in the cell) pub fn wake(&self) -> bool { - enter_test_debug_span!("WaitCell::wake"); + enter_test_debug_span!("WaitCell::wake", cell = ?fmt::ptr(self)); if let Some(waker) = self.take_waker(false) { waker.wake(); true @@ -249,7 +250,7 @@ impl WaitCell { /// [`wait`]: Self::wait /// [`register_wait`]: Self::register_wait pub fn close(&self) -> bool { - enter_test_debug_span!("WaitCell::close"); + enter_test_debug_span!("WaitCell::close", cell = ?fmt::ptr(self)); if let Some(waker) = self.take_waker(true) { waker.wake(); true @@ -270,12 +271,18 @@ impl WaitCell { // TODO(eliza): could probably be made a public API... pub(crate) fn take_waker(&self, close: bool) -> Option { trace!(wait_cell = ?fmt::ptr(self), ?close, "notifying"); - let mut bits = State::WAKING | State::WOKEN; - if close { - bits.0 |= State::CLOSED.0; - } + // Set the WAKING bit (to indicate that we're touching the waker) and + // the WOKEN bit (to indicate that we intend to wake it up). + let state = { + let mut bits = State::WAKING | State::WOKEN; + if close { + bits.0 |= State::CLOSED.0; + } + test_dbg!(self.fetch_or(bits, AcqRel)) + }; - if test_dbg!(self.fetch_or(bits, AcqRel)) == State::WAITING { + // Is anyone else touching the waker? + if !test_dbg!(state.contains(State::WAKING | State::REGISTERING | State::CLOSED)) { // Ladies and gentlemen...we got him (the lock)! let waker = self.waker.with_mut(|thread| unsafe { (*thread).take() }); @@ -389,7 +396,7 @@ impl State { const CLOSED: Self = Self(1 << 4); fn contains(self, Self(state): Self) -> bool { - self.0 & state == state + self.0 & state > 0 } } From fd003e771a60914f429566721ee22c6beeccb0b2 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 11:49:03 -0700 Subject: [PATCH 16/25] api polish --- maitake/src/sync/wait_cell.rs | 139 ++++++++++++++++---------------- maitake/src/time/timer/sleep.rs | 25 ++---- 2 files changed, 76 insertions(+), 88 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 92c006be..de44bae7 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -13,7 +13,7 @@ use core::{ future::Future, ops, pin::Pin, - task::{Context, Poll, Waker}, + task::{self, Context, Poll, Waker}, }; use mycelium_util::{fmt, sync::CachePadded}; @@ -50,16 +50,11 @@ pub struct WaitCell { /// /// This error is returned by the [`WaitCell::register_wait`] method. #[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum RegisterError { +pub enum Error { /// The [`Waker`] was not registered because the [`WaitCell`] has been /// [closed](WaitCell::close). Closed, - /// The [`Waker`] was not registered because the [`WaitCell`] was already in - /// the process of [waking](WaitCell::wake). The caller may choose to treat - /// this error as a wakeup. - Waking, - /// The [`Waker`] was not registered because another task was concurrently /// storing its own [`Waker`] in the [`WaitCell`]. Registering, @@ -106,20 +101,6 @@ impl WaitCell { impl WaitCell { /// Poll to wait on this `WaitCell`. - pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> { - enter_test_debug_span!("WaitCell::poll_wait", cell = ?fmt::ptr(self)); - match test_dbg!(self.register_wait(cx.waker())) { - Ok(()) => Poll::Pending, - Err(RegisterError::Closed) => super::closed(), - Err(RegisterError::Waking) => Poll::Ready(Ok(())), - Err(RegisterError::Registering) => { - cx.waker().wake_by_ref(); - Poll::Pending - } - } - } - - /// Register `waker` with this `WaitCell`. /// /// Once a [`Waker`] has been registered, a subsequent call to [`wake`] will /// wake that [`Waker`]. @@ -129,38 +110,36 @@ impl WaitCell { /// - `Ok(())` if the [`Waker`] was registered. If this method returns /// `Ok(())`, then the registered [`Waker`] will be woken by a subsequent /// call to [`wake`]. - /// - `Err(`[`RegisterError::Closed`]`)` if the [`WaitCell`] has been + /// - `Err(`[`Error::Closed`]`)` if the [`WaitCell`] has been /// closed. - /// - `Err(`[`RegisterError::Waking`]`)` if the [`WaitCell`] was - /// [woken][`wake`] *while* the waker was being registered. The caller may - /// choose to treat this as a valid wakeup. - /// - `Err(`[`RegisterError::Registering`]`)` if another task was + /// - `Err(`[`Err::Registering`]`)` if another task was /// [`WaitCell`] concurrently registering its [`Waker`]. /// /// [`wake`]: Self::wake - pub fn register_wait(&self, waker: &Waker) -> Result<(), RegisterError> { - enter_test_debug_span!("WaitCell::register_wait", cell = ?fmt::ptr(self)); - trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker"); + pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> { + enter_test_debug_span!("WaitCell::poll_wait", cell = ?fmt::ptr(self)); // this is based on tokio's AtomicWaker synchronization strategy match test_dbg!(self.compare_exchange(State::WAITING, State::REGISTERING, Acquire)) { Err(actual) if test_dbg!(actual.contains(State::CLOSED)) => { - return Err(RegisterError::Closed); + return Poll::Ready(Err(Error::Closed)); } Err(actual) if test_dbg!(actual.contains(State::WOKEN)) => { // take the wakeup self.fetch_and(!State::WOKEN, Release); - return Err(RegisterError::Waking); + return Poll::Ready(Ok(())); } // someone else is notifying, so don't wait! Err(actual) if test_dbg!(actual.contains(State::WAKING)) => { - return Err(RegisterError::Waking); + return Poll::Ready(Ok(())); } - Err(_) => return Err(RegisterError::Registering), + Err(_) => return Poll::Ready(Err(Error::Registering)), Ok(_) => {} } - test_debug!("-> wait cell locked!"); + let waker = cx.waker(); + trace!(wait_cell = ?fmt::ptr(self), ?waker, "registering waker"); + let prev_waker = self.waker.with_mut(|old_waker| unsafe { match &mut *old_waker { Some(old_waker) if waker.will_wake(old_waker) => None, @@ -173,40 +152,42 @@ impl WaitCell { prev_waker.wake(); } - match test_dbg!(self.compare_exchange(State::REGISTERING, State::WAITING, AcqRel)) { - Ok(_) => Ok(()), - Err(actual) => { - // If the `compare_exchange` fails above, this means that we were notified for one of - // two reasons: either the cell was awoken, or the cell was closed. - // - // Bail out of the parking state, and determine what to report to the caller. - test_trace!(state = ?actual, "was notified"); - let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() }); - // Reset to the WAITING state by clearing everything *except* - // the closed bits (which must remain set). This `fetch_and` - // does *not* set the CLOSED bit if it is unset, it just doesn't - // clear it. - let state = test_dbg!(self.fetch_and(State::CLOSED, AcqRel)); - // The only valid state transition while we were parking is to - // add the CLOSED bit. - debug_assert!( - state == actual || state == actual | State::CLOSED, - "state changed unexpectedly while parking!" - ); - - if let Some(waker) = waker { - waker.wake(); - } + if let Err(actual) = + test_dbg!(self.compare_exchange(State::REGISTERING, State::WAITING, AcqRel)) + { + // If the `compare_exchange` fails above, this means that we were notified for one of + // two reasons: either the cell was awoken, or the cell was closed. + // + // Bail out of the parking state, and determine what to report to the caller. + test_trace!(state = ?actual, "was notified"); + let waker = self.waker.with_mut(|waker| unsafe { (*waker).take() }); + // Reset to the WAITING state by clearing everything *except* + // the closed bits (which must remain set). This `fetch_and` + // does *not* set the CLOSED bit if it is unset, it just doesn't + // clear it. + let state = test_dbg!(self.fetch_and(State::CLOSED, AcqRel)); + // The only valid state transition while we were parking is to + // add the CLOSED bit. + debug_assert!( + state == actual || state == actual | State::CLOSED, + "state changed unexpectedly while parking!" + ); + + if let Some(waker) = waker { + waker.wake(); + } - // Was the `CLOSED` bit set while we were clearing other bits? - // If so, the cell is closed. Otherwise, we must have been notified. - if state.contains(State::CLOSED) { - Err(RegisterError::Closed) - } else { - Err(RegisterError::Waking) - } + // Was the `CLOSED` bit set while we were clearing other bits? + // If so, the cell is closed. Otherwise, we must have been notified. + if state.contains(State::CLOSED) { + return Poll::Ready(Err(Error::Closed)); } + + return Poll::Ready(Ok(())); } + + // Waker registered, time to yield! + Poll::Pending } /// Wait to be woken up by this cell. @@ -354,12 +335,23 @@ impl Future for Wait<'_> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { enter_test_debug_span!("Wait::poll"); + // Did a wakeup occur while we were pre-registering the future? if test_dbg!(self.presubscribe.is_ready()) { return self.presubscribe; } - self.cell.poll_wait(cx) + // Okay, actually poll the cell, then. + match task::ready!(test_dbg!(self.cell.poll_wait(cx))) { + Ok(()) => Poll::Ready(Ok(())), + Err(Error::Closed) => Poll::Ready(Err(Closed(()))), + Err(Error::Registering) => { + // If some other task was registering, yield and try to re-register + // our waker when that task is done. + cx.waker().wake_by_ref(); + Poll::Pending + } + } } } @@ -370,15 +362,20 @@ impl<'cell> Future for Subscribe<'cell> { fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { enter_test_debug_span!("Subscribe::poll"); - let presubscribe = match test_dbg!(self.cell.register_wait(cx.waker())) { - Ok(()) => Poll::Pending, - Err(RegisterError::Closed) => Poll::Ready(Err(Closed(()))), - Err(RegisterError::Waking) => Poll::Ready(Ok(())), - Err(RegisterError::Registering) => { + + // Pre-register the waker in the cell. + let presubscribe = match test_dbg!(self.cell.poll_wait(cx)) { + Poll::Ready(Err(Error::Registering)) => { + // Someone else is in the process of registering. Yield now so we + // can wait until that task is done, and then try again. cx.waker().wake_by_ref(); return Poll::Pending; } + Poll::Ready(Err(Error::Closed)) => Poll::Ready(Err(Closed(()))), + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, }; + Poll::Ready(Wait { cell: self.cell, presubscribe, diff --git a/maitake/src/time/timer/sleep.rs b/maitake/src/time/timer/sleep.rs index 142c58f3..e81dac0f 100644 --- a/maitake/src/time/timer/sleep.rs +++ b/maitake/src/time/timer/sleep.rs @@ -4,7 +4,7 @@ use crate::{ cell::UnsafeCell, sync::atomic::{AtomicBool, Ordering::*}, }, - sync::wait_cell::{self, WaitCell}, + sync::wait_cell::WaitCell, }; use cordyceps::{list, Linked}; use core::{ @@ -12,7 +12,7 @@ use core::{ marker::PhantomPinned, pin::Pin, ptr::{self, NonNull}, - task::{Context, Poll}, + task::{ready, Context, Poll}, time::Duration, }; use mycelium_util::fmt; @@ -113,21 +113,12 @@ impl Future for Sleep<'_> { State::Completed => return Poll::Ready(()), } - match test_dbg!(this.entry.waker.register_wait(cx.waker())) { - Ok(_) => Poll::Pending, - // the timer has fired, so the future has now completed. - Err(wait_cell::RegisterError::Closed) => { - *this.state = State::Completed; - Poll::Ready(()) - } - // these ones don't happen - Err(wait_cell::RegisterError::Registering) => { - unreachable!("a sleep should only be polled by one task!") - } - Err(wait_cell::RegisterError::Waking) => { - unreachable!("a sleep's WaitCell should only be woken by closing") - } - } + let _poll = ready!(test_dbg!(this.entry.waker.poll_wait(cx))); + debug_assert!( + _poll.is_err(), + "a Sleep's WaitCell should only be woken by closing" + ); + Poll::Ready(()) } } From ecf21f7d3b3cf505cbf5b256a137a9c443021769 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 12:10:27 -0700 Subject: [PATCH 17/25] API polish and docs --- maitake/src/sync/wait_cell.rs | 113 +++++++++++++++++++++++++++++----- 1 file changed, 96 insertions(+), 17 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index de44bae7..b5c1ce2d 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -20,7 +20,7 @@ use mycelium_util::{fmt, sync::CachePadded}; /// An atomically registered [`Waker`]. /// /// This cell stores the [`Waker`] of a single task. A [`Waker`] is stored in -/// the cell either by calling [`register_wait`], or by polling a [`wait`] +/// the cell either by calling [`poll_wait`], or by polling a [`wait`] /// future. Once a task's [`Waker`] is stored in a `WaitCell`, it can be woken /// by calling [`wake`] on the `WaitCell`. /// @@ -37,7 +37,7 @@ use mycelium_util::{fmt, sync::CachePadded}; /// /// [`AtomicWaker`]: https://github.com/tokio-rs/tokio/blob/09b770c5db31a1f35631600e1d239679354da2dd/tokio/src/sync/task/atomic_waker.rs /// [`Waker`]: core::task::Waker -/// [`register_wait`]: Self::register_wait +/// [`poll_wait`]: Self::poll_wait /// [`wait`]: Self::wait /// [`wake`]: Self::wake pub struct WaitCell { @@ -48,7 +48,7 @@ pub struct WaitCell { /// An error indicating that a [`WaitCell`] was closed or busy while /// attempting register a [`Waker`]. /// -/// This error is returned by the [`WaitCell::register_wait`] method. +/// This error is returned by the [`WaitCell::poll_wait`] method. #[derive(Copy, Clone, Debug, Eq, PartialEq)] pub enum Error { /// The [`Waker`] was not registered because the [`WaitCell`] has been @@ -57,7 +57,7 @@ pub enum Error { /// The [`Waker`] was not registered because another task was concurrently /// storing its own [`Waker`] in the [`WaitCell`]. - Registering, + Busy, } /// Future returned from [`WaitCell::wait()`]. @@ -74,6 +74,8 @@ pub struct Wait<'a> { } /// Future returned from [`WaitCell::subscribe()`]. +/// +/// See the documentation for [`WaitCell::subscribe()`] for details. #[derive(Debug)] #[must_use = "futures do nothing unless `.await`ed or `poll`ed"] pub struct Subscribe<'a> { @@ -100,20 +102,24 @@ impl WaitCell { } impl WaitCell { - /// Poll to wait on this `WaitCell`. + /// Poll to wait on this `WaitCell`, consuming a stored wakeup or + /// registering the [`Waker`] from the provided [`Context`] to be woken by + /// the next wakeup. /// /// Once a [`Waker`] has been registered, a subsequent call to [`wake`] will /// wake that [`Waker`]. /// /// # Returns /// - /// - `Ok(())` if the [`Waker`] was registered. If this method returns - /// `Ok(())`, then the registered [`Waker`] will be woken by a subsequent - /// call to [`wake`]. - /// - `Err(`[`Error::Closed`]`)` if the [`WaitCell`] has been - /// closed. - /// - `Err(`[`Err::Registering`]`)` if another task was - /// [`WaitCell`] concurrently registering its [`Waker`]. + /// - [`Poll::Pending`] if the [`Waker`] was registered. If this method returns + /// [`Poll::Pending`], then the registered [`Waker`] will be woken by a + /// subsequent call to [`wake`]. + /// - [`Poll::Ready`]`(`[`Ok`]`(()))` if the cell was woken by a call to + /// [`wake`] while the [`Waker`] was being registered. + /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Closed`]`))` if the [`WaitCell`] + /// has been closed. + /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Busy`]`)`) if another task wasre + /// concurrently registering its [`Waker`] with this [`WaitCell`]. /// /// [`wake`]: Self::wake pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> { @@ -133,7 +139,7 @@ impl WaitCell { Err(actual) if test_dbg!(actual.contains(State::WAKING)) => { return Poll::Ready(Ok(())); } - Err(_) => return Poll::Ready(Err(Error::Registering)), + Err(_) => return Poll::Ready(Err(Error::Busy)), Ok(_) => {} } @@ -192,8 +198,27 @@ impl WaitCell { /// Wait to be woken up by this cell. /// + /// # Returns + /// + /// This future completes with the following values: + /// + /// - [`Ok`]`(())` if the future was woken by a call to [`wake`] or another + /// task calling [`poll_wait`] or [`wait`] on this [`WaitCell`]. + /// - [`Err`]`(`[`Closed`]`)` if the task was woken by a call to [`close`], + /// or the [`WaitCell`] was already closed. + /// /// **Note**: The calling task's [`Waker`] is not registered until AFTER the - /// first time the returned [`Wait`] future is polled. + /// first time the returned [`Wait`] future is polled. This means that if a + /// call to [`wake`] occurs between when [`wait`] is called and when the + /// future is first polled, the future will *not* complete. If the caller is + /// responsible for performing an operation which will result in an eventual + /// wakeup, prefer calling [`subscribe`] _before_ performing that operation + /// and `.await`ing the [`Wait`] future returned by [`subscribe`]. + /// + /// [`wake`]: Self::wake + /// [`poll_wait`]: Self::poll_wait + /// [`close`]: Self::close + /// [`subscribe`]: Self::subscribe pub fn wait(&self) -> Wait<'_> { Wait { cell: self, @@ -201,7 +226,61 @@ impl WaitCell { } } - /// Pre-subscribe to notifications from this `WaitCell`. + /// Eagerly subscribe to notifications from this `WaitCell`. + /// + /// This method returns a [`Subscribe`] [`Future`], which outputs a [`Wait`] + /// [`Future`]. Awaiting the [`Subscribe`] future will eagerly register the + /// calling task to be woken by this [`WaitCell`], so that the returned + /// [`Wait`] future will be woken by any calls to [`wake`] (or [`close`]) + /// that occur between when the [`Subscribe`] future completes and when the + /// returned [`Wait`] future is `.await`ed. + /// + /// This is primarily intended for scenarios where the task that waits on a + /// [`WaitCell`] is responsible for performing some operation that + /// ultimately results in the [`WaitCell`] being woken. If the task were to + /// simply perform the operation and then call [`wait`] on the [`WaitCell`], + /// a potential race condition could occur where the operation completes and + /// wakes the [`WaitCell`] *before* the [`Wait`] future is first `.await`ed. + /// Using `subscribe`, the task can ensure that it is ready to be woken by + /// the cell *before* performing an operation that could result in it being + /// woken. + /// + /// These scenarios occur when a wakeup is triggered by another thread/CPU + /// core in response to an operation performed in the task waiting on the + /// `WaitCell`, or when the wakeup is triggered by a hardware interrupt + /// resulting from operations performed in the task. + /// + /// # Examples + /// + /// ``` + /// use maitake::sync::WaitCell; + /// + /// // Perform an operation that results in a concurrent wakeup, such as + /// // unmasking an interrupt. + /// fn do_something_that_causes_a_wakeup() { + /// # WAIT_CELL.wake(); + /// // ... + /// } + /// + /// static WAIT_CELL: WaitCell = WaitCell::new(); + /// + /// # async fn dox() { + /// // Subscribe to notifications from the cell *before* calling + /// // `do_something_that_causes_a_wakeup()`, to ensure that we are + /// // ready to be woken when the interrupt is unmasked. + /// let wait = WAIT_CELL.subscribe().await; + /// + /// // Actually perform the operation. + /// do_something_that_causes_a_wakeup(); + /// + /// // Wait for the wakeup. If the wakeup occurred *before* the first + /// // poll of the `wait` future had successfully subscribed to the + /// // `WaitCell`, we would still receive the wakeup, because the + /// // `subscribe` future ensured that our waker was registered to be + /// // woken. + /// wait.await.expect("WaitCell is not closed"); + /// # } + /// ``` pub fn subscribe(&self) -> Subscribe<'_> { Subscribe { cell: self } } @@ -345,7 +424,7 @@ impl Future for Wait<'_> { match task::ready!(test_dbg!(self.cell.poll_wait(cx))) { Ok(()) => Poll::Ready(Ok(())), Err(Error::Closed) => Poll::Ready(Err(Closed(()))), - Err(Error::Registering) => { + Err(Error::Busy) => { // If some other task was registering, yield and try to re-register // our waker when that task is done. cx.waker().wake_by_ref(); @@ -365,7 +444,7 @@ impl<'cell> Future for Subscribe<'cell> { // Pre-register the waker in the cell. let presubscribe = match test_dbg!(self.cell.poll_wait(cx)) { - Poll::Ready(Err(Error::Registering)) => { + Poll::Ready(Err(Error::Busy)) => { // Someone else is in the process of registering. Yield now so we // can wait until that task is done, and then try again. cx.waker().wake_by_ref(); From a0e63a413190cb93a9ad218d0dcebdc7f8ffe0f3 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 12:33:04 -0700 Subject: [PATCH 18/25] add self wake test --- maitake/src/sync/wait_cell.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index b5c1ce2d..8b445e14 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -611,6 +611,35 @@ mod tests { assert!(task.is_woken()); assert_ready!(task.poll()); } + + #[test] + fn subscribe_doesnt_self_wake() { + let _trace = crate::util::test::trace_init(); + let cell = Arc::new(WaitCell::new()); + + let mut task = task::spawn({ + let cell = cell.clone(); + async move { + let wait = cell.subscribe().await; + wait.await.unwrap(); + let wait = cell.subscribe().await; + wait.await.unwrap(); + } + }); + assert_pending!(task.poll()); + assert!(!task.is_woken()); + + cell.wake(); + assert!(task.is_woken()); + assert_pending!(task.poll()); + + assert!(!task.is_woken()); + assert_pending!(task.poll()); + + cell.wake(); + assert!(task.is_woken()); + assert_ready!(task.poll()); + } } #[cfg(test)] From 57a5f93722c8b360a6dd9becbdd8f675d6beb2cb Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 13:15:18 -0700 Subject: [PATCH 19/25] oh goddammit --- maitake/src/sync/wait_cell.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 8b445e14..fbf42e12 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -465,11 +465,11 @@ impl<'cell> Future for Subscribe<'cell> { // === impl State === impl State { - const WAITING: Self = Self(1 << 0); - const REGISTERING: Self = Self(1 << 1); - const WAKING: Self = Self(1 << 2); - const WOKEN: Self = Self(1 << 3); - const CLOSED: Self = Self(1 << 4); + const WAITING: Self = Self(0b0000); + const REGISTERING: Self = Self(0b0001); + const WAKING: Self = Self(0b0010); + const WOKEN: Self = Self(0b0100); + const CLOSED: Self = Self(0b1000); fn contains(self, Self(state): Self) -> bool { self.0 & state > 0 From 2e23891438b0d370901bb36d82bd89c8f9cd8482 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 13:17:20 -0700 Subject: [PATCH 20/25] Update maitake/src/sync/wait_cell.rs Co-authored-by: James Munns --- maitake/src/sync/wait_cell.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index fbf42e12..1d57f1fc 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -118,7 +118,7 @@ impl WaitCell { /// [`wake`] while the [`Waker`] was being registered. /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Closed`]`))` if the [`WaitCell`] /// has been closed. - /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Busy`]`)`) if another task wasre + /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Busy`]`)`) if another task was /// concurrently registering its [`Waker`] with this [`WaitCell`]. /// /// [`wake`]: Self::wake From a1db71e02df77048a3c5fa4e537c41f0c25f835c Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 13:21:04 -0700 Subject: [PATCH 21/25] fix docs --- maitake/src/sync/wait_cell.rs | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 1d57f1fc..fb9f6f59 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -118,7 +118,7 @@ impl WaitCell { /// [`wake`] while the [`Waker`] was being registered. /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Closed`]`))` if the [`WaitCell`] /// has been closed. - /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Busy`]`)`) if another task was + /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Busy`]`))` if another task was /// concurrently registering its [`Waker`] with this [`WaitCell`]. /// /// [`wake`]: Self::wake @@ -217,6 +217,7 @@ impl WaitCell { /// /// [`wake`]: Self::wake /// [`poll_wait`]: Self::poll_wait + /// [`wait`]: Self::wait /// [`close`]: Self::close /// [`subscribe`]: Self::subscribe pub fn wait(&self) -> Wait<'_> { @@ -281,6 +282,10 @@ impl WaitCell { /// wait.await.expect("WaitCell is not closed"); /// # } /// ``` + /// + /// [`wait`]: Self::wait + /// [`wake`]: Self::wake + /// [`close`]: Self::close pub fn subscribe(&self) -> Subscribe<'_> { Subscribe { cell: self } } @@ -304,11 +309,11 @@ impl WaitCell { /// Close the [`WaitCell`]. /// /// This wakes any waiting task with an error indicating the `WaitCell` is - /// closed. Subsequent calls to [`wait`] or [`register_wait`] will return an + /// closed. Subsequent calls to [`wait`] or [`poll_wait`] will return an /// error indicating that the cell has been closed. /// /// [`wait`]: Self::wait - /// [`register_wait`]: Self::register_wait + /// [`poll_wait`]: Self::poll_wait pub fn close(&self) -> bool { enter_test_debug_span!("WaitCell::close", cell = ?fmt::ptr(self)); if let Some(waker) = self.take_waker(true) { @@ -465,6 +470,15 @@ impl<'cell> Future for Subscribe<'cell> { // === impl State === impl State { + /// /!\ EXTREMELY SERIOUS WARNING! /!\ + /// It is LOAD BEARING that the `WAITING` state is represented by zero! + /// This is because we return to the waiting state by `fetch_and`ing out all + /// other bits in a few places. If this state's bit representation is + /// changed to anything other than zero, that code will break! Don't do + /// that! + /// + /// YES, FUTURE ELIZA, THIS DOES APPLY TO YOU. YOU ALREADY BROKE IT ONCE. + /// DON'T DO IT AGAIN. const WAITING: Self = Self(0b0000); const REGISTERING: Self = Self(0b0001); const WAKING: Self = Self(0b0010); From 6f4f124c1b01f35a69b48bbbfebbfa339fe633d8 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 13:22:27 -0700 Subject: [PATCH 22/25] docs unfuckening --- maitake/src/sync/wait_cell.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index fb9f6f59..00a2c151 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -116,9 +116,9 @@ impl WaitCell { /// subsequent call to [`wake`]. /// - [`Poll::Ready`]`(`[`Ok`]`(()))` if the cell was woken by a call to /// [`wake`] while the [`Waker`] was being registered. - /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Closed`]`))` if the [`WaitCell`] + /// - [`Poll::Ready`]`(`[`Err`]`(`[`Error::Closed`]`))` if the [`WaitCell`] /// has been closed. - /// - [`Poll::Ready`]`(`[`Err`](`[`Error::Busy`]`))` if another task was + /// - [`Poll::Ready`]`(`[`Err`]`(`[`Error::Busy`]`))` if another task was /// concurrently registering its [`Waker`] with this [`WaitCell`]. /// /// [`wake`]: Self::wake From 05b83b26b9109186d73057087535a3fe641ce1af Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 13:23:26 -0700 Subject: [PATCH 23/25] s/Error/PollWaitError/g --- maitake/src/sync/wait_cell.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 00a2c151..024fe79c 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -50,7 +50,7 @@ pub struct WaitCell { /// /// This error is returned by the [`WaitCell::poll_wait`] method. #[derive(Copy, Clone, Debug, Eq, PartialEq)] -pub enum Error { +pub enum PollWaitError { /// The [`Waker`] was not registered because the [`WaitCell`] has been /// [closed](WaitCell::close). Closed, @@ -122,13 +122,13 @@ impl WaitCell { /// concurrently registering its [`Waker`] with this [`WaitCell`]. /// /// [`wake`]: Self::wake - pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> { + pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> { enter_test_debug_span!("WaitCell::poll_wait", cell = ?fmt::ptr(self)); // this is based on tokio's AtomicWaker synchronization strategy match test_dbg!(self.compare_exchange(State::WAITING, State::REGISTERING, Acquire)) { Err(actual) if test_dbg!(actual.contains(State::CLOSED)) => { - return Poll::Ready(Err(Error::Closed)); + return Poll::Ready(Err(PollWaitError::Closed)); } Err(actual) if test_dbg!(actual.contains(State::WOKEN)) => { // take the wakeup @@ -139,7 +139,7 @@ impl WaitCell { Err(actual) if test_dbg!(actual.contains(State::WAKING)) => { return Poll::Ready(Ok(())); } - Err(_) => return Poll::Ready(Err(Error::Busy)), + Err(_) => return Poll::Ready(Err(PollWaitError::Busy)), Ok(_) => {} } @@ -186,7 +186,7 @@ impl WaitCell { // Was the `CLOSED` bit set while we were clearing other bits? // If so, the cell is closed. Otherwise, we must have been notified. if state.contains(State::CLOSED) { - return Poll::Ready(Err(Error::Closed)); + return Poll::Ready(Err(PollWaitError::Closed)); } return Poll::Ready(Ok(())); @@ -428,8 +428,8 @@ impl Future for Wait<'_> { // Okay, actually poll the cell, then. match task::ready!(test_dbg!(self.cell.poll_wait(cx))) { Ok(()) => Poll::Ready(Ok(())), - Err(Error::Closed) => Poll::Ready(Err(Closed(()))), - Err(Error::Busy) => { + Err(PollWaitError::Closed) => Poll::Ready(Err(Closed(()))), + Err(PollWaitError::Busy) => { // If some other task was registering, yield and try to re-register // our waker when that task is done. cx.waker().wake_by_ref(); @@ -449,13 +449,13 @@ impl<'cell> Future for Subscribe<'cell> { // Pre-register the waker in the cell. let presubscribe = match test_dbg!(self.cell.poll_wait(cx)) { - Poll::Ready(Err(Error::Busy)) => { + Poll::Ready(Err(PollWaitError::Busy)) => { // Someone else is in the process of registering. Yield now so we // can wait until that task is done, and then try again. cx.waker().wake_by_ref(); return Poll::Pending; } - Poll::Ready(Err(Error::Closed)) => Poll::Ready(Err(Closed(()))), + Poll::Ready(Err(PollWaitError::Closed)) => Poll::Ready(Err(Closed(()))), Poll::Ready(Ok(())) => Poll::Ready(Ok(())), Poll::Pending => Poll::Pending, }; From f626b415100bdfce9877e7ca22955b213f1b95df Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 14:06:28 -0700 Subject: [PATCH 24/25] moar test --- maitake/src/sync/wait_cell.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 024fe79c..6b6d8062 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -626,6 +626,39 @@ mod tests { assert_ready!(task.poll()); } + #[test] + fn wake_debounce() { + let _trace = crate::util::test::trace_init(); + let cell = Arc::new(WaitCell::new()); + + let mut task = task::spawn({ + let cell = cell.clone(); + async move { + cell.wait().await.unwrap(); + } + }); + + assert_pending!(task.poll()); + cell.wake(); + cell.wake(); + assert!(task.is_woken()); + assert_ready!(task.poll()); + + let mut task = task::spawn({ + let cell = cell.clone(); + async move { + cell.wait().await.unwrap(); + } + }); + + assert_pending!(task.poll()); + assert!(!task.is_woken()); + + cell.wake(); + assert!(task.is_woken()); + assert_ready!(task.poll()); + } + #[test] fn subscribe_doesnt_self_wake() { let _trace = crate::util::test::trace_init(); @@ -720,6 +753,7 @@ pub(crate) mod test_util { mod loom { use super::*; use crate::loom::{future, sync::Arc, thread}; + use tokio_test::assert_pending; #[test] fn basic() { From 850dd5641477f139e8489fde2476607a83e43440 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 24 Jul 2023 14:06:34 -0700 Subject: [PATCH 25/25] docs --- maitake/src/sync/wait_cell.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/maitake/src/sync/wait_cell.rs b/maitake/src/sync/wait_cell.rs index 6b6d8062..8df6f7ca 100644 --- a/maitake/src/sync/wait_cell.rs +++ b/maitake/src/sync/wait_cell.rs @@ -116,10 +116,11 @@ impl WaitCell { /// subsequent call to [`wake`]. /// - [`Poll::Ready`]`(`[`Ok`]`(()))` if the cell was woken by a call to /// [`wake`] while the [`Waker`] was being registered. - /// - [`Poll::Ready`]`(`[`Err`]`(`[`Error::Closed`]`))` if the [`WaitCell`] - /// has been closed. - /// - [`Poll::Ready`]`(`[`Err`]`(`[`Error::Busy`]`))` if another task was - /// concurrently registering its [`Waker`] with this [`WaitCell`]. + /// - [`Poll::Ready`]`(`[`Err`]`(`[`PollWaitError::Closed`]`))` if the + /// [`WaitCell`] has been closed. + /// - [`Poll::Ready`]`(`[`Err`]`(`[`PollWaitError::Busy`]`))` if another + /// task was concurrently registering its [`Waker`] with this + /// [`WaitCell`]. /// /// [`wake`]: Self::wake pub fn poll_wait(&self, cx: &mut Context<'_>) -> Poll> {