From 8886039ac585a87b88a0765a1455a0fa9b9ca820 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Fri, 24 Jul 2020 21:03:50 +0200 Subject: [PATCH 1/6] fix build with -default +unstable and add a CI check for it Fixes #842 Signed-off-by: Marc-Antoine Perennou --- .github/workflows/ci.yml | 6 ++++++ src/os/windows/mod.rs | 1 + src/utils.rs | 2 +- 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a24367c85..a4f85ea2a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -86,6 +86,12 @@ jobs: command: check args: --features attributes + - name: build unstable only + uses: actions-rs/cargo@v1 + with: + command: build + args: --no-default-features --features unstable + - name: tests uses: actions-rs/cargo@v1 with: diff --git a/src/os/windows/mod.rs b/src/os/windows/mod.rs index 6dac11b6e..67bf0372a 100644 --- a/src/os/windows/mod.rs +++ b/src/os/windows/mod.rs @@ -5,5 +5,6 @@ cfg_std! { } cfg_unstable! { + #[cfg(feature = "default")] pub mod fs; } diff --git a/src/utils.rs b/src/utils.rs index 31290e333..db916ed6b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -66,7 +66,7 @@ mod timer { #[cfg(any(feature = "unstable", feature = "default"))] pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer { - #[cfg(not(target_os = "unknown"))] + #[cfg(all(not(target_os = "unknown"), feature = "default"))] once_cell::sync::Lazy::force(&crate::rt::RUNTIME); Timer::after(dur) From 25e0e1abdc7d5b3fac5f28d2d3d47288fec761ab Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 15 Jul 2020 22:05:28 +0200 Subject: [PATCH 2/6] switch to async-io Signed-off-by: Marc-Antoine Perennou --- Cargo.toml | 2 ++ src/net/tcp/listener.rs | 8 ++++---- src/net/tcp/stream.rs | 4 ++-- src/net/udp/mod.rs | 8 ++++---- src/os/unix/net/datagram.rs | 4 ++-- src/os/unix/net/listener.rs | 4 ++-- src/os/unix/net/stream.rs | 2 +- src/utils.rs | 6 +++--- 8 files changed, 20 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a5b5c4f06..e748953da 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ rustdoc-args = ["--cfg", "feature=\"docs\""] [features] default = [ "std", + "async-io", "async-task", "kv-log-macro", "log", @@ -77,6 +78,7 @@ futures-timer = { version = "3.0.2", optional = true } surf = { version = "1.0.3", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] +async-io = { version = "0.1.5", optional = true } smol = { version = "0.1.17", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/src/net/tcp/listener.rs b/src/net/tcp/listener.rs index 09f5812fb..8c87fc5f0 100644 --- a/src/net/tcp/listener.rs +++ b/src/net/tcp/listener.rs @@ -2,7 +2,7 @@ use std::future::Future; use std::net::SocketAddr; use std::pin::Pin; -use smol::Async; +use async_io::Async; use crate::io; use crate::net::{TcpStream, ToSocketAddrs}; @@ -81,7 +81,7 @@ impl TcpListener { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match Async::::bind(&addr) { + match Async::::bind(addr) { Ok(listener) => { return Ok(TcpListener { watcher: listener }); } @@ -227,7 +227,7 @@ cfg_unix! { impl IntoRawFd for TcpListener { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } } @@ -251,7 +251,7 @@ cfg_windows! { impl IntoRawSocket for TcpListener { fn into_raw_socket(self) -> RawSocket { - self.watcher.into_raw_socket() + self.watcher.into_inner().unwrap().into_raw_socket() } } } diff --git a/src/net/tcp/stream.rs b/src/net/tcp/stream.rs index 63232fa35..f7bd5c919 100644 --- a/src/net/tcp/stream.rs +++ b/src/net/tcp/stream.rs @@ -2,7 +2,7 @@ use std::io::{IoSlice, IoSliceMut}; use std::net::SocketAddr; use std::pin::Pin; -use smol::Async; +use async_io::Async; use crate::io::{self, Read, Write}; use crate::net::ToSocketAddrs; @@ -77,7 +77,7 @@ impl TcpStream { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match Async::::connect(&addr).await { + match Async::::connect(addr).await { Ok(stream) => { return Ok(TcpStream { watcher: Arc::new(stream), diff --git a/src/net/udp/mod.rs b/src/net/udp/mod.rs index d361a6fce..dd47e0582 100644 --- a/src/net/udp/mod.rs +++ b/src/net/udp/mod.rs @@ -2,7 +2,7 @@ use std::io; use std::net::SocketAddr; use std::net::{Ipv4Addr, Ipv6Addr}; -use smol::Async; +use async_io::Async; use crate::net::ToSocketAddrs; use crate::utils::Context as _; @@ -74,7 +74,7 @@ impl UdpSocket { let addrs = addrs.to_socket_addrs().await?; for addr in addrs { - match Async::::bind(&addr) { + match Async::::bind(addr) { Ok(socket) => { return Ok(UdpSocket { watcher: socket }); } @@ -506,7 +506,7 @@ cfg_unix! { impl IntoRawFd for UdpSocket { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } } @@ -530,7 +530,7 @@ cfg_windows! { impl IntoRawSocket for UdpSocket { fn into_raw_socket(self) -> RawSocket { - self.watcher.into_raw_socket() + self.watcher.into_inner().unwrap().into_raw_socket() } } } diff --git a/src/os/unix/net/datagram.rs b/src/os/unix/net/datagram.rs index 52c6b07f1..83ef9fe95 100644 --- a/src/os/unix/net/datagram.rs +++ b/src/os/unix/net/datagram.rs @@ -4,7 +4,7 @@ use std::fmt; use std::net::Shutdown; use std::os::unix::net::UnixDatagram as StdUnixDatagram; -use smol::Async; +use async_io::Async; use super::SocketAddr; use crate::io; @@ -335,6 +335,6 @@ impl FromRawFd for UnixDatagram { impl IntoRawFd for UnixDatagram { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } diff --git a/src/os/unix/net/listener.rs b/src/os/unix/net/listener.rs index a63bd4b65..078b780d0 100644 --- a/src/os/unix/net/listener.rs +++ b/src/os/unix/net/listener.rs @@ -5,7 +5,7 @@ use std::future::Future; use std::os::unix::net::UnixListener as StdUnixListener; use std::pin::Pin; -use smol::Async; +use async_io::Async; use super::SocketAddr; use super::UnixStream; @@ -217,6 +217,6 @@ impl FromRawFd for UnixListener { impl IntoRawFd for UnixListener { fn into_raw_fd(self) -> RawFd { - self.watcher.into_raw_fd() + self.watcher.into_inner().unwrap().into_raw_fd() } } diff --git a/src/os/unix/net/stream.rs b/src/os/unix/net/stream.rs index 74bd6aef9..3b2fe36f4 100644 --- a/src/os/unix/net/stream.rs +++ b/src/os/unix/net/stream.rs @@ -5,7 +5,7 @@ use std::net::Shutdown; use std::os::unix::net::UnixStream as StdUnixStream; use std::pin::Pin; -use smol::Async; +use async_io::Async; use super::SocketAddr; use crate::io::{self, Read, Write}; diff --git a/src/utils.rs b/src/utils.rs index db916ed6b..f3299f636 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -61,7 +61,7 @@ pub(crate) trait Context { #[cfg(all(not(target_os = "unknown"), feature = "default"))] mod timer { - pub type Timer = smol::Timer; + pub type Timer = async_io::Timer; } #[cfg(any(feature = "unstable", feature = "default"))] @@ -69,7 +69,7 @@ pub(crate) fn timer_after(dur: std::time::Duration) -> timer::Timer { #[cfg(all(not(target_os = "unknown"), feature = "default"))] once_cell::sync::Lazy::force(&crate::rt::RUNTIME); - Timer::after(dur) + Timer::new(dur) } #[cfg(any( @@ -84,7 +84,7 @@ mod timer { pub(crate) struct Timer(futures_timer::Delay); impl Timer { - pub(crate) fn after(dur: std::time::Duration) -> Self { + pub(crate) fn new(dur: std::time::Duration) -> Self { Timer(futures_timer::Delay::new(dur)) } } From 2fe087bd0aff4e585904dc42c565e34bf7836ae4 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 22 Jul 2020 09:12:48 +0200 Subject: [PATCH 3/6] switch to blocking Signed-off-by: Marc-Antoine Perennou --- Cargo.toml | 2 ++ src/task/spawn_blocking.rs | 7 ++----- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e748953da..36322d99a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ default = [ "std", "async-io", "async-task", + "blocking", "kv-log-macro", "log", "num_cpus", @@ -79,6 +80,7 @@ surf = { version = "1.0.3", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] async-io = { version = "0.1.5", optional = true } +blocking = { version = "0.5.0", optional = true } smol = { version = "0.1.17", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/src/task/spawn_blocking.rs b/src/task/spawn_blocking.rs index e9ed0c5a0..9c836f24b 100644 --- a/src/task/spawn_blocking.rs +++ b/src/task/spawn_blocking.rs @@ -1,4 +1,4 @@ -use crate::task::{JoinHandle, Task}; +use crate::task::{self, JoinHandle}; /// Spawns a blocking task. /// @@ -35,8 +35,5 @@ where F: FnOnce() -> T + Send + 'static, T: Send + 'static, { - once_cell::sync::Lazy::force(&crate::rt::RUNTIME); - - let handle = smol::Task::blocking(async move { f() }).into(); - JoinHandle::new(handle, Task::new(None)) + task::spawn(async move { blocking::unblock!(f()) }) } From 48693fccc3e7fff633441ddd644c804cef9ba4ae Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Wed, 22 Jul 2020 09:13:16 +0200 Subject: [PATCH 4/6] switch to futures-lite Signed-off-by: Marc-Antoine Perennou --- Cargo.toml | 2 ++ src/fs/file.rs | 2 +- src/task/builder.rs | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 36322d99a..1f880665f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ default = [ "async-io", "async-task", "blocking", + "futures-lite", "kv-log-macro", "log", "num_cpus", @@ -81,6 +82,7 @@ surf = { version = "1.0.3", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] async-io = { version = "0.1.5", optional = true } blocking = { version = "0.5.0", optional = true } +futures-lite = { version = "0.1.8", optional = true } smol = { version = "0.1.17", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] diff --git a/src/fs/file.rs b/src/fs/file.rs index 2ff5643e7..56d292b97 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -315,7 +315,7 @@ impl Drop for File { // non-blocking fashion, but our only other option here is losing data remaining in the // write cache. Good task schedulers should be resilient to occasional blocking hiccups in // file destructors so we don't expect this to be a common problem in practice. - let _ = smol::block_on(self.flush()); + let _ = futures_lite::future::block_on(self.flush()); } } diff --git a/src/task/builder.rs b/src/task/builder.rs index 0024f8ab8..2fbcff923 100644 --- a/src/task/builder.rs +++ b/src/task/builder.rs @@ -169,7 +169,7 @@ impl Builder { // The first call should use run. smol::run(wrapped) } else { - smol::block_on(wrapped) + futures_lite::future::block_on(wrapped) }; num_nested_blocking.replace(num_nested_blocking.get() - 1); res From 0c51283bfcfb1b76796431b3d668f47510abab23 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Sat, 18 Jul 2020 20:32:56 +0200 Subject: [PATCH 5/6] switch to multitask Signed-off-by: Marc-Antoine Perennou --- Cargo.toml | 12 ++++-- src/task/builder.rs | 14 +++---- src/task/executor.rs | 91 +++++++++++++++++++++++++++++++++++++++++ src/task/join_handle.rs | 21 ++++++---- src/task/mod.rs | 2 + 5 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 src/task/executor.rs diff --git a/Cargo.toml b/Cargo.toml index 1f880665f..e58401463 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,9 +30,9 @@ default = [ "futures-lite", "kv-log-macro", "log", + "multitask", "num_cpus", "pin-project-lite", - "smol", ] docs = ["attributes", "unstable", "default"] unstable = [ @@ -57,7 +57,7 @@ alloc = [ "futures-core/alloc", "pin-project-lite", ] -tokio02 = ["smol/tokio02"] +tokio02 = ["tokio"] [dependencies] async-attributes = { version = "1.1.1", optional = true } @@ -83,7 +83,7 @@ surf = { version = "1.0.3", optional = true } async-io = { version = "0.1.5", optional = true } blocking = { version = "0.5.0", optional = true } futures-lite = { version = "0.1.8", optional = true } -smol = { version = "0.1.17", optional = true } +multitask = { version = "0.2.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] } @@ -93,6 +93,12 @@ futures-channel = { version = "0.3.4", optional = true } [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = "0.3.10" +[dependencies.tokio] +version = "0.2" +default-features = false +features = ["rt-threaded"] +optional = true + [dev-dependencies] femme = "1.3.0" rand = "0.7.3" diff --git a/src/task/builder.rs b/src/task/builder.rs index 2fbcff923..236081c05 100644 --- a/src/task/builder.rs +++ b/src/task/builder.rs @@ -7,7 +7,7 @@ use std::task::{Context, Poll}; use pin_project_lite::pin_project; use crate::io; -use crate::task::{JoinHandle, Task, TaskLocalsWrapper}; +use crate::task::{self, JoinHandle, Task, TaskLocalsWrapper}; /// Task builder that configures the settings of a new task. #[derive(Debug, Default)] @@ -61,9 +61,9 @@ impl Builder { }); let task = wrapped.tag.task().clone(); - let smol_task = smol::Task::spawn(wrapped).into(); + let handle = task::executor::spawn(wrapped); - Ok(JoinHandle::new(smol_task, task)) + Ok(JoinHandle::new(handle, task)) } /// Spawns a task locally with the configured settings. @@ -81,9 +81,9 @@ impl Builder { }); let task = wrapped.tag.task().clone(); - let smol_task = smol::Task::local(wrapped).into(); + let handle = task::executor::local(wrapped); - Ok(JoinHandle::new(smol_task, task)) + Ok(JoinHandle::new(handle, task)) } /// Spawns a task locally with the configured settings. @@ -166,8 +166,8 @@ impl Builder { unsafe { TaskLocalsWrapper::set_current(&wrapped.tag, || { let res = if should_run { - // The first call should use run. - smol::run(wrapped) + // The first call should run the executor + task::executor::run(wrapped) } else { futures_lite::future::block_on(wrapped) }; diff --git a/src/task/executor.rs b/src/task/executor.rs new file mode 100644 index 000000000..02fa4ca7e --- /dev/null +++ b/src/task/executor.rs @@ -0,0 +1,91 @@ +use std::cell::RefCell; +use std::future::Future; +use std::task::{Context, Poll}; + +static GLOBAL_EXECUTOR: once_cell::sync::Lazy = once_cell::sync::Lazy::new(multitask::Executor::new); + +struct Executor { + local_executor: multitask::LocalExecutor, + parker: async_io::parking::Parker, +} + +thread_local! { + static EXECUTOR: RefCell = RefCell::new({ + let (parker, unparker) = async_io::parking::pair(); + let local_executor = multitask::LocalExecutor::new(move || unparker.unpark()); + Executor { local_executor, parker } + }); +} + +pub(crate) fn spawn(future: F) -> multitask::Task +where + F: Future + Send + 'static, + T: Send + 'static, +{ + GLOBAL_EXECUTOR.spawn(future) +} + +#[cfg(feature = "unstable")] +pub(crate) fn local(future: F) -> multitask::Task +where + F: Future + 'static, + T: 'static, +{ + EXECUTOR.with(|executor| executor.borrow().local_executor.spawn(future)) +} + +pub(crate) fn run(future: F) -> T +where + F: Future, +{ + enter(|| EXECUTOR.with(|executor| { + let executor = executor.borrow(); + let unparker = executor.parker.unparker(); + let global_ticker = GLOBAL_EXECUTOR.ticker(move || unparker.unpark()); + let unparker = executor.parker.unparker(); + let waker = async_task::waker_fn(move || unparker.unpark()); + let cx = &mut Context::from_waker(&waker); + pin_utils::pin_mut!(future); + loop { + if let Poll::Ready(res) = future.as_mut().poll(cx) { + return res; + } + if let Ok(false) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| executor.local_executor.tick() || global_ticker.tick())) { + executor.parker.park(); + } + } + })) +} + +/// Enters the tokio context if the `tokio` feature is enabled. +fn enter(f: impl FnOnce() -> T) -> T { + #[cfg(not(feature = "tokio02"))] + return f(); + + #[cfg(feature = "tokio02")] + { + use std::cell::Cell; + use tokio::runtime::Runtime; + + thread_local! { + /// The level of nested `enter` calls we are in, to ensure that the outermost always + /// has a runtime spawned. + static NESTING: Cell = Cell::new(0); + } + + /// The global tokio runtime. + static RT: once_cell::sync::Lazy = once_cell::sync::Lazy::new(|| Runtime::new().expect("cannot initialize tokio")); + + NESTING.with(|nesting| { + let res = if nesting.get() == 0 { + nesting.replace(1); + RT.enter(f) + } else { + nesting.replace(nesting.get() + 1); + f() + }; + nesting.replace(nesting.get() - 1); + res + }) + } +} diff --git a/src/task/join_handle.rs b/src/task/join_handle.rs index 110b827e2..fd0d0fb77 100644 --- a/src/task/join_handle.rs +++ b/src/task/join_handle.rs @@ -18,7 +18,7 @@ pub struct JoinHandle { } #[cfg(not(target_os = "unknown"))] -type InnerHandle = async_task::JoinHandle; +type InnerHandle = multitask::Task; #[cfg(target_arch = "wasm32")] type InnerHandle = futures_channel::oneshot::Receiver; @@ -54,8 +54,7 @@ impl JoinHandle { #[cfg(not(target_os = "unknown"))] pub async fn cancel(mut self) -> Option { let handle = self.handle.take().unwrap(); - handle.cancel(); - handle.await + handle.cancel().await } /// Cancel this task. @@ -67,15 +66,19 @@ impl JoinHandle { } } +#[cfg(not(target_os = "unknown"))] +impl Drop for JoinHandle { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + handle.detach(); + } + } +} + impl Future for JoinHandle { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(output) => { - Poll::Ready(output.expect("cannot await the result of a panicked task")) - } - } + Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx) } } diff --git a/src/task/mod.rs b/src/task/mod.rs index ca0b92a02..9e025baf4 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -148,6 +148,8 @@ cfg_default! { mod block_on; mod builder; mod current; + #[cfg(not(target_os = "unknown"))] + mod executor; mod join_handle; mod sleep; #[cfg(not(target_os = "unknown"))] From abc2929a8e0794e8f834a77d7ccd9ab4e80a01b5 Mon Sep 17 00:00:00 2001 From: Marc-Antoine Perennou Date: Thu, 23 Jul 2020 17:46:10 +0200 Subject: [PATCH 6/6] switch to async-executor Signed-off-by: Marc-Antoine Perennou --- Cargo.toml | 4 ++-- src/rt/mod.rs | 2 +- src/task/executor.rs | 45 ++++++++++++----------------------------- src/task/join_handle.rs | 2 +- src/task/mod.rs | 2 +- 5 files changed, 18 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e58401463..8c890125d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,13 +24,13 @@ rustdoc-args = ["--cfg", "feature=\"docs\""] [features] default = [ "std", + "async-executor", "async-io", "async-task", "blocking", "futures-lite", "kv-log-macro", "log", - "multitask", "num_cpus", "pin-project-lite", ] @@ -80,10 +80,10 @@ futures-timer = { version = "3.0.2", optional = true } surf = { version = "1.0.3", optional = true } [target.'cfg(not(target_os = "unknown"))'.dependencies] +async-executor = { version = "0.1.1", features = ["async-io"], optional = true } async-io = { version = "0.1.5", optional = true } blocking = { version = "0.5.0", optional = true } futures-lite = { version = "0.1.8", optional = true } -multitask = { version = "0.2.0", optional = true } [target.'cfg(target_arch = "wasm32")'.dependencies] futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] } diff --git a/src/rt/mod.rs b/src/rt/mod.rs index d8550aac8..f58afb180 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -27,7 +27,7 @@ pub static RUNTIME: Lazy = Lazy::new(|| { for _ in 0..thread_count { thread::Builder::new() .name(thread_name.clone()) - .spawn(|| crate::task::block_on(future::pending::<()>())) + .spawn(|| crate::task::executor::run_global(future::pending::<()>())) .expect("cannot start a runtime thread"); } Runtime {} diff --git a/src/task/executor.rs b/src/task/executor.rs index 02fa4ca7e..d9caf0531 100644 --- a/src/task/executor.rs +++ b/src/task/executor.rs @@ -1,23 +1,13 @@ use std::cell::RefCell; use std::future::Future; -use std::task::{Context, Poll}; -static GLOBAL_EXECUTOR: once_cell::sync::Lazy = once_cell::sync::Lazy::new(multitask::Executor::new); - -struct Executor { - local_executor: multitask::LocalExecutor, - parker: async_io::parking::Parker, -} +static GLOBAL_EXECUTOR: once_cell::sync::Lazy = once_cell::sync::Lazy::new(async_executor::Executor::new); thread_local! { - static EXECUTOR: RefCell = RefCell::new({ - let (parker, unparker) = async_io::parking::pair(); - let local_executor = multitask::LocalExecutor::new(move || unparker.unpark()); - Executor { local_executor, parker } - }); + static EXECUTOR: RefCell = RefCell::new(async_executor::LocalExecutor::new()); } -pub(crate) fn spawn(future: F) -> multitask::Task +pub(crate) fn spawn(future: F) -> async_executor::Task where F: Future + Send + 'static, T: Send + 'static, @@ -26,35 +16,26 @@ where } #[cfg(feature = "unstable")] -pub(crate) fn local(future: F) -> multitask::Task +pub(crate) fn local(future: F) -> async_executor::Task where F: Future + 'static, T: 'static, { - EXECUTOR.with(|executor| executor.borrow().local_executor.spawn(future)) + EXECUTOR.with(|executor| executor.borrow().spawn(future)) } pub(crate) fn run(future: F) -> T where F: Future, { - enter(|| EXECUTOR.with(|executor| { - let executor = executor.borrow(); - let unparker = executor.parker.unparker(); - let global_ticker = GLOBAL_EXECUTOR.ticker(move || unparker.unpark()); - let unparker = executor.parker.unparker(); - let waker = async_task::waker_fn(move || unparker.unpark()); - let cx = &mut Context::from_waker(&waker); - pin_utils::pin_mut!(future); - loop { - if let Poll::Ready(res) = future.as_mut().poll(cx) { - return res; - } - if let Ok(false) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| executor.local_executor.tick() || global_ticker.tick())) { - executor.parker.park(); - } - } - })) + EXECUTOR.with(|executor| enter(|| GLOBAL_EXECUTOR.enter(|| executor.borrow().run(future)))) +} + +pub(crate) fn run_global(future: F) -> T +where + F: Future, +{ + enter(|| GLOBAL_EXECUTOR.run(future)) } /// Enters the tokio context if the `tokio` feature is enabled. diff --git a/src/task/join_handle.rs b/src/task/join_handle.rs index fd0d0fb77..9189ea576 100644 --- a/src/task/join_handle.rs +++ b/src/task/join_handle.rs @@ -18,7 +18,7 @@ pub struct JoinHandle { } #[cfg(not(target_os = "unknown"))] -type InnerHandle = multitask::Task; +type InnerHandle = async_executor::Task; #[cfg(target_arch = "wasm32")] type InnerHandle = futures_channel::oneshot::Receiver; diff --git a/src/task/mod.rs b/src/task/mod.rs index 9e025baf4..b528a788e 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -149,7 +149,7 @@ cfg_default! { mod builder; mod current; #[cfg(not(target_os = "unknown"))] - mod executor; + pub(crate) mod executor; mod join_handle; mod sleep; #[cfg(not(target_os = "unknown"))]