From 65987ae8f57e6f529c0de33ac0787a20bad266fb Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Tue, 19 Apr 2022 09:20:51 +0200 Subject: [PATCH 1/2] Make std::sys::wasm::futex consistent with unix::futex. --- library/std/src/sys/wasm/atomics/futex.rs | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/library/std/src/sys/wasm/atomics/futex.rs b/library/std/src/sys/wasm/atomics/futex.rs index bbe9bd6951af9..11413ba3bf564 100644 --- a/library/std/src/sys/wasm/atomics/futex.rs +++ b/library/std/src/sys/wasm/atomics/futex.rs @@ -3,19 +3,33 @@ use crate::convert::TryInto; use crate::sync::atomic::AtomicU32; use crate::time::Duration; -pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) { +/// Wait for a futex_wake operation to wake us. +/// +/// Returns directly if the futex doesn't hold the expected value. +/// +/// Returns false on timeout, and true in all other cases. +pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option) -> bool { let timeout = timeout.and_then(|t| t.as_nanos().try_into().ok()).unwrap_or(-1); unsafe { wasm32::memory_atomic_wait32( futex as *const AtomicU32 as *mut i32, expected as i32, timeout, - ); + ) < 2 } } -pub fn futex_wake(futex: &AtomicU32) { +/// Wake up one thread that's blocked on futex_wait on this futex. +/// +/// Returns true if this actually woke up such a thread, +/// or false if no thread was waiting on this futex. +pub fn futex_wake(futex: &AtomicU32) -> bool { + unsafe { wasm32::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, 1) > 0 } +} + +/// Wake up all threads that are waiting on futex_wait on this futex. +pub fn futex_wake_all(futex: &AtomicU32) { unsafe { - wasm32::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, 1); + wasm32::memory_atomic_notify(futex as *const AtomicU32 as *mut i32, i32::MAX as u32); } } From 8f2913cc2475682da493b06ce81cf1e4fc621f0e Mon Sep 17 00:00:00 2001 From: Mara Bos Date: Tue, 19 Apr 2022 09:21:11 +0200 Subject: [PATCH 2/2] Use futex locks on wasm+atomics. --- library/std/src/sys/wasm/atomics/condvar.rs | 102 -------------- library/std/src/sys/wasm/atomics/mutex.rs | 64 --------- library/std/src/sys/wasm/atomics/rwlock.rs | 145 -------------------- library/std/src/sys/wasm/atomics/thread.rs | 34 ----- library/std/src/sys/wasm/mod.rs | 15 +- 5 files changed, 6 insertions(+), 354 deletions(-) delete mode 100644 library/std/src/sys/wasm/atomics/condvar.rs delete mode 100644 library/std/src/sys/wasm/atomics/mutex.rs delete mode 100644 library/std/src/sys/wasm/atomics/rwlock.rs diff --git a/library/std/src/sys/wasm/atomics/condvar.rs b/library/std/src/sys/wasm/atomics/condvar.rs deleted file mode 100644 index f06c07c54093f..0000000000000 --- a/library/std/src/sys/wasm/atomics/condvar.rs +++ /dev/null @@ -1,102 +0,0 @@ -use crate::arch::wasm32; -use crate::cmp; -use crate::mem; -use crate::sync::atomic::{AtomicUsize, Ordering::SeqCst}; -use crate::sys::locks::Mutex; -use crate::time::Duration; - -pub struct Condvar { - cnt: AtomicUsize, -} - -pub type MovableCondvar = Condvar; - -// Condition variables are implemented with a simple counter internally that is -// likely to cause spurious wakeups. Blocking on a condition variable will first -// read the value of the internal counter, unlock the given mutex, and then -// block if and only if the counter's value is still the same. Notifying a -// condition variable will modify the counter (add one for now) and then wake up -// a thread waiting on the address of the counter. -// -// A thread waiting on the condition variable will as a result avoid going to -// sleep if it's notified after the lock is unlocked but before it fully goes to -// sleep. A sleeping thread is guaranteed to be woken up at some point as it can -// only be woken up with a call to `wake`. -// -// Note that it's possible for 2 or more threads to be woken up by a call to -// `notify_one` with this implementation. That can happen where the modification -// of `cnt` causes any threads in the middle of `wait` to avoid going to sleep, -// and the subsequent `wake` may wake up a thread that's actually blocking. We -// consider this a spurious wakeup, though, which all users of condition -// variables must already be prepared to handle. As a result, this source of -// spurious wakeups is currently though to be ok, although it may be problematic -// later on if it causes too many spurious wakeups. - -impl Condvar { - pub const fn new() -> Condvar { - Condvar { cnt: AtomicUsize::new(0) } - } - - #[inline] - pub unsafe fn init(&mut self) { - // nothing to do - } - - pub unsafe fn notify_one(&self) { - self.cnt.fetch_add(1, SeqCst); - // SAFETY: ptr() is always valid - unsafe { - wasm32::memory_atomic_notify(self.ptr(), 1); - } - } - - #[inline] - pub unsafe fn notify_all(&self) { - self.cnt.fetch_add(1, SeqCst); - // SAFETY: ptr() is always valid - unsafe { - wasm32::memory_atomic_notify(self.ptr(), u32::MAX); // -1 == "wake everyone" - } - } - - pub unsafe fn wait(&self, mutex: &Mutex) { - // "atomically block and unlock" implemented by loading our current - // counter's value, unlocking the mutex, and blocking if the counter - // still has the same value. - // - // Notifications happen by incrementing the counter and then waking a - // thread. Incrementing the counter after we unlock the mutex will - // prevent us from sleeping and otherwise the call to `wake` will - // wake us up once we're asleep. - let ticket = self.cnt.load(SeqCst) as i32; - mutex.unlock(); - let val = wasm32::memory_atomic_wait32(self.ptr(), ticket, -1); - // 0 == woken, 1 == not equal to `ticket`, 2 == timeout (shouldn't happen) - debug_assert!(val == 0 || val == 1); - mutex.lock(); - } - - pub unsafe fn wait_timeout(&self, mutex: &Mutex, dur: Duration) -> bool { - let ticket = self.cnt.load(SeqCst) as i32; - mutex.unlock(); - let nanos = dur.as_nanos(); - let nanos = cmp::min(i64::MAX as u128, nanos); - - // If the return value is 2 then a timeout happened, so we return - // `false` as we weren't actually notified. - let ret = wasm32::memory_atomic_wait32(self.ptr(), ticket, nanos as i64) != 2; - mutex.lock(); - return ret; - } - - #[inline] - pub unsafe fn destroy(&self) { - // nothing to do - } - - #[inline] - fn ptr(&self) -> *mut i32 { - assert_eq!(mem::size_of::(), mem::size_of::()); - self.cnt.as_mut_ptr() as *mut i32 - } -} diff --git a/library/std/src/sys/wasm/atomics/mutex.rs b/library/std/src/sys/wasm/atomics/mutex.rs deleted file mode 100644 index 1acc8392444c1..0000000000000 --- a/library/std/src/sys/wasm/atomics/mutex.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::arch::wasm32; -use crate::mem; -use crate::sync::atomic::{AtomicUsize, Ordering::SeqCst}; - -pub struct Mutex { - locked: AtomicUsize, -} - -pub type MovableMutex = Mutex; - -// Mutexes have a pretty simple implementation where they contain an `i32` -// internally that is 0 when unlocked and 1 when the mutex is locked. -// Acquisition has a fast path where it attempts to cmpxchg the 0 to a 1, and -// if it fails it then waits for a notification. Releasing a lock is then done -// by swapping in 0 and then notifying any waiters, if present. - -impl Mutex { - pub const fn new() -> Mutex { - Mutex { locked: AtomicUsize::new(0) } - } - - #[inline] - pub unsafe fn init(&mut self) { - // nothing to do - } - - pub unsafe fn lock(&self) { - while !self.try_lock() { - // SAFETY: the caller must uphold the safety contract for `memory_atomic_wait32`. - let val = unsafe { - wasm32::memory_atomic_wait32( - self.ptr(), - 1, // we expect our mutex is locked - -1, // wait infinitely - ) - }; - // we should have either woke up (0) or got a not-equal due to a - // race (1). We should never time out (2) - debug_assert!(val == 0 || val == 1); - } - } - - pub unsafe fn unlock(&self) { - let prev = self.locked.swap(0, SeqCst); - debug_assert_eq!(prev, 1); - wasm32::memory_atomic_notify(self.ptr(), 1); // wake up one waiter, if any - } - - #[inline] - pub unsafe fn try_lock(&self) -> bool { - self.locked.compare_exchange(0, 1, SeqCst, SeqCst).is_ok() - } - - #[inline] - pub unsafe fn destroy(&self) { - // nothing to do - } - - #[inline] - fn ptr(&self) -> *mut i32 { - assert_eq!(mem::size_of::(), mem::size_of::()); - self.locked.as_mut_ptr() as *mut i32 - } -} diff --git a/library/std/src/sys/wasm/atomics/rwlock.rs b/library/std/src/sys/wasm/atomics/rwlock.rs deleted file mode 100644 index 690bb155e1a27..0000000000000 --- a/library/std/src/sys/wasm/atomics/rwlock.rs +++ /dev/null @@ -1,145 +0,0 @@ -use crate::cell::UnsafeCell; -use crate::sys::locks::{Condvar, Mutex}; - -pub struct RwLock { - lock: Mutex, - cond: Condvar, - state: UnsafeCell, -} - -pub type MovableRwLock = RwLock; - -enum State { - Unlocked, - Reading(usize), - Writing, -} - -unsafe impl Send for RwLock {} -unsafe impl Sync for RwLock {} - -// This rwlock implementation is a relatively simple implementation which has a -// condition variable for readers/writers as well as a mutex protecting the -// internal state of the lock. A current downside of the implementation is that -// unlocking the lock will notify *all* waiters rather than just readers or just -// writers. This can cause lots of "thundering stampede" problems. While -// hopefully correct this implementation is very likely to want to be changed in -// the future. - -impl RwLock { - pub const fn new() -> RwLock { - RwLock { lock: Mutex::new(), cond: Condvar::new(), state: UnsafeCell::new(State::Unlocked) } - } - - #[inline] - pub unsafe fn read(&self) { - self.lock.lock(); - while !(*self.state.get()).inc_readers() { - self.cond.wait(&self.lock); - } - self.lock.unlock(); - } - - #[inline] - pub unsafe fn try_read(&self) -> bool { - self.lock.lock(); - let ok = (*self.state.get()).inc_readers(); - self.lock.unlock(); - return ok; - } - - #[inline] - pub unsafe fn write(&self) { - self.lock.lock(); - while !(*self.state.get()).inc_writers() { - self.cond.wait(&self.lock); - } - self.lock.unlock(); - } - - #[inline] - pub unsafe fn try_write(&self) -> bool { - self.lock.lock(); - let ok = (*self.state.get()).inc_writers(); - self.lock.unlock(); - return ok; - } - - #[inline] - pub unsafe fn read_unlock(&self) { - self.lock.lock(); - let notify = (*self.state.get()).dec_readers(); - self.lock.unlock(); - if notify { - // FIXME: should only wake up one of these some of the time - self.cond.notify_all(); - } - } - - #[inline] - pub unsafe fn write_unlock(&self) { - self.lock.lock(); - (*self.state.get()).dec_writers(); - self.lock.unlock(); - // FIXME: should only wake up one of these some of the time - self.cond.notify_all(); - } - - #[inline] - pub unsafe fn destroy(&self) { - self.lock.destroy(); - self.cond.destroy(); - } -} - -impl State { - fn inc_readers(&mut self) -> bool { - match *self { - State::Unlocked => { - *self = State::Reading(1); - true - } - State::Reading(ref mut cnt) => { - *cnt += 1; - true - } - State::Writing => false, - } - } - - fn inc_writers(&mut self) -> bool { - match *self { - State::Unlocked => { - *self = State::Writing; - true - } - State::Reading(_) | State::Writing => false, - } - } - - fn dec_readers(&mut self) -> bool { - let zero = match *self { - State::Reading(ref mut cnt) => { - *cnt -= 1; - *cnt == 0 - } - State::Unlocked | State::Writing => invalid(), - }; - if zero { - *self = State::Unlocked; - } - zero - } - - fn dec_writers(&mut self) { - match *self { - State::Writing => {} - State::Unlocked | State::Reading(_) => invalid(), - } - *self = State::Unlocked; - } -} - -fn invalid() -> ! { - panic!("inconsistent rwlock"); -} diff --git a/library/std/src/sys/wasm/atomics/thread.rs b/library/std/src/sys/wasm/atomics/thread.rs index 16418a06226e4..714b704922794 100644 --- a/library/std/src/sys/wasm/atomics/thread.rs +++ b/library/std/src/sys/wasm/atomics/thread.rs @@ -53,37 +53,3 @@ pub mod guard { None } } - -// We currently just use our own thread-local to store our -// current thread's ID, and then we lazily initialize it to something allocated -// from a global counter. -pub fn my_id() -> u32 { - use crate::sync::atomic::{AtomicU32, Ordering::SeqCst}; - - static NEXT_ID: AtomicU32 = AtomicU32::new(0); - - #[thread_local] - static mut MY_ID: u32 = 0; - - unsafe { - // If our thread ID isn't set yet then we need to allocate one. Do so - // with with a simple "atomically add to a global counter" strategy. - // This strategy doesn't handled what happens when the counter - // overflows, however, so just abort everything once the counter - // overflows and eventually we could have some sort of recycling scheme - // (or maybe this is all totally irrelevant by that point!). In any case - // though we're using a CAS loop instead of a `fetch_add` to ensure that - // the global counter never overflows. - if MY_ID == 0 { - let mut cur = NEXT_ID.load(SeqCst); - MY_ID = loop { - let next = cur.checked_add(1).unwrap_or_else(|| crate::process::abort()); - match NEXT_ID.compare_exchange(cur, next, SeqCst, SeqCst) { - Ok(_) => break next, - Err(i) => cur = i, - } - }; - } - MY_ID - } -} diff --git a/library/std/src/sys/wasm/mod.rs b/library/std/src/sys/wasm/mod.rs index 9f6700caf14bf..9992e44b0e756 100644 --- a/library/std/src/sys/wasm/mod.rs +++ b/library/std/src/sys/wasm/mod.rs @@ -49,16 +49,13 @@ pub mod time; cfg_if::cfg_if! { if #[cfg(target_feature = "atomics")] { - #[path = "atomics/condvar.rs"] - mod condvar; - #[path = "atomics/mutex.rs"] - mod mutex; - #[path = "atomics/rwlock.rs"] - mod rwlock; + #[path = "../unix/locks"] pub mod locks { - pub use super::condvar::*; - pub use super::mutex::*; - pub use super::rwlock::*; + #![allow(unsafe_op_in_unsafe_fn)] + mod futex; + mod futex_rwlock; + pub use futex::{Mutex, MovableMutex, Condvar, MovableCondvar}; + pub use futex_rwlock::{RwLock, MovableRwLock}; } #[path = "atomics/futex.rs"] pub mod futex;