Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wasi-io: Reimplement wasi-io/poll using a Pollable trait #7812

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
6b4d634
Remove `+ Sync` constraints from preview2 implementation
badeend Jan 18, 2024
a2de6ec
Only use mutable references in WasiView to guarantee unique access to…
badeend Jan 20, 2024
1e766c2
Always use Descriptor::Directory for directories, regardless of wheth…
badeend Jan 20, 2024
75ce698
Merge branch 'main' of https://github.com/bytecodealliance/wasmtime i…
badeend Jan 20, 2024
942769e
Remove some more `+ Sync`s
badeend Jan 20, 2024
0b839b5
Remove one more
badeend Jan 20, 2024
16eaf07
typo
badeend Jan 20, 2024
562f83d
Fix build errors on Rust <= 1.73. Code already compiled fine on >= 1.74
badeend Jan 20, 2024
3ac45de
ResourceTable take+restore
badeend Jan 24, 2024
b376c31
Rename Pollable -> PollableResource
badeend Jan 24, 2024
fda1966
Reimplement wasi-io/poll. This introduces a new Pollable trait which …
badeend Jan 24, 2024
fcc3e87
Remove ResourceTable::iter_entries. It was used only by the old `poll…
badeend Jan 24, 2024
d9a9842
Eliminate the (now) unnecessary surrogate parent resource of clock po…
badeend Jan 24, 2024
537b3f6
Forbid empty poll list.
badeend Jan 24, 2024
59e379f
Merge branch 'main' of https://github.com/bytecodealliance/wasmtime i…
badeend Jan 24, 2024
cf3d161
Test for specific error
badeend Jan 25, 2024
6a713a8
Simplify ready() and pending()
badeend Jan 25, 2024
f0f5209
Typo
badeend Jan 25, 2024
d6905b6
Replace panics with errors
badeend Jan 27, 2024
53349a8
Remove Lease and SlotIdentity types.
badeend Jan 27, 2024
8d54bd3
Add take_any & restore_any variants
badeend Jan 27, 2024
f7d8c4c
Redesign Pollable interface to not drop read() Futures in between pol…
badeend Feb 1, 2024
96b79f0
Rename Subscribe -> PollableAsync. Because Pollable and PollableAsync…
badeend Feb 1, 2024
bd67f42
PollableResource -> PollableHandle
badeend Feb 1, 2024
3692f9d
Merge branch 'main' of https://github.com/bytecodealliance/wasmtime i…
badeend Feb 1, 2024
45ce638
Merge branch 'main' of https://github.com/bytecodealliance/wasmtime i…
badeend Feb 11, 2024
2130c43
Rename Pollable -> PollableInternal, PollableAsync -> Subscribe, Poll…
badeend Feb 11, 2024
6e12a8a
Make the internals actually internal.
badeend Feb 11, 2024
742987b
Update docs
badeend Feb 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions crates/test-programs/src/bin/preview2_pollable_correct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use test_programs::wasi::cli::stdin;

fn main() {
let stdin = stdin::get_stdin();
let p1 = stdin.subscribe();
let p2 = stdin.subscribe();

// Should work:
// - Exactly the same pollable passed in multiple times.
// - Distinct pollables for the same backing resource (stdin in this case).
test_programs::wasi::io::poll::poll(&[&p1, &p2, &p1, &p2]);
}
4 changes: 4 additions & 0 deletions crates/test-programs/src/bin/preview2_pollable_traps.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
fn main() {
// Polling an empty list should trap:
test_programs::wasi::io::poll::poll(&[]);
}
10 changes: 5 additions & 5 deletions crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::str::FromStr;
use wasmtime::component::{Resource, ResourceTable};
use wasmtime_wasi::preview2::{
bindings::io::streams::{InputStream, OutputStream},
Pollable,
PollableResource,
};

impl<T: WasiHttpView> crate::bindings::http::types::Host for T {
Expand Down Expand Up @@ -643,8 +643,8 @@ impl<T: WasiHttpView> crate::bindings::http::types::HostFutureTrailers for T {
fn subscribe(
&mut self,
index: Resource<HostFutureTrailers>,
) -> wasmtime::Result<Resource<Pollable>> {
wasmtime_wasi::preview2::subscribe(self.table(), index)
) -> wasmtime::Result<Resource<PollableResource>> {
wasmtime_wasi::preview2::subscribe(self.table(), &index)
}

fn get(
Expand Down Expand Up @@ -851,8 +851,8 @@ impl<T: WasiHttpView> crate::bindings::http::types::HostFutureIncomingResponse f
fn subscribe(
&mut self,
id: Resource<HostFutureIncomingResponse>,
) -> wasmtime::Result<Resource<Pollable>> {
wasmtime_wasi::preview2::subscribe(self.table(), id)
) -> wasmtime::Result<Resource<PollableResource>> {
wasmtime_wasi::preview2::subscribe(self.table(), &id)
}
}

Expand Down
5 changes: 3 additions & 2 deletions crates/wasi-http/wit/deps/io/poll.wit
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ interface poll {
/// The result `list<u32>` contains one or more indices of handles in the
/// argument list that is ready for I/O.
///
/// If the list contains more elements than can be indexed with a `u32`
/// value, this function traps.
/// This function traps if either:
/// - the list is empty, or:
/// - the list contains more elements than can be indexed with a `u32` value.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we need this change in the wits, lets just be sure to upstream these to the spec repo as well. We will come up with some process for how we keep the docs evolving and improving while assuring that the interface itself doesn't change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

///
/// A timeout can be implemented by adding a pollable from the
/// wasi-clocks API to the list.
Expand Down
1 change: 1 addition & 0 deletions crates/wasi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ io-lifetimes = { workspace = true, optional = true }
fs-set-times = { workspace = true, optional = true }
bitflags = { workspace = true, optional = true }
async-trait = { workspace = true, optional = true }
smallvec = { workspace = true }
system-interface = { workspace = true, optional = true}
futures = { workspace = true, optional = true }

Expand Down
57 changes: 19 additions & 38 deletions crates/wasi/src/preview2/host/clocks.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#![allow(unused_variables)]

use crate::preview2::bindings::{
clocks::monotonic_clock::{self, Duration as WasiDuration, Instant},
clocks::wall_clock::{self, Datetime},
};
use crate::preview2::poll::{subscribe, Subscribe};
use crate::preview2::{Pollable, WasiView};
use crate::preview2::poll;
use crate::preview2::{Pollable, PollableResource, WasiView};
use cap_std::time::SystemTime;
use std::time::Duration;
use wasmtime::component::Resource;
Expand Down Expand Up @@ -42,23 +40,18 @@ impl<T: WasiView> wall_clock::Host for T {
}
}

fn subscribe_to_duration(
table: &mut wasmtime::component::ResourceTable,
duration: tokio::time::Duration,
) -> anyhow::Result<Resource<Pollable>> {
let sleep = if duration.is_zero() {
table.push(Deadline::Past)?
fn make_pollable(duration: tokio::time::Duration) -> Box<dyn Pollable> {
if duration.is_zero() {
Box::new(poll::ready())
} else if let Some(deadline) = tokio::time::Instant::now().checked_add(duration) {
// NB: this resource created here is not actually exposed to wasm, it's
// only an internal implementation detail used to match the signature
// expected by `subscribe`.
table.push(Deadline::Instant(deadline))?
Box::new(poll::once(async move {
tokio::time::sleep_until(deadline).await
}))
} else {
// If the user specifies a time so far in the future we can't
// represent it, wait forever rather than trap.
table.push(Deadline::Never)?
};
subscribe(table, sleep)
Box::new(poll::pending())
}
}

impl<T: WasiView> monotonic_clock::Host for T {
Expand All @@ -70,34 +63,22 @@ impl<T: WasiView> monotonic_clock::Host for T {
Ok(self.ctx().monotonic_clock.resolution())
}

fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result<Resource<Pollable>> {
fn subscribe_instant(&mut self, when: Instant) -> anyhow::Result<Resource<PollableResource>> {
let clock_now = self.ctx().monotonic_clock.now();
let duration = if when > clock_now {
Duration::from_nanos(when - clock_now)
} else {
Duration::from_nanos(0)
};
subscribe_to_duration(&mut self.table(), duration)
}

fn subscribe_duration(&mut self, duration: WasiDuration) -> anyhow::Result<Resource<Pollable>> {
subscribe_to_duration(&mut self.table(), Duration::from_nanos(duration))
let pollable = make_pollable(duration);
Ok(self.table().push(PollableResource::new(pollable))?)
}
}

enum Deadline {
Past,
Instant(tokio::time::Instant),
Never,
}

#[async_trait::async_trait]
impl Subscribe for Deadline {
async fn ready(&mut self) {
match self {
Deadline::Past => {}
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
fn subscribe_duration(
&mut self,
duration: WasiDuration,
) -> anyhow::Result<Resource<PollableResource>> {
let pollable = make_pollable(Duration::from_nanos(duration));
Ok(self.table().push(PollableResource::new(pollable))?)
}
}
23 changes: 14 additions & 9 deletions crates/wasi/src/preview2/host/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::preview2::{
bindings::io::error,
bindings::io::streams::{self, InputStream, OutputStream},
poll::subscribe,
Pollable, StreamError, StreamResult, WasiView,
PollableResource, StreamError, StreamResult, WasiView,
};
use wasmtime::component::Resource;

Expand Down Expand Up @@ -48,8 +48,11 @@ impl<T: WasiView> streams::HostOutputStream for T {
Ok(())
}

fn subscribe(&mut self, stream: Resource<OutputStream>) -> anyhow::Result<Resource<Pollable>> {
subscribe(self.table(), stream)
fn subscribe(
&mut self,
stream: Resource<OutputStream>,
) -> anyhow::Result<Resource<PollableResource>> {
subscribe(self.table(), &stream)
}

async fn blocking_write_and_flush(
Expand Down Expand Up @@ -219,8 +222,11 @@ impl<T: WasiView> streams::HostInputStream for T {
self.skip(stream, len).await
}

fn subscribe(&mut self, stream: Resource<InputStream>) -> anyhow::Result<Resource<Pollable>> {
crate::preview2::poll::subscribe(self.table(), stream)
fn subscribe(
&mut self,
stream: Resource<InputStream>,
) -> anyhow::Result<Resource<PollableResource>> {
crate::preview2::poll::subscribe(self.table(), &stream)
}
}

Expand All @@ -230,9 +236,8 @@ pub mod sync {
self as async_streams, Host as AsyncHost, HostInputStream as AsyncHostInputStream,
HostOutputStream as AsyncHostOutputStream,
},
bindings::sync_io::io::poll::Pollable,
bindings::sync_io::io::streams::{self, InputStream, OutputStream},
in_tokio, StreamError, StreamResult, WasiView,
in_tokio, PollableResource, StreamError, StreamResult, WasiView,
};
use wasmtime::component::Resource;

Expand Down Expand Up @@ -290,7 +295,7 @@ pub mod sync {
fn subscribe(
&mut self,
stream: Resource<OutputStream>,
) -> anyhow::Result<Resource<Pollable>> {
) -> anyhow::Result<Resource<PollableResource>> {
Ok(AsyncHostOutputStream::subscribe(self, stream)?)
}

Expand Down Expand Up @@ -359,7 +364,7 @@ pub mod sync {
fn subscribe(
&mut self,
stream: Resource<InputStream>,
) -> anyhow::Result<Resource<Pollable>> {
) -> anyhow::Result<Resource<PollableResource>> {
AsyncHostInputStream::subscribe(self, stream)
}
}
Expand Down
9 changes: 6 additions & 3 deletions crates/wasi/src/preview2/host/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::preview2::{
},
network::SocketAddressFamily,
};
use crate::preview2::{Pollable, SocketResult, WasiView};
use crate::preview2::{PollableResource, SocketResult, WasiView};
use cap_net_ext::Blocking;
use io_lifetimes::AsSocketlike;
use rustix::io::Errno;
Expand Down Expand Up @@ -559,8 +559,11 @@ impl<T: WasiView> crate::preview2::host::tcp::tcp::HostTcpSocket for T {
Ok(())
}

fn subscribe(&mut self, this: Resource<tcp::TcpSocket>) -> anyhow::Result<Resource<Pollable>> {
crate::preview2::poll::subscribe(self.table(), this)
fn subscribe(
&mut self,
this: Resource<tcp::TcpSocket>,
) -> anyhow::Result<Resource<PollableResource>> {
crate::preview2::poll::subscribe(self.table(), &this)
}

fn shutdown(
Expand Down
17 changes: 10 additions & 7 deletions crates/wasi/src/preview2/host/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::preview2::{
udp::{IncomingDatagramStream, OutgoingDatagramStream, SendState, UdpState},
Subscribe,
};
use crate::preview2::{Pollable, SocketError, SocketResult, WasiView};
use crate::preview2::{PollableResource, SocketError, SocketResult, WasiView};
use anyhow::anyhow;
use async_trait::async_trait;
use io_lifetimes::AsSocketlike;
Expand Down Expand Up @@ -282,8 +282,11 @@ impl<T: WasiView> udp::HostUdpSocket for T {
Ok(())
}

fn subscribe(&mut self, this: Resource<udp::UdpSocket>) -> anyhow::Result<Resource<Pollable>> {
crate::preview2::poll::subscribe(self.table(), this)
fn subscribe(
&mut self,
this: Resource<udp::UdpSocket>,
) -> anyhow::Result<Resource<PollableResource>> {
crate::preview2::poll::subscribe(self.table(), &this)
}

fn drop(&mut self, this: Resource<udp::UdpSocket>) -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -362,8 +365,8 @@ impl<T: WasiView> udp::HostIncomingDatagramStream for T {
fn subscribe(
&mut self,
this: Resource<udp::IncomingDatagramStream>,
) -> anyhow::Result<Resource<Pollable>> {
crate::preview2::poll::subscribe(self.table(), this)
) -> anyhow::Result<Resource<PollableResource>> {
crate::preview2::poll::subscribe(self.table(), &this)
}

fn drop(&mut self, this: Resource<udp::IncomingDatagramStream>) -> Result<(), anyhow::Error> {
Expand Down Expand Up @@ -496,8 +499,8 @@ impl<T: WasiView> udp::HostOutgoingDatagramStream for T {
fn subscribe(
&mut self,
this: Resource<udp::OutgoingDatagramStream>,
) -> anyhow::Result<Resource<Pollable>> {
crate::preview2::poll::subscribe(self.table(), this)
) -> anyhow::Result<Resource<PollableResource>> {
crate::preview2::poll::subscribe(self.table(), &this)
}

fn drop(&mut self, this: Resource<udp::OutgoingDatagramStream>) -> Result<(), anyhow::Error> {
Expand Down
6 changes: 3 additions & 3 deletions crates/wasi/src/preview2/ip_name_lookup.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::preview2::bindings::sockets::ip_name_lookup::{Host, HostResolveAddressStream};
use crate::preview2::bindings::sockets::network::{ErrorCode, IpAddress, Network};
use crate::preview2::host::network::util;
use crate::preview2::poll::{subscribe, Pollable, Subscribe};
use crate::preview2::poll::{subscribe, PollableResource, Subscribe};
use crate::preview2::{spawn_blocking, AbortOnDropJoinHandle, SocketError, WasiView};
use anyhow::Result;
use std::mem;
Expand Down Expand Up @@ -68,8 +68,8 @@ impl<T: WasiView> HostResolveAddressStream for T {
fn subscribe(
&mut self,
resource: Resource<ResolveAddressStream>,
) -> Result<Resource<Pollable>> {
subscribe(self.table(), resource)
) -> Result<Resource<PollableResource>> {
subscribe(self.table(), &resource)
}

fn drop(&mut self, resource: Resource<ResolveAddressStream>) -> Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions crates/wasi/src/preview2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod host;
mod ip_name_lookup;
mod network;
pub mod pipe;
mod poll;
pub mod poll;
#[cfg(feature = "preview1-on-preview2")]
pub mod preview0;
#[cfg(feature = "preview1-on-preview2")]
Expand All @@ -45,7 +45,7 @@ pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiView};
pub use self::error::{I32Exit, TrappableError};
pub use self::filesystem::{DirPerms, FilePerms, FsError, FsResult};
pub use self::network::{Network, SocketError, SocketResult};
pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe};
pub use self::poll::{subscribe, Pollable, PollableResource, Subscribe};
pub use self::random::{thread_rng, Deterministic};
pub use self::stdio::{
stderr, stdin, stdout, IsATTY, Stderr, Stdin, StdinStream, Stdout, StdoutStream,
Expand Down Expand Up @@ -171,7 +171,7 @@ pub mod bindings {
"wasi:io/streams/input-stream": super::stream::InputStream,
"wasi:io/streams/output-stream": super::stream::OutputStream,
"wasi:io/error/error": super::stream::Error,
"wasi:io/poll/pollable": super::poll::Pollable,
"wasi:io/poll/pollable": super::poll::PollableResource,
"wasi:cli/terminal-input/terminal-input": super::stdio::TerminalInput,
"wasi:cli/terminal-output/terminal-output": super::stdio::TerminalOutput,
},
Expand Down
Loading
Loading