Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use futex-based locks and thread parker on {Free, Open, DragonFly}BSD. #96510

Merged
merged 11 commits into from
May 6, 2022
4 changes: 2 additions & 2 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2070,9 +2070,9 @@ checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55"

[[package]]
name = "libc"
version = "0.2.121"
version = "0.2.125"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
checksum = "5916d2ae698f6de9bfb891ad7a8d65c09d232dc58cc4ac433c7da3b2fd84bc2b"
dependencies = [
"rustc-std-workspace-core",
]
Expand Down
2 changes: 1 addition & 1 deletion library/std/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ cfg-if = { version = "0.1.8", features = ['rustc-dep-of-std'] }
panic_unwind = { path = "../panic_unwind", optional = true }
panic_abort = { path = "../panic_abort" }
core = { path = "../core" }
libc = { version = "0.2.116", default-features = false, features = ['rustc-dep-of-std'] }
libc = { version = "0.2.125", default-features = false, features = ['rustc-dep-of-std'] }
compiler_builtins = { version = "0.1.71" }
profiler_builtins = { path = "../profiler_builtins", optional = true }
unwind = { path = "../unwind" }
Expand Down
21 changes: 0 additions & 21 deletions library/std/src/sync/condvar/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,24 +188,3 @@ fn wait_timeout_wake() {
break;
}
}

#[test]
#[should_panic]
#[cfg(all(unix, not(target_os = "linux"), not(target_os = "android")))]
fn two_mutexes() {
let m = Arc::new(Mutex::new(()));
let m2 = m.clone();
let c = Arc::new(Condvar::new());
let c2 = c.clone();

let mut g = m.lock().unwrap();
let _t = thread::spawn(move || {
let _g = m2.lock().unwrap();
c2.notify_one();
});
g = c.wait(g).unwrap();
drop(g);

let m = Mutex::new(());
let _ = c.wait(m.lock().unwrap()).unwrap();
}
178 changes: 154 additions & 24 deletions library/std/src/sys/unix/futex.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#![cfg(any(
target_os = "linux",
target_os = "android",
all(target_os = "emscripten", target_feature = "atomics")
all(target_os = "emscripten", target_feature = "atomics"),
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
))]

use crate::sync::atomic::AtomicU32;
Expand All @@ -12,7 +15,7 @@ use crate::time::Duration;
/// Returns directly if the futex doesn't hold the expected value.
///
/// Returns false on timeout, and true in all other cases.
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg(any(target_os = "linux", target_os = "android", target_os = "freebsd"))]
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
use super::time::Timespec;
use crate::ptr::null;
Expand All @@ -30,18 +33,43 @@ pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -
return true;
}

// Use FUTEX_WAIT_BITSET rather than FUTEX_WAIT to be able to give an
// absolute time rather than a relative time.
let r = unsafe {
libc::syscall(
libc::SYS_futex,
futex as *const AtomicU32,
libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG,
expected,
timespec.as_ref().map_or(null(), |t| &t.t as *const libc::timespec),
null::<u32>(), // This argument is unused for FUTEX_WAIT_BITSET.
!0u32, // A full bitmask, to make it behave like a regular FUTEX_WAIT.
)
cfg_if::cfg_if! {
if #[cfg(target_os = "freebsd")] {
// FreeBSD doesn't have futex(), but it has
// _umtx_op(UMTX_OP_WAIT_UINT_PRIVATE), which is nearly
// identical. It supports absolute timeouts through a flag
// in the _umtx_time struct.
let umtx_timeout = timespec.map(|t| libc::_umtx_time {
_timeout: t.t,
_flags: libc::UMTX_ABSTIME,
_clockid: libc::CLOCK_MONOTONIC as u32,
});
let umtx_timeout_ptr = umtx_timeout.as_ref().map_or(null(), |t| t as *const _);
let umtx_timeout_size = umtx_timeout.as_ref().map_or(0, |t| crate::mem::size_of_val(t));
libc::_umtx_op(
futex as *const AtomicU32 as *mut _,
libc::UMTX_OP_WAIT_UINT_PRIVATE,
expected as libc::c_ulong,
crate::ptr::invalid_mut(umtx_timeout_size),
umtx_timeout_ptr as *mut _,
)
} else if #[cfg(any(target_os = "linux", target_os = "android"))] {
// Use FUTEX_WAIT_BITSET rather than FUTEX_WAIT to be able to give an
// absolute time rather than a relative time.
libc::syscall(
libc::SYS_futex,
futex as *const AtomicU32,
libc::FUTEX_WAIT_BITSET | libc::FUTEX_PRIVATE_FLAG,
expected,
timespec.as_ref().map_or(null(), |t| &t.t as *const libc::timespec),
null::<u32>(), // This argument is unused for FUTEX_WAIT_BITSET.
!0u32, // A full bitmask, to make it behave like a regular FUTEX_WAIT.
)
} else {
compile_error!("unknown target_os");
}
}
};

match (r < 0).then(super::os::errno) {
Expand All @@ -56,31 +84,133 @@ pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -
///
/// Returns true if this actually woke up such a thread,
/// or false if no thread was waiting on this futex.
///
/// On some platforms, this always returns false.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wake(futex: &AtomicU32) -> bool {
let ptr = futex as *const AtomicU32;
let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;
unsafe { libc::syscall(libc::SYS_futex, ptr, op, 1) > 0 }
}

/// Wake up all threads that are waiting on futex_wait on this futex.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub fn futex_wake_all(futex: &AtomicU32) {
let ptr = futex as *const AtomicU32;
let op = libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG;
unsafe {
libc::syscall(libc::SYS_futex, ptr, op, i32::MAX);
}
}

// FreeBSD doesn't tell us how many threads are woken up, so this always returns false.
#[cfg(target_os = "freebsd")]
pub fn futex_wake(futex: &AtomicU32) -> bool {
use crate::ptr::null_mut;
unsafe {
libc::syscall(
libc::SYS_futex,
futex as *const AtomicU32,
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
libc::_umtx_op(
futex as *const AtomicU32 as *mut _,
libc::UMTX_OP_WAKE_PRIVATE,
1,
) > 0
null_mut(),
null_mut(),
)
};
false
}

#[cfg(target_os = "freebsd")]
pub fn futex_wake_all(futex: &AtomicU32) {
use crate::ptr::null_mut;
unsafe {
libc::_umtx_op(
futex as *const AtomicU32 as *mut _,
libc::UMTX_OP_WAKE_PRIVATE,
i32::MAX as libc::c_ulong,
null_mut(),
null_mut(),
)
};
}

#[cfg(target_os = "openbsd")]
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
use crate::convert::TryInto;
use crate::ptr::{null, null_mut};
let timespec = timeout.and_then(|d| {
Some(libc::timespec {
// Sleep forever if the timeout is longer than fits in a timespec.
tv_sec: d.as_secs().try_into().ok()?,
// This conversion never truncates, as subsec_nanos is always <1e9.
tv_nsec: d.subsec_nanos() as _,
})
});

let r = unsafe {
libc::futex(
futex as *const AtomicU32 as *mut u32,
libc::FUTEX_WAIT,
expected as i32,
timespec.as_ref().map_or(null(), |t| t as *const libc::timespec),
null_mut(),
)
};

r == 0 || super::os::errno() != libc::ETIMEDOUT
}

#[cfg(target_os = "openbsd")]
pub fn futex_wake(futex: &AtomicU32) -> bool {
use crate::ptr::{null, null_mut};
unsafe {
libc::futex(futex as *const AtomicU32 as *mut u32, libc::FUTEX_WAKE, 1, null(), null_mut())
> 0
}
}

/// Wake up all threads that are waiting on futex_wait on this futex.
#[cfg(any(target_os = "linux", target_os = "android"))]
#[cfg(target_os = "openbsd")]
pub fn futex_wake_all(futex: &AtomicU32) {
use crate::ptr::{null, null_mut};
unsafe {
libc::syscall(
libc::SYS_futex,
futex as *const AtomicU32,
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
libc::futex(
futex as *const AtomicU32 as *mut u32,
libc::FUTEX_WAKE,
i32::MAX,
null(),
null_mut(),
);
}
}

#[cfg(target_os = "dragonfly")]
pub fn futex_wait(futex: &AtomicU32, expected: u32, timeout: Option<Duration>) -> bool {
use crate::convert::TryFrom;

// A timeout of 0 means infinite.
// We round smaller timeouts up to 1 millisecond.
// Overflows are rounded up to an infinite timeout.
let timeout_ms =
timeout.and_then(|d| Some(i32::try_from(d.as_millis()).ok()?.max(1))).unwrap_or(0);

let r = unsafe {
libc::umtx_sleep(futex as *const AtomicU32 as *const i32, expected as i32, timeout_ms)
};

r == 0 || super::os::errno() != libc::ETIMEDOUT
}

// DragonflyBSD doesn't tell us how many threads are woken up, so this always returns false.
#[cfg(target_os = "dragonfly")]
pub fn futex_wake(futex: &AtomicU32) -> bool {
unsafe { libc::umtx_wakeup(futex as *const AtomicU32 as *const i32, 1) };
false
}

#[cfg(target_os = "dragonfly")]
pub fn futex_wake_all(futex: &AtomicU32) {
unsafe { libc::umtx_wakeup(futex as *const AtomicU32 as *const i32, i32::MAX) };
}

#[cfg(target_os = "emscripten")]
extern "C" {
fn emscripten_futex_wake(addr: *const AtomicU32, count: libc::c_int) -> libc::c_int;
Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/unix/locks/futex_rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,10 @@ impl RwLock {
fn wake_writer(&self) -> bool {
self.writer_notify.fetch_add(1, Release);
futex_wake(&self.writer_notify)
// Note that FreeBSD and DragonFlyBSD don't tell us whether they woke
// up any threads or not, and always return `false` here. That still
// results in correct behaviour: it just means readers get woken up as
// well in case both readers and writers were waiting.
}

/// Spin for a while, but stop directly at the given condition.
Expand Down
3 changes: 3 additions & 0 deletions library/std/src/sys/unix/locks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ cfg_if::cfg_if! {
target_os = "linux",
target_os = "android",
all(target_os = "emscripten", target_feature = "atomics"),
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
))] {
mod futex;
mod futex_rwlock;
Expand Down
5 changes: 4 additions & 1 deletion library/std/src/sys/unix/thread_parker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@
#![cfg(not(any(
target_os = "linux",
target_os = "android",
all(target_os = "emscripten", target_feature = "atomics")
all(target_os = "emscripten", target_feature = "atomics"),
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
)))]

use crate::cell::UnsafeCell;
Expand Down
3 changes: 3 additions & 0 deletions library/std/src/sys_common/thread_parker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ cfg_if::cfg_if! {
target_os = "linux",
target_os = "android",
all(target_arch = "wasm32", target_feature = "atomics"),
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
))] {
mod futex;
pub use futex::Parker;
Expand Down