Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

switch to async-io + blocking + multitask #836

Merged
merged 6 commits into from
Jul 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ jobs:
command: check
args: --features attributes

- name: build unstable only
uses: actions-rs/cargo@v1
with:
command: build
args: --no-default-features --features unstable

- name: tests
uses: actions-rs/cargo@v1
with:
Expand Down
18 changes: 15 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,15 @@ rustdoc-args = ["--cfg", "feature=\"docs\""]
[features]
default = [
"std",
"async-executor",
"async-io",
"async-task",
"blocking",
"futures-lite",
"kv-log-macro",
"log",
"num_cpus",
"pin-project-lite",
"smol",
]
docs = ["attributes", "unstable", "default"]
unstable = [
Expand All @@ -54,7 +57,7 @@ alloc = [
"futures-core/alloc",
"pin-project-lite",
]
tokio02 = ["smol/tokio02"]
tokio02 = ["tokio"]

[dependencies]
async-attributes = { version = "1.1.1", optional = true }
Expand All @@ -77,7 +80,10 @@ futures-timer = { version = "3.0.2", optional = true }
surf = { version = "1.0.3", optional = true }

[target.'cfg(not(target_os = "unknown"))'.dependencies]
smol = { version = "0.1.17", optional = true }
async-executor = { version = "0.1.1", features = ["async-io"], optional = true }
async-io = { version = "0.1.5", optional = true }
blocking = { version = "0.5.0", optional = true }
futures-lite = { version = "0.1.8", optional = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
futures-timer = { version = "3.0.2", optional = true, features = ["wasm-bindgen"] }
Expand All @@ -87,6 +93,12 @@ futures-channel = { version = "0.3.4", optional = true }
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
wasm-bindgen-test = "0.3.10"

[dependencies.tokio]
version = "0.2"
default-features = false
features = ["rt-threaded"]
optional = true

[dev-dependencies]
femme = "1.3.0"
rand = "0.7.3"
Expand Down
2 changes: 1 addition & 1 deletion src/fs/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl Drop for File {
// non-blocking fashion, but our only other option here is losing data remaining in the
// write cache. Good task schedulers should be resilient to occasional blocking hiccups in
// file destructors so we don't expect this to be a common problem in practice.
let _ = smol::block_on(self.flush());
let _ = futures_lite::future::block_on(self.flush());
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/net/tcp/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use crate::io;
use crate::net::{TcpStream, ToSocketAddrs};
Expand Down Expand Up @@ -81,7 +81,7 @@ impl TcpListener {
let addrs = addrs.to_socket_addrs().await?;

for addr in addrs {
match Async::<std::net::TcpListener>::bind(&addr) {
match Async::<std::net::TcpListener>::bind(addr) {
Ok(listener) => {
return Ok(TcpListener { watcher: listener });
}
Expand Down Expand Up @@ -227,7 +227,7 @@ cfg_unix! {

impl IntoRawFd for TcpListener {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
}
Expand All @@ -251,7 +251,7 @@ cfg_windows! {

impl IntoRawSocket for TcpListener {
fn into_raw_socket(self) -> RawSocket {
self.watcher.into_raw_socket()
self.watcher.into_inner().unwrap().into_raw_socket()
}
}
}
4 changes: 2 additions & 2 deletions src/net/tcp/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io::{IoSlice, IoSliceMut};
use std::net::SocketAddr;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use crate::io::{self, Read, Write};
use crate::net::ToSocketAddrs;
Expand Down Expand Up @@ -77,7 +77,7 @@ impl TcpStream {
let addrs = addrs.to_socket_addrs().await?;

for addr in addrs {
match Async::<std::net::TcpStream>::connect(&addr).await {
match Async::<std::net::TcpStream>::connect(addr).await {
Ok(stream) => {
return Ok(TcpStream {
watcher: Arc::new(stream),
Expand Down
8 changes: 4 additions & 4 deletions src/net/udp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::io;
use std::net::SocketAddr;
use std::net::{Ipv4Addr, Ipv6Addr};

use smol::Async;
use async_io::Async;

use crate::net::ToSocketAddrs;
use crate::utils::Context as _;
Expand Down Expand Up @@ -74,7 +74,7 @@ impl UdpSocket {
let addrs = addrs.to_socket_addrs().await?;

for addr in addrs {
match Async::<std::net::UdpSocket>::bind(&addr) {
match Async::<std::net::UdpSocket>::bind(addr) {
Ok(socket) => {
return Ok(UdpSocket { watcher: socket });
}
Expand Down Expand Up @@ -506,7 +506,7 @@ cfg_unix! {

impl IntoRawFd for UdpSocket {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
}
Expand All @@ -530,7 +530,7 @@ cfg_windows! {

impl IntoRawSocket for UdpSocket {
fn into_raw_socket(self) -> RawSocket {
self.watcher.into_raw_socket()
self.watcher.into_inner().unwrap().into_raw_socket()
}
}
}
4 changes: 2 additions & 2 deletions src/os/unix/net/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::fmt;
use std::net::Shutdown;
use std::os::unix::net::UnixDatagram as StdUnixDatagram;

use smol::Async;
use async_io::Async;

use super::SocketAddr;
use crate::io;
Expand Down Expand Up @@ -335,6 +335,6 @@ impl FromRawFd for UnixDatagram {

impl IntoRawFd for UnixDatagram {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
4 changes: 2 additions & 2 deletions src/os/unix/net/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::future::Future;
use std::os::unix::net::UnixListener as StdUnixListener;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use super::SocketAddr;
use super::UnixStream;
Expand Down Expand Up @@ -217,6 +217,6 @@ impl FromRawFd for UnixListener {

impl IntoRawFd for UnixListener {
fn into_raw_fd(self) -> RawFd {
self.watcher.into_raw_fd()
self.watcher.into_inner().unwrap().into_raw_fd()
}
}
2 changes: 1 addition & 1 deletion src/os/unix/net/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::net::Shutdown;
use std::os::unix::net::UnixStream as StdUnixStream;
use std::pin::Pin;

use smol::Async;
use async_io::Async;

use super::SocketAddr;
use crate::io::{self, Read, Write};
Expand Down
1 change: 1 addition & 0 deletions src/os/windows/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ cfg_std! {
}

cfg_unstable! {
#[cfg(feature = "default")]
pub mod fs;
}
2 changes: 1 addition & 1 deletion src/rt/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub static RUNTIME: Lazy<Runtime> = Lazy::new(|| {
for _ in 0..thread_count {
thread::Builder::new()
.name(thread_name.clone())
.spawn(|| crate::task::block_on(future::pending::<()>()))
.spawn(|| crate::task::executor::run_global(future::pending::<()>()))
.expect("cannot start a runtime thread");
}
Runtime {}
Expand Down
16 changes: 8 additions & 8 deletions src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::task::{Context, Poll};
use pin_project_lite::pin_project;

use crate::io;
use crate::task::{JoinHandle, Task, TaskLocalsWrapper};
use crate::task::{self, JoinHandle, Task, TaskLocalsWrapper};

/// Task builder that configures the settings of a new task.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -61,9 +61,9 @@ impl Builder {
});

let task = wrapped.tag.task().clone();
let smol_task = smol::Task::spawn(wrapped).into();
let handle = task::executor::spawn(wrapped);

Ok(JoinHandle::new(smol_task, task))
Ok(JoinHandle::new(handle, task))
}

/// Spawns a task locally with the configured settings.
Expand All @@ -81,9 +81,9 @@ impl Builder {
});

let task = wrapped.tag.task().clone();
let smol_task = smol::Task::local(wrapped).into();
let handle = task::executor::local(wrapped);

Ok(JoinHandle::new(smol_task, task))
Ok(JoinHandle::new(handle, task))
}

/// Spawns a task locally with the configured settings.
Expand Down Expand Up @@ -166,10 +166,10 @@ impl Builder {
unsafe {
TaskLocalsWrapper::set_current(&wrapped.tag, || {
let res = if should_run {
// The first call should use run.
smol::run(wrapped)
// The first call should run the executor
task::executor::run(wrapped)
} else {
smol::block_on(wrapped)
futures_lite::future::block_on(wrapped)
};
num_nested_blocking.replace(num_nested_blocking.get() - 1);
res
Expand Down
72 changes: 72 additions & 0 deletions src/task/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use std::cell::RefCell;
use std::future::Future;

static GLOBAL_EXECUTOR: once_cell::sync::Lazy<async_executor::Executor> = once_cell::sync::Lazy::new(async_executor::Executor::new);

thread_local! {
static EXECUTOR: RefCell<async_executor::LocalExecutor> = RefCell::new(async_executor::LocalExecutor::new());
}

pub(crate) fn spawn<F, T>(future: F) -> async_executor::Task<T>
where
F: Future<Output = T> + Send + 'static,
T: Send + 'static,
{
GLOBAL_EXECUTOR.spawn(future)
}

#[cfg(feature = "unstable")]
pub(crate) fn local<F, T>(future: F) -> async_executor::Task<T>
where
F: Future<Output = T> + 'static,
T: 'static,
{
EXECUTOR.with(|executor| executor.borrow().spawn(future))
}

pub(crate) fn run<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
EXECUTOR.with(|executor| enter(|| GLOBAL_EXECUTOR.enter(|| executor.borrow().run(future))))
}

pub(crate) fn run_global<F, T>(future: F) -> T
where
F: Future<Output = T>,
{
enter(|| GLOBAL_EXECUTOR.run(future))
}

/// Enters the tokio context if the `tokio` feature is enabled.
fn enter<T>(f: impl FnOnce() -> T) -> T {
#[cfg(not(feature = "tokio02"))]
return f();

#[cfg(feature = "tokio02")]
{
use std::cell::Cell;
use tokio::runtime::Runtime;

thread_local! {
/// The level of nested `enter` calls we are in, to ensure that the outermost always
/// has a runtime spawned.
static NESTING: Cell<usize> = Cell::new(0);
}

/// The global tokio runtime.
static RT: once_cell::sync::Lazy<Runtime> = once_cell::sync::Lazy::new(|| Runtime::new().expect("cannot initialize tokio"));

NESTING.with(|nesting| {
let res = if nesting.get() == 0 {
nesting.replace(1);
RT.enter(f)
} else {
nesting.replace(nesting.get() + 1);
f()
};
nesting.replace(nesting.get() - 1);
res
})
}
}
21 changes: 12 additions & 9 deletions src/task/join_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct JoinHandle<T> {
}

#[cfg(not(target_os = "unknown"))]
type InnerHandle<T> = async_task::JoinHandle<T, ()>;
type InnerHandle<T> = async_executor::Task<T>;
#[cfg(target_arch = "wasm32")]
type InnerHandle<T> = futures_channel::oneshot::Receiver<T>;

Expand Down Expand Up @@ -54,8 +54,7 @@ impl<T> JoinHandle<T> {
#[cfg(not(target_os = "unknown"))]
pub async fn cancel(mut self) -> Option<T> {
let handle = self.handle.take().unwrap();
handle.cancel();
handle.await
handle.cancel().await
}

/// Cancel this task.
Expand All @@ -67,15 +66,19 @@ impl<T> JoinHandle<T> {
}
}

#[cfg(not(target_os = "unknown"))]
impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(handle) = self.handle.take() {
handle.detach();
}
}
}

impl<T> Future for JoinHandle<T> {
type Output = T;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(output) => {
Poll::Ready(output.expect("cannot await the result of a panicked task"))
}
}
Pin::new(&mut self.handle.as_mut().unwrap()).poll(cx)
}
}
2 changes: 2 additions & 0 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ cfg_default! {
mod block_on;
mod builder;
mod current;
#[cfg(not(target_os = "unknown"))]
Keruspe marked this conversation as resolved.
Show resolved Hide resolved
pub(crate) mod executor;
mod join_handle;
mod sleep;
#[cfg(not(target_os = "unknown"))]
Expand Down
Loading