From 96aee3a19b74aee59de88a5522833ac3727e80e6 Mon Sep 17 00:00:00 2001 From: Timo Glane Date: Thu, 26 Sep 2024 15:01:34 +0200 Subject: [PATCH] sync, coop: apply cooperative scheduling to sync::broadcast::Receiver --- tokio/src/sync/broadcast.rs | 4 ++-- tokio/tests/sync_broadcast.rs | 16 ++++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 67d67a666e3..56c4cd6b92f 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -119,6 +119,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicBool, AtomicUsize}; use crate::loom::sync::{Arc, Mutex, MutexGuard, RwLock, RwLockReadGuard}; +use crate::runtime::coop::cooperative; use crate::util::linked_list::{self, GuardedLinkedList, LinkedList}; use crate::util::WakeList; @@ -1262,8 +1263,7 @@ impl Receiver { /// } /// ``` pub async fn recv(&mut self) -> Result { - let fut = Recv::new(self); - fut.await + cooperative(Recv::new(self)).await } /// Attempts to return a pending value on this receiver without awaiting. diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 2638c1f33d4..3af96bdb5d5 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -640,3 +640,19 @@ fn send_in_waker_drop() { // Shouldn't deadlock. let _ = tx.send(()); } + +#[tokio::test] +async fn receiver_recv_is_cooperative() { + let (tx, mut rx) = broadcast::channel(8); + + tokio::select! { + biased; + _ = async { + loop { + assert!(tx.send(()).is_ok()); + assert!(rx.recv().await.is_ok()); + } + } => {}, + _ = tokio::task::yield_now() => {}, + } +}