diff --git a/.cargo/config.toml b/.cargo/config.toml index e9a242c..e31b6d3 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,5 +1,14 @@ [target.wasm32-wasip2] # wasmtime is given: +# * http enabled for wasi-http tests # * AWS auth environment variables, for running the wstd-aws integration tests. # * . directory is available at . runner = "wasmtime run -Shttp --env AWS_ACCESS_KEY_ID --env AWS_SECRET_ACCESS_KEY --env AWS_SESSION_TOKEN --dir .::." + +[target.wasm32-wasip3] +# wasmtime is given: +# * p3 enabled for wasip3 component support (backwards compat with p2) +# * http enabled for wasi-http tests +# * AWS auth environment variables, for running the wstd-aws integration tests. +# * . directory is available at . +runner = "wasmtime run -Wcomponent-model-async -Wcomponent-model-async-builtins -Wcomponent-model-async-stackful -Sp3 -Shttp --env AWS_ACCESS_KEY_ID --env AWS_SECRET_ACCESS_KEY --env AWS_SESSION_TOKEN --dir .::." diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 1362609..c70fa28 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -49,11 +49,16 @@ jobs: role-session-name: github-ci - name: check - run: cargo check --workspace --all --bins --examples + run: | + cargo check -p wstd -p wstd-axum --target wasm32-wasip2 --all-targets + cargo check -p test-programs - - name: wstd tests + - name: wstd tests (wasip2) run: cargo test -p wstd -p wstd-axum --target wasm32-wasip2 -- --nocapture + - name: wstd tests (wasip3) + run: cargo test -p wstd -p wstd-axum --target wasm32-wasip2 --no-default-features --features wasip3,json -- --nocapture + - name: test-programs tests run: cargo test -p test-programs -- --nocapture if: steps.creds.outcome == 'success' @@ -73,10 +78,12 @@ jobs: run: cargo fmt --all -- --check - name: Docs - run: cargo doc + run: cargo doc --target wasm32-wasip2 - name: Clippy - run: cargo clippy --all + run: | + cargo clippy -p wstd -p wstd-axum --target wasm32-wasip2 --all-targets + cargo clippy -p test-programs verify-publish: name: Verify publish diff --git a/Cargo.toml b/Cargo.toml index 4e248eb..02a7aa4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,13 +13,19 @@ repository.workspace = true rust-version.workspace = true [features] -default = ["json"] +default = ["json", "wasip2"] json = ["dep:serde", "dep:serde_json"] +wasip2 = ["dep:wasip2"] +wasip3 = ["dep:wasip3"] + +[build-dependencies] +cfg_aliases = "0.2" [dependencies] anyhow.workspace = true async-task.workspace = true bytes.workspace = true +cfg-if.workspace = true futures-lite.workspace = true http-body-util.workspace = true http-body.workspace = true @@ -27,13 +33,19 @@ http.workspace = true itoa.workspace = true pin-project-lite.workspace = true slab.workspace = true -wasip2.workspace = true +wasip2 = { workspace = true, optional = true } +wasip3 = { workspace = true, optional = true } wstd-macro.workspace = true # optional serde = { workspace = true, optional = true } serde_json = { workspace = true, optional = true } +# Auto-pull wasip3 deps when targeting wasm32-wasip3 so users don't need +# to also pass --features wasip3. +[target.'cfg(all(target_os = "wasi", target_env = "p3"))'.dependencies] +wasip3 = { workspace = true } + [dev-dependencies] anyhow.workspace = true clap.workspace = true @@ -72,6 +84,7 @@ async-task = "4.7" axum = { version = "0.8.6", default-features = false } bytes = "1.10.1" cargo_metadata = "0.22" +cfg-if = "1" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" @@ -95,7 +108,9 @@ test-programs = { path = "test-programs" } tower-service = "0.3.3" ureq = { version = "3.1", default-features = false, features = ["json"] } wasip2 = "1.0" -wstd = { path = ".", version = "=0.6.6" } +wasip3 = "0.5" +wit-bindgen = { version = "0.54", default-features = false, features = ["async", "async-spawn", "inter-task-wakeup"] } +wstd = { path = ".", version = "=0.6.6", default-features = false } wstd-axum = { path = "./axum", version = "=0.6.6" } wstd-axum-macro = { path = "./axum/macro", version = "=0.6.6" } wstd-macro = { path = "./macro", version = "=0.6.6" } diff --git a/axum/Cargo.toml b/axum/Cargo.toml index 154a021..a7430fc 100644 --- a/axum/Cargo.toml +++ b/axum/Cargo.toml @@ -10,10 +10,15 @@ categories.workspace = true repository.workspace = true rust-version.workspace = true +[features] +default = ["wasip2"] +wasip2 = ["wstd/wasip2"] +wasip3 = ["wstd/wasip3"] + [dependencies] axum.workspace = true tower-service.workspace = true -wstd.workspace = true +wstd = { workspace = true, default-features = false } wstd-axum-macro.workspace = true [dev-dependencies] diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..bec5023 --- /dev/null +++ b/build.rs @@ -0,0 +1,13 @@ +use cfg_aliases::cfg_aliases; + +fn main() { + cfg_aliases! { + // True when targeting a wasip3 component, either via the explicit + // `wasip3` feature or the `wasm32-wasip3` target (`target_env = "p3"`). + wstd_p3: { any(feature = "wasip3", target_env = "p3") }, + // True when targeting a wasip2 component, either via the `wasip2` + // feature (default) or the `wasm32-wasip2` target. wasip3 takes + // precedence when both apply. + wstd_p2: { all(any(feature = "wasip2", target_env = "p2"), not(wstd_p3)) }, + } +} diff --git a/ci/publish.rs b/ci/publish.rs index 66867ce..bf9ba79 100644 --- a/ci/publish.rs +++ b/ci/publish.rs @@ -365,6 +365,10 @@ fn verify(crates: &[Crate]) { .arg("--manifest-path") .arg(&krate.manifest) .env("CARGO_TARGET_DIR", "./target"); + // wstd and wstd-axum only compile for wasm32-wasip2 + if krate.name == "wstd" || krate.name == "wstd-axum" { + cmd.arg("--target").arg("wasm32-wasip2"); + } let status = cmd.status().unwrap(); assert!(status.success(), "failed to verify {:?}", &krate.manifest); let tar = Command::new("tar") diff --git a/macro/src/lib.rs b/macro/src/lib.rs index 6498377..b32f045 100644 --- a/macro/src/lib.rs +++ b/macro/src/lib.rs @@ -100,12 +100,6 @@ pub fn attr_macro_test(_attr: TokenStream, item: TokenStream) -> TokenStream { pub fn attr_macro_http_server(_attr: TokenStream, item: TokenStream) -> TokenStream { let input = parse_macro_input!(item as ItemFn); - let (run_async, run_await) = if input.sig.asyncness.is_some() { - (quote!(async), quote!(.await)) - } else { - (quote!(), quote!()) - }; - let output = &input.sig.output; let inputs = &input.sig.inputs; let name = &input.sig.ident; @@ -120,63 +114,27 @@ pub fn attr_macro_http_server(_attr: TokenStream, item: TokenStream) -> TokenStr .into(); } + // Delegate to wstd's conditionally-compiled declarative macro. + // The `cfg` checks in `__http_server_export!` run in wstd's context, + // so consumers don't need to define wasip2/wasip3 features themselves. + let asyncness = if input.sig.asyncness.is_some() { + quote!(@async) + } else { + quote!(@sync) + }; + + let run_async = if input.sig.asyncness.is_some() { + quote!(async) + } else { + quote!() + }; + quote! { - struct TheServer; - - impl ::wstd::__internal::wasip2::exports::http::incoming_handler::Guest for TheServer { - fn handle( - request: ::wstd::__internal::wasip2::http::types::IncomingRequest, - response_out: ::wstd::__internal::wasip2::http::types::ResponseOutparam - ) { - #(#attrs)* - #vis #run_async fn __run(#inputs) #output { - #body - } - - let responder = ::wstd::http::server::Responder::new(response_out); - ::wstd::runtime::block_on(async move { - match ::wstd::http::request::try_from_incoming(request) { - Ok(request) => match __run(request) #run_await { - Ok(response) => { responder.respond(response).await.unwrap() }, - Err(err) => responder.fail(err), - } - Err(err) => responder.fail(err), - } - }) - } + ::wstd::__http_server_export! { + #asyncness + { #(#attrs)* #vis #run_async fn __run(#inputs) #output { #body } } } - ::wstd::__internal::wasip2::http::proxy::export!(TheServer with_types_in ::wstd::__internal::wasip2); - - // Provide an actual function named `main`. - // - // WASI HTTP server components don't use a traditional `main` function. - // They export a function named `handle` which takes a `Request` - // argument, and which may be called multiple times on the same - // instance. To let users write a familiar `fn main` in a file - // named src/main.rs, we provide this `wstd::http_server` macro, which - // transforms the user's `fn main` into the appropriate `handle` - // function. - // - // However, when the top-level file is named src/main.rs, rustc - // requires there to be a function named `main` somewhere in it. This - // requirement can be disabled using `#![no_main]`, however we can't - // use that automatically because macros can't contain inner - // attributes, and we don't want to require users to add `#![no_main]` - // in their own code. - // - // So, we include a definition of a function named `main` here, which - // isn't intended to ever be called, and exists just to satify the - // requirement for a `main` function. - // - // Users could use `#![no_main]` if they want to. Or, they could name - // their top-level file src/lib.rs and add - // ```toml - // [lib] - // crate-type = ["cdylib"] - // ``` - // to their Cargo.toml. With either of these, this "main" function will - // be ignored as dead code. fn main() { unreachable!("HTTP server components should be run with `handle` rather than `run`") } diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..b4762e0 --- /dev/null +++ b/src/future.rs @@ -0,0 +1,233 @@ +//! Asynchronous values. +//! +//! # Cancellation +//! +//! Futures can be cancelled by dropping them before they finish executing. This +//! is useful when we're no longer interested in the result of an operation, as +//! it allows us to stop doing needless work. This also means that a future may cancel at any `.await` point, and so just +//! like with `?` we have to be careful to roll back local state if our future +//! halts there. +//! +//! +//! ```no_run +//! use futures_lite::prelude::*; +//! use wstd::prelude::*; +//! use wstd::time::Duration; +//! +//! #[wstd::main] +//! async fn main() { +//! let mut counter = 0; +//! let value = async { "meow" } +//! .delay(Duration::from_millis(100)) +//! .timeout(Duration::from_millis(200)) +//! .await; +//! +//! assert_eq!(value.unwrap(), "meow"); +//! } +//! ``` + +use std::future::Future; +use std::io; +use std::pin::Pin; +use std::task::{Context, Poll, ready}; + +use pin_project_lite::pin_project; + +use crate::time::utils::timeout_err; + +pub use self::future_ext::FutureExt; + +// ---- Delay ---- + +pin_project! { + /// Suspends a future until the specified deadline. + /// + /// This `struct` is created by the [`delay`] method on [`FutureExt`]. See its + /// documentation for more. + /// + /// [`delay`]: crate::future::FutureExt::delay + /// [`FutureExt`]: crate::future::futureExt + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Delay { + #[pin] + future: F, + #[pin] + deadline: D, + state: State, + } +} + +/// The internal state +#[derive(Debug)] +enum State { + Started, + PollFuture, + Completed, +} + +impl Delay { + fn new(future: F, deadline: D) -> Self { + Self { + future, + deadline, + state: State::Started, + } + } +} + +impl Future for Delay { + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + loop { + match this.state { + State::Started => { + ready!(this.deadline.as_mut().poll(cx)); + *this.state = State::PollFuture; + } + State::PollFuture => { + let value = ready!(this.future.as_mut().poll(cx)); + *this.state = State::Completed; + return Poll::Ready(value); + } + State::Completed => panic!("future polled after completing"), + } + } + } +} + +// ---- Timeout ---- + +pin_project! { + /// A future that times out after a duration of time. + /// + /// This `struct` is created by the [`timeout`] method on [`FutureExt`]. See its + /// documentation for more. + /// + /// [`timeout`]: crate::future::FutureExt::timeout + /// [`FutureExt`]: crate::future::futureExt + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Timeout { + #[pin] + future: F, + #[pin] + deadline: D, + completed: bool, + } +} + +impl Timeout { + fn new(future: F, deadline: D) -> Self { + Self { + future, + deadline, + completed: false, + } + } +} + +impl Future for Timeout { + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + assert!(!*this.completed, "future polled after completing"); + + match this.future.poll(cx) { + Poll::Ready(v) => { + *this.completed = true; + Poll::Ready(Ok(v)) + } + Poll::Pending => match this.deadline.poll(cx) { + Poll::Ready(_) => { + *this.completed = true; + Poll::Ready(Err(timeout_err("future timed out"))) + } + Poll::Pending => Poll::Pending, + }, + } + } +} + +// ---- FutureExt ---- + +mod future_ext { + use super::{Delay, Timeout}; + use std::future::{Future, IntoFuture}; + + /// Extend `Future` with time-based operations. + pub trait FutureExt: Future { + /// Return an error if a future does not complete within a given time span. + /// + /// Typically timeouts are, as the name implies, based on _time_. However + /// this method can time out based on any future. This can be useful in + /// combination with channels, as it allows (long-lived) futures to be + /// cancelled based on some external event. + /// + /// When a timeout is returned, the future will be dropped and destructors + /// will be run. + /// + /// # Example + /// + /// ```no_run + /// use wstd::prelude::*; + /// use wstd::time::{Instant, Duration}; + /// use std::io; + /// + /// #[wstd::main] + /// async fn main() { + /// let res = async { "meow" } + /// .delay(Duration::from_millis(100)) // longer delay + /// .timeout(Duration::from_millis(50)) // shorter timeout + /// .await; + /// assert_eq!(res.unwrap_err().kind(), io::ErrorKind::TimedOut); // error + /// + /// let res = async { "meow" } + /// .delay(Duration::from_millis(50)) // shorter delay + /// .timeout(Duration::from_millis(100)) // longer timeout + /// .await; + /// assert_eq!(res.unwrap(), "meow"); // success + /// } + /// ``` + fn timeout(self, deadline: D) -> Timeout + where + Self: Sized, + D: IntoFuture, + { + Timeout::new(self, deadline.into_future()) + } + + /// Delay resolving the future until the given deadline. + /// + /// The underlying future will not be polled until the deadline has expired. In addition + /// to using a time source as a deadline, any future can be used as a + /// deadline too. When used in combination with a multi-consumer channel, + /// this method can be used to synchronize the start of multiple futures and streams. + /// + /// # Example + /// + /// ```no_run + /// use wstd::prelude::*; + /// use wstd::time::{Instant, Duration}; + /// + /// #[wstd::main] + /// async fn main() { + /// let now = Instant::now(); + /// let delay = Duration::from_millis(100); + /// let _ = async { "meow" }.delay(delay).await; + /// assert!(now.elapsed() >= delay); + /// } + /// ``` + fn delay(self, deadline: D) -> Delay + where + Self: Sized, + D: IntoFuture, + { + Delay::new(self, deadline.into_future()) + } + } + + impl FutureExt for T where T: Future {} +} diff --git a/src/future/delay.rs b/src/future/delay.rs deleted file mode 100644 index 20d6753..0000000 --- a/src/future/delay.rs +++ /dev/null @@ -1,63 +0,0 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll, ready}; - -use pin_project_lite::pin_project; - -pin_project! { - /// Suspends a future until the specified deadline. - /// - /// This `struct` is created by the [`delay`] method on [`FutureExt`]. See its - /// documentation for more. - /// - /// [`delay`]: crate::future::FutureExt::delay - /// [`FutureExt`]: crate::future::futureExt - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Delay { - #[pin] - future: F, - #[pin] - deadline: D, - state: State, - } -} - -/// The internal state -#[derive(Debug)] -enum State { - Started, - PollFuture, - Completed, -} - -impl Delay { - pub(super) fn new(future: F, deadline: D) -> Self { - Self { - future, - deadline, - state: State::Started, - } - } -} - -impl Future for Delay { - type Output = F::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut this = self.project(); - loop { - match this.state { - State::Started => { - ready!(this.deadline.as_mut().poll(cx)); - *this.state = State::PollFuture; - } - State::PollFuture => { - let value = ready!(this.future.as_mut().poll(cx)); - *this.state = State::Completed; - return Poll::Ready(value); - } - State::Completed => panic!("future polled after completing"), - } - } - } -} diff --git a/src/future/future_ext.rs b/src/future/future_ext.rs deleted file mode 100644 index 2835f4b..0000000 --- a/src/future/future_ext.rs +++ /dev/null @@ -1,76 +0,0 @@ -use super::{Delay, Timeout}; -use std::future::{Future, IntoFuture}; - -/// Extend `Future` with time-based operations. -pub trait FutureExt: Future { - /// Return an error if a future does not complete within a given time span. - /// - /// Typically timeouts are, as the name implies, based on _time_. However - /// this method can time out based on any future. This can be useful in - /// combination with channels, as it allows (long-lived) futures to be - /// cancelled based on some external event. - /// - /// When a timeout is returned, the future will be dropped and destructors - /// will be run. - /// - /// # Example - /// - /// ```no_run - /// use wstd::prelude::*; - /// use wstd::time::{Instant, Duration}; - /// use std::io; - /// - /// #[wstd::main] - /// async fn main() { - /// let res = async { "meow" } - /// .delay(Duration::from_millis(100)) // longer delay - /// .timeout(Duration::from_millis(50)) // shorter timeout - /// .await; - /// assert_eq!(res.unwrap_err().kind(), io::ErrorKind::TimedOut); // error - /// - /// let res = async { "meow" } - /// .delay(Duration::from_millis(50)) // shorter delay - /// .timeout(Duration::from_millis(100)) // longer timeout - /// .await; - /// assert_eq!(res.unwrap(), "meow"); // success - /// } - /// ``` - fn timeout(self, deadline: D) -> Timeout - where - Self: Sized, - D: IntoFuture, - { - Timeout::new(self, deadline.into_future()) - } - - /// Delay resolving the future until the given deadline. - /// - /// The underlying future will not be polled until the deadline has expired. In addition - /// to using a time source as a deadline, any future can be used as a - /// deadline too. When used in combination with a multi-consumer channel, - /// this method can be used to synchronize the start of multiple futures and streams. - /// - /// # Example - /// - /// ```no_run - /// use wstd::prelude::*; - /// use wstd::time::{Instant, Duration}; - /// - /// #[wstd::main] - /// async fn main() { - /// let now = Instant::now(); - /// let delay = Duration::from_millis(100); - /// let _ = async { "meow" }.delay(delay).await; - /// assert!(now.elapsed() >= delay); - /// } - /// ``` - fn delay(self, deadline: D) -> Delay - where - Self: Sized, - D: IntoFuture, - { - Delay::new(self, deadline.into_future()) - } -} - -impl FutureExt for T where T: Future {} diff --git a/src/future/mod.rs b/src/future/mod.rs deleted file mode 100644 index a359afd..0000000 --- a/src/future/mod.rs +++ /dev/null @@ -1,35 +0,0 @@ -//! Asynchronous values. -//! -//! # Cancellation -//! -//! Futures can be cancelled by dropping them before they finish executing. This -//! is useful when we're no longer interested in the result of an operation, as -//! it allows us to stop doing needless work. This also means that a future may cancel at any `.await` point, and so just -//! like with `?` we have to be careful to roll back local state if our future -//! halts there. -//! -//! -//! ```no_run -//! use futures_lite::prelude::*; -//! use wstd::prelude::*; -//! use wstd::time::Duration; -//! -//! #[wstd::main] -//! async fn main() { -//! let mut counter = 0; -//! let value = async { "meow" } -//! .delay(Duration::from_millis(100)) -//! .timeout(Duration::from_millis(200)) -//! .await; -//! -//! assert_eq!(value.unwrap(), "meow"); -//! } -//! ``` - -mod delay; -mod future_ext; -mod timeout; - -pub use delay::Delay; -pub use future_ext::FutureExt; -pub use timeout::Timeout; diff --git a/src/future/timeout.rs b/src/future/timeout.rs deleted file mode 100644 index 9b00e1b..0000000 --- a/src/future/timeout.rs +++ /dev/null @@ -1,60 +0,0 @@ -use crate::time::utils::timeout_err; - -use std::future::Future; -use std::io; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use pin_project_lite::pin_project; - -pin_project! { - /// A future that times out after a duration of time. - /// - /// This `struct` is created by the [`timeout`] method on [`FutureExt`]. See its - /// documentation for more. - /// - /// [`timeout`]: crate::future::FutureExt::timeout - /// [`FutureExt`]: crate::future::futureExt - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Timeout { - #[pin] - future: F, - #[pin] - deadline: D, - completed: bool, - } -} - -impl Timeout { - pub(super) fn new(future: F, deadline: D) -> Self { - Self { - future, - deadline, - completed: false, - } - } -} - -impl Future for Timeout { - type Output = io::Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - - assert!(!*this.completed, "future polled after completing"); - - match this.future.poll(cx) { - Poll::Ready(v) => { - *this.completed = true; - Poll::Ready(Ok(v)) - } - Poll::Pending => match this.deadline.poll(cx) { - Poll::Ready(_) => { - *this.completed = true; - Poll::Ready(Err(timeout_err("future timed out"))) - } - Poll::Pending => Poll::Pending, - }, - } - } -} diff --git a/src/http.rs b/src/http.rs new file mode 100644 index 0000000..09f959a --- /dev/null +++ b/src/http.rs @@ -0,0 +1,205 @@ +//! HTTP networking support + +pub use http::status::StatusCode; +pub use http::uri::{Authority, PathAndQuery, Uri}; + +pub use crate::sys::http::client::Client; +pub use crate::sys::http::fields::{HeaderMap, HeaderName, HeaderValue}; +pub use crate::sys::http::method::Method; +pub use crate::sys::http::scheme::{InvalidUri, Scheme}; +#[doc(inline)] +pub use body::{Body, util::BodyExt}; +pub use error::{Error, ErrorCode, Result}; +pub use request::Request; +pub use response::Response; + +pub mod body { + //! HTTP body types. + pub use crate::sys::http::body::*; +} + +pub mod error { + //! The http portion of wstd uses `anyhow::Error` as its `Error` type. + //! + //! There are various concrete error types + + pub use crate::http::body::InvalidContentLength; + pub use crate::sys::http::{ErrorCode, HeaderError}; + pub use anyhow::Context; + pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; + pub use http::method::InvalidMethod; + + pub type Error = anyhow::Error; + /// The `http` result type. + pub type Result = std::result::Result; +} + +pub mod request { + //! HTTP request types. + pub use crate::sys::http::request::*; +} + +pub mod response { + //! HTTP response types. + pub use crate::sys::http::response::*; +} + +pub mod server { + //! HTTP servers + //! + //! The WASI HTTP server uses the [typed main] idiom, with a `main` function + //! that takes a [`Request`] and succeeds with a [`Response`], using the + //! [`http_server`] macro: + //! + //! ```no_run + //! use wstd::http::{Request, Response, Body, Error}; + //! #[wstd::http_server] + //! async fn main(_request: Request) -> Result, Error> { + //! Ok(Response::new("Hello!\n".into())) + //! } + //! ``` + //! + //! [typed main]: https://sunfishcode.github.io/typed-main-wasi-presentation/chapter_1.html + //! [`Request`]: crate::http::Request + //! [`Responder`]: crate::http::server::Responder + //! [`Response`]: crate::http::Response + //! [`http_server`]: crate::http_server + + pub use crate::sys::http::server::*; +} + +// Conditionally-compiled declarative macro for HTTP server export +// +// The `#[wstd::http_server]` proc macro delegates to this declarative macro. +// Because `#[macro_export]` macros are compiled in wstd's context, the +// `wstd_p2` / `wstd_p3` cfg aliases (defined in build.rs) are evaluated against +// wstd's own features and target environment. Consumers don't need to define +// any features themselves. + +#[cfg(wstd_p2)] +#[macro_export] +#[doc(hidden)] +macro_rules! __http_server_export { + (@async { $($run_fn:tt)* }) => { + const _: () = { + struct TheServer; + + impl $crate::__internal::wasip2::exports::http::incoming_handler::Guest for TheServer { + fn handle( + wasi_request: $crate::__internal::wasip2::http::types::IncomingRequest, + response_out: $crate::__internal::wasip2::http::types::ResponseOutparam + ) { + $($run_fn)* + + let responder = $crate::http::server::Responder::new(response_out); + $crate::runtime::block_on(async move { + match $crate::http::request::try_from_incoming(wasi_request) { + ::core::result::Result::Ok(request) => match __run(request).await { + ::core::result::Result::Ok(response) => { responder.respond(response).await.unwrap() }, + ::core::result::Result::Err(err) => responder.fail(err), + } + ::core::result::Result::Err(err) => responder.fail(err), + } + }) + } + } + + $crate::__internal::wasip2::http::proxy::export!(TheServer with_types_in $crate::__internal::wasip2); + }; + }; + (@sync { $($run_fn:tt)* }) => { + const _: () = { + struct TheServer; + + impl $crate::__internal::wasip2::exports::http::incoming_handler::Guest for TheServer { + fn handle( + wasi_request: $crate::__internal::wasip2::http::types::IncomingRequest, + response_out: $crate::__internal::wasip2::http::types::ResponseOutparam + ) { + $($run_fn)* + + let responder = $crate::http::server::Responder::new(response_out); + $crate::runtime::block_on(async move { + match $crate::http::request::try_from_incoming(wasi_request) { + ::core::result::Result::Ok(request) => match __run(request) { + ::core::result::Result::Ok(response) => { responder.respond(response).await.unwrap() }, + ::core::result::Result::Err(err) => responder.fail(err), + } + ::core::result::Result::Err(err) => responder.fail(err), + } + }) + } + } + + $crate::__internal::wasip2::http::proxy::export!(TheServer with_types_in $crate::__internal::wasip2); + }; + }; +} + +#[cfg(wstd_p3)] +#[macro_export] +#[doc(hidden)] +macro_rules! __http_server_export { + (@async { $($run_fn:tt)* }) => { + const _: () = { + struct TheServer; + + impl $crate::__internal::wasip3::exports::http::handler::Guest for TheServer { + async fn handle( + wasi_request: $crate::__internal::wasip3::http::types::Request, + ) -> ::core::result::Result< + $crate::__internal::wasip3::http::types::Response, + $crate::__internal::wasip3::http::types::ErrorCode, + > { + $($run_fn)* + + let (_writer, completion_reader) = $crate::__internal::wasip3::wit_future::new::< + ::core::result::Result<(), $crate::__internal::wasip3::http::types::ErrorCode>, + >(|| ::core::result::Result::Ok(())); + ::core::mem::drop(_writer); + + let request = $crate::http::request::try_from_wasi_request(wasi_request, completion_reader) + .map_err($crate::http::server::error_to_wasi)?; + + let response = __run(request).await + .map_err($crate::http::server::error_to_wasi)?; + + $crate::http::server::response_to_wasi(response).await + } + } + + $crate::__internal::wasip3::http::service::export!(TheServer with_types_in $crate::__internal::wasip3); + }; + }; + (@sync { $($run_fn:tt)* }) => { + const _: () = { + struct TheServer; + + impl $crate::__internal::wasip3::exports::http::handler::Guest for TheServer { + async fn handle( + wasi_request: $crate::__internal::wasip3::http::types::Request, + ) -> ::core::result::Result< + $crate::__internal::wasip3::http::types::Response, + $crate::__internal::wasip3::http::types::ErrorCode, + > { + $($run_fn)* + + let (_writer, completion_reader) = $crate::__internal::wasip3::wit_future::new::< + ::core::result::Result<(), $crate::__internal::wasip3::http::types::ErrorCode>, + >(|| ::core::result::Result::Ok(())); + ::core::mem::drop(_writer); + + let request = $crate::http::request::try_from_wasi_request(wasi_request, completion_reader) + .map_err($crate::http::server::error_to_wasi)?; + + let response = __run(request) + .map_err($crate::http::server::error_to_wasi)?; + + $crate::http::server::response_to_wasi(response).await + } + } + + $crate::__internal::wasip3::http::service::export!(TheServer with_types_in $crate::__internal::wasip3); + }; + }; +} diff --git a/src/http/error.rs b/src/http/error.rs deleted file mode 100644 index a4f22b0..0000000 --- a/src/http/error.rs +++ /dev/null @@ -1,13 +0,0 @@ -//! The http portion of wstd uses `anyhow::Error` as its `Error` type. -//! -//! There are various concrete error types - -pub use crate::http::body::InvalidContentLength; -pub use anyhow::Context; -pub use http::header::{InvalidHeaderName, InvalidHeaderValue}; -pub use http::method::InvalidMethod; -pub use wasip2::http::types::{ErrorCode, HeaderError}; - -pub type Error = anyhow::Error; -/// The `http` result type. -pub type Result = std::result::Result; diff --git a/src/http/mod.rs b/src/http/mod.rs deleted file mode 100644 index 39f0a40..0000000 --- a/src/http/mod.rs +++ /dev/null @@ -1,25 +0,0 @@ -//! HTTP networking support -//! -pub use http::status::StatusCode; -pub use http::uri::{Authority, PathAndQuery, Uri}; - -#[doc(inline)] -pub use body::{Body, util::BodyExt}; -pub use client::Client; -pub use error::{Error, ErrorCode, Result}; -pub use fields::{HeaderMap, HeaderName, HeaderValue}; -pub use method::Method; -pub use request::Request; -pub use response::Response; -pub use scheme::{InvalidUri, Scheme}; - -pub mod body; - -mod client; -pub mod error; -mod fields; -mod method; -pub mod request; -pub mod response; -mod scheme; -pub mod server; diff --git a/src/io/copy.rs b/src/io/copy.rs index 4fd178e..9fa9ab4 100644 --- a/src/io/copy.rs +++ b/src/io/copy.rs @@ -7,7 +7,8 @@ where W: AsyncWrite, { // Optimized path when we have an `AsyncInputStream` and an - // `AsyncOutputStream`. + // `AsyncOutputStream` (p2 only — p2 can use wasi splice). + #[cfg(wstd_p2)] if let Some(reader) = reader.as_async_input_stream() && let Some(writer) = writer.as_async_output_stream() { diff --git a/src/io/mod.rs b/src/io/mod.rs index 0f34b1b..2be15db 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -5,18 +5,17 @@ mod cursor; mod empty; mod read; mod seek; -mod stdio; -mod streams; mod write; +#[cfg(wstd_p2)] pub use crate::runtime::AsyncPollable; +pub use crate::sys::io::*; +pub use crate::sys::stdio::*; pub use copy::*; pub use cursor::*; pub use empty::*; pub use read::*; pub use seek::*; -pub use stdio::*; -pub use streams::*; pub use write::*; /// The error type for I/O operations. diff --git a/src/iter/mod.rs b/src/iter.rs similarity index 100% rename from src/iter/mod.rs rename to src/iter.rs diff --git a/src/lib.rs b/src/lib.rs index ebc673d..2a97e71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -55,6 +55,9 @@ //! These are unique capabilities provided by WASI 0.2, and because this library //! is specific to that are exposed from here. +#[allow(unreachable_pub)] +mod sys; + pub mod future; #[macro_use] pub mod http; @@ -70,10 +73,17 @@ pub use wstd_macro::attr_macro_http_server as http_server; pub use wstd_macro::attr_macro_main as main; pub use wstd_macro::attr_macro_test as test; -// Re-export the wasip2 crate for use only by `wstd-macro` macros. The proc +// Re-export the wasi bindings crate for use only by `wstd-macro` macros. The proc // macros need to generate code that uses these definitions, but we don't want // to treat it as part of our public API with regards to semver, so we keep it // under `__internal` as well as doc(hidden) to indicate it is private. +#[cfg(wstd_p3)] +#[doc(hidden)] +pub mod __internal { + pub use wasip3; +} + +#[cfg(wstd_p2)] #[doc(hidden)] pub mod __internal { pub use wasip2; diff --git a/src/net.rs b/src/net.rs new file mode 100644 index 0000000..6ddff7d --- /dev/null +++ b/src/net.rs @@ -0,0 +1,3 @@ +//! Async network abstractions. + +pub use crate::sys::net::*; diff --git a/src/rand.rs b/src/rand.rs new file mode 100644 index 0000000..e79a70b --- /dev/null +++ b/src/rand.rs @@ -0,0 +1,3 @@ +//! Random number generation. + +pub use crate::sys::random::{get_insecure_random_bytes, get_random_bytes}; diff --git a/src/runtime.rs b/src/runtime.rs new file mode 100644 index 0000000..19dd379 --- /dev/null +++ b/src/runtime.rs @@ -0,0 +1,13 @@ +//! Async event loop support. +//! +//! The way to use this is to call [`block_on()`]. Inside the future, [`Reactor::current`] +//! will give an instance of the [`Reactor`] running the event loop, which can be +//! to [`AsyncPollable::wait_for`] instances of +//! [`wasip2::Pollable`](https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html). +//! This will automatically wait for the futures to resolve, and call the +//! necessary wakers to work. + +#![deny(missing_debug_implementations, nonstandard_style)] +#![warn(missing_docs, unreachable_pub)] + +pub use crate::sys::runtime::*; diff --git a/src/runtime/mod.rs b/src/runtime/mod.rs deleted file mode 100644 index 24b9fc2..0000000 --- a/src/runtime/mod.rs +++ /dev/null @@ -1,36 +0,0 @@ -//! Async event loop support. -//! -//! The way to use this is to call [`block_on()`]. Inside the future, [`Reactor::current`] -//! will give an instance of the [`Reactor`] running the event loop, which can be -//! to [`AsyncPollable::wait_for`] instances of -//! [`wasip2::Pollable`](https://docs.rs/wasi/latest/wasi/io/poll/struct.Pollable.html). -//! This will automatically wait for the futures to resolve, and call the -//! necessary wakers to work. - -#![deny(missing_debug_implementations, nonstandard_style)] -#![warn(missing_docs, unreachable_pub)] - -mod block_on; -mod reactor; - -pub use ::async_task::Task; -pub use block_on::block_on; -pub use reactor::{AsyncPollable, Reactor, WaitFor}; -use std::cell::RefCell; - -// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all -// use sites in the background. -std::thread_local! { -pub(crate) static REACTOR: RefCell> = const { RefCell::new(None) }; -} - -/// Spawn a `Future` as a `Task` on the current `Reactor`. -/// -/// Panics if called from outside `block_on`. -pub fn spawn(fut: F) -> Task -where - F: std::future::Future + 'static, - T: 'static, -{ - Reactor::current().spawn(fut) -} diff --git a/src/sys/mod.rs b/src/sys/mod.rs new file mode 100644 index 0000000..2ffc532 --- /dev/null +++ b/src/sys/mod.rs @@ -0,0 +1,13 @@ +cfg_if::cfg_if! { + if #[cfg(wstd_p3)] { + mod p3; + use p3 as backend; + } else if #[cfg(wstd_p2)] { + mod p2; + use p2 as backend; + } else { + compile_error!("unsupported target: wstd requires a WASI target with either the `wasip2` or `wasip3` feature"); + } +} + +pub use backend::*; diff --git a/src/http/body.rs b/src/sys/p2/http/body.rs similarity index 99% rename from src/http/body.rs rename to src/sys/p2/http/body.rs index c23a3c0..b070a59 100644 --- a/src/http/body.rs +++ b/src/sys/p2/http/body.rs @@ -1,14 +1,11 @@ -use crate::http::{ - Error, HeaderMap, - error::Context as _, - fields::{header_map_from_wasi, header_map_to_wasi}, -}; +use super::fields::{header_map_from_wasi, header_map_to_wasi}; use crate::io::{AsyncInputStream, AsyncOutputStream}; use crate::runtime::{AsyncPollable, Reactor, WaitFor}; pub use ::http_body::{Body as HttpBody, Frame, SizeHint}; pub use bytes::Bytes; +use anyhow::Context as _; use http::header::CONTENT_LENGTH; use http_body_util::{BodyExt, combinators::UnsyncBoxBody}; use std::fmt; @@ -20,6 +17,9 @@ use wasip2::http::types::{ }; use wasip2::io::streams::{InputStream as WasiInputStream, StreamError}; +type Error = anyhow::Error; +type HeaderMap = http::header::HeaderMap; + pub mod util { pub use http_body_util::*; } diff --git a/src/http/client.rs b/src/sys/p2/http/client.rs similarity index 96% rename from src/http/client.rs rename to src/sys/p2/http/client.rs index 3676fa8..4957f72 100644 --- a/src/http/client.rs +++ b/src/sys/p2/http/client.rs @@ -1,6 +1,6 @@ -use super::{Body, Error, Request, Response}; -use crate::http::request::try_into_outgoing; -use crate::http::response::try_from_incoming; +use super::request::try_into_outgoing; +use super::response::try_from_incoming; +use crate::http::{Body, Error, Request, Response}; use crate::io::AsyncPollable; use crate::time::Duration; use wasip2::http::types::RequestOptions as WasiRequestOptions; diff --git a/src/http/fields.rs b/src/sys/p2/http/fields.rs similarity index 94% rename from src/http/fields.rs rename to src/sys/p2/http/fields.rs index de6df16..5c40d50 100644 --- a/src/http/fields.rs +++ b/src/sys/p2/http/fields.rs @@ -1,6 +1,7 @@ pub use http::header::{HeaderMap, HeaderName, HeaderValue}; -use super::{Error, error::Context}; +use crate::http::Error; +use crate::http::error::Context; use wasip2::http::types::Fields; pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { diff --git a/src/http/method.rs b/src/sys/p2/http/method.rs similarity index 100% rename from src/http/method.rs rename to src/sys/p2/http/method.rs diff --git a/src/sys/p2/http/mod.rs b/src/sys/p2/http/mod.rs new file mode 100644 index 0000000..e62627a --- /dev/null +++ b/src/sys/p2/http/mod.rs @@ -0,0 +1,10 @@ +pub mod body; +pub(crate) mod client; +pub(crate) mod fields; +pub(crate) mod method; +pub mod request; +pub mod response; +pub(crate) mod scheme; +pub mod server; + +pub use wasip2::http::types::{ErrorCode, HeaderError}; diff --git a/src/http/request.rs b/src/sys/p2/http/request.rs similarity index 95% rename from src/http/request.rs rename to src/sys/p2/http/request.rs index 6694d03..a2aa9f8 100644 --- a/src/http/request.rs +++ b/src/sys/p2/http/request.rs @@ -1,11 +1,12 @@ -use super::{ +use super::fields::{header_map_from_wasi, header_map_to_wasi}; +use super::method::{from_wasi_method, to_wasi_method}; +use super::scheme::{from_wasi_scheme, to_wasi_scheme}; +use crate::http::{ Authority, HeaderMap, PathAndQuery, Uri, body::{Body, BodyHint}, error::{Context, Error, ErrorCode}, - fields::{header_map_from_wasi, header_map_to_wasi}, - method::{from_wasi_method, to_wasi_method}, - scheme::{from_wasi_scheme, to_wasi_scheme}, }; + use wasip2::http::outgoing_handler::OutgoingRequest; use wasip2::http::types::IncomingRequest; diff --git a/src/http/response.rs b/src/sys/p2/http/response.rs similarity index 96% rename from src/http/response.rs rename to src/sys/p2/http/response.rs index 2ab8d87..c3cae53 100644 --- a/src/http/response.rs +++ b/src/sys/p2/http/response.rs @@ -1,9 +1,9 @@ use http::StatusCode; use wasip2::http::types::IncomingResponse; +use super::fields::{HeaderMap, header_map_from_wasi}; use crate::http::body::{Body, BodyHint}; use crate::http::error::Error; -use crate::http::fields::{HeaderMap, header_map_from_wasi}; pub use http::response::{Builder, Response}; @@ -21,7 +21,6 @@ pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result` or @@ -31,6 +30,7 @@ pub(crate) fn try_from_incoming(incoming: IncomingResponse) -> Result>>>, + // Lazily initialized pollable, used for lifetime of stream to check readiness. + // Field ordering matters: this child must be dropped before stream + subscription: OnceLock, + stream: InputStream, +} + +impl AsyncInputStream { + /// Construct an `AsyncInputStream` from a WASI `InputStream` resource. + pub fn new(stream: InputStream) -> Self { + Self { + wait_for: Mutex::new(None), + subscription: OnceLock::new(), + stream, + } + } + fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { + // Lazily initialize the AsyncPollable + let subscription = self + .subscription + .get_or_init(|| AsyncPollable::new(self.stream.subscribe())); + // Lazily initialize the WaitFor. Clear it after it becomes ready. + let mut wait_for_slot = self.wait_for.lock().unwrap(); + let wait_for = wait_for_slot.get_or_insert_with(|| Box::pin(subscription.wait_for())); + match wait_for.as_mut().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => { + let _ = wait_for_slot.take(); + Poll::Ready(()) + } + } + } + /// Await for read readiness. + async fn ready(&self) { + poll_fn(|cx| self.poll_ready(cx)).await + } + /// Asynchronously read from the input stream. + /// This method is the same as [`AsyncRead::read`], but doesn't require a `&mut self`. + pub async fn read(&self, buf: &mut [u8]) -> std::io::Result { + let read = loop { + self.ready().await; + // Ideally, the ABI would be able to read directly into buf. + // However, with the default generated bindings, it returns a + // newly allocated vec, which we need to copy into buf. + match self.stream.read(buf.len() as u64) { + // A read of 0 bytes from WASI's `read` doesn't mean + // end-of-stream as it does in Rust. However, `self.ready()` + // cannot guarantee that at least one byte is ready for + // reading, so in this case we try again. + Ok(r) if r.is_empty() => continue, + Ok(r) => break r, + // 0 bytes from Rust's `read` means end-of-stream. + Err(StreamError::Closed) => return Ok(0), + Err(StreamError::LastOperationFailed(err)) => { + return Err(std::io::Error::other(err.to_debug_string())); + } + } + }; + let len = read.len(); + buf[0..len].copy_from_slice(&read); + Ok(len) + } + + /// Move the entire contents of an input stream directly into an output + /// stream, until the input stream has closed. This operation is optimized + /// to avoid copying stream contents into and out of memory. + pub async fn copy_to(&self, writer: &AsyncOutputStream) -> std::io::Result { + let mut written = 0; + loop { + self.ready().await; + writer.ready().await; + match writer.stream.splice(&self.stream, u64::MAX) { + Ok(n) => written += n, + Err(StreamError::Closed) => break Ok(written), + Err(StreamError::LastOperationFailed(err)) => { + break Err(std::io::Error::other(err.to_debug_string())); + } + } + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result, std::io::Error>`. The returned byte vectors + /// will be at most 8k. If you want to control chunk size, use + /// `Self::into_stream_of`. + pub fn into_stream(self) -> AsyncInputChunkStream { + AsyncInputChunkStream { + stream: self, + chunk_size: 8 * 1024, + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result, std::io::Error>`. The returned byte vectors + /// will be at most the `chunk_size` argument specified. + pub fn into_stream_of(self, chunk_size: usize) -> AsyncInputChunkStream { + AsyncInputChunkStream { + stream: self, + chunk_size, + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result`. + pub fn into_bytestream(self) -> AsyncInputByteStream { + AsyncInputByteStream { + stream: self.into_stream(), + buffer: std::io::Read::bytes(std::io::Cursor::new(Vec::new())), + } + } +} + +impl AsyncRead for AsyncInputStream { + async fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + Self::read(self, buf).await + } + + #[inline] + fn as_async_input_stream(&self) -> Option<&AsyncInputStream> { + Some(self) + } +} + +/// Wrapper of `AsyncInputStream` that impls `futures_lite::stream::Stream` +/// with an item of `Result, std::io::Error>` +pub struct AsyncInputChunkStream { + stream: AsyncInputStream, + chunk_size: usize, +} + +impl AsyncInputChunkStream { + /// Extract the `AsyncInputStream` which backs this stream. + pub fn into_inner(self) -> AsyncInputStream { + self.stream + } +} + +impl futures_lite::stream::Stream for AsyncInputChunkStream { + type Item = Result, std::io::Error>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match self.stream.poll_ready(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => match self.stream.stream.read(self.chunk_size as u64) { + Ok(r) if r.is_empty() => Poll::Pending, + Ok(r) => Poll::Ready(Some(Ok(r))), + Err(StreamError::LastOperationFailed(err)) => { + Poll::Ready(Some(Err(std::io::Error::other(err.to_debug_string())))) + } + Err(StreamError::Closed) => Poll::Ready(None), + }, + } + } +} + +pin_project_lite::pin_project! { + /// Wrapper of `AsyncInputStream` that impls + /// `futures_lite::stream::Stream` with item `Result`. + pub struct AsyncInputByteStream { + #[pin] + stream: AsyncInputChunkStream, + buffer: std::io::Bytes>>, + } +} + +impl AsyncInputByteStream { + /// Extract the `AsyncInputStream` which backs this stream, and any bytes + /// read from the `AsyncInputStream` which have not yet been yielded by + /// the byte stream. + pub fn into_inner(self) -> (AsyncInputStream, Vec) { + ( + self.stream.into_inner(), + self.buffer + .collect::, std::io::Error>>() + .expect("read of Cursor> is infallible"), + ) + } +} + +impl futures_lite::stream::Stream for AsyncInputByteStream { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.buffer.next() { + Some(byte) => Poll::Ready(Some(Ok(byte.expect("cursor on Vec is infallible")))), + None => match futures_lite::stream::Stream::poll_next(this.stream, cx) { + Poll::Ready(Some(Ok(bytes))) => { + let mut bytes = std::io::Read::bytes(std::io::Cursor::new(bytes)); + match bytes.next() { + Some(Ok(byte)) => { + *this.buffer = bytes; + Poll::Ready(Some(Ok(byte))) + } + Some(Err(err)) => Poll::Ready(Some(Err(err))), + None => Poll::Ready(None), + } + } + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }, + } + } +} + +/// A wrapper for WASI's `output-stream` resource that provides implementations of `AsyncWrite` and +/// `AsyncPollable`. +#[derive(Debug)] +pub struct AsyncOutputStream { + // Lazily initialized pollable, used for lifetime of stream to check readiness. + // Field ordering matters: this child must be dropped before stream + subscription: OnceLock, + stream: OutputStream, +} + +impl AsyncOutputStream { + /// Construct an `AsyncOutputStream` from a WASI `OutputStream` resource. + pub fn new(stream: OutputStream) -> Self { + Self { + subscription: OnceLock::new(), + stream, + } + } + /// Await write readiness. + async fn ready(&self) { + // Lazily initialize the AsyncPollable + let subscription = self + .subscription + .get_or_init(|| AsyncPollable::new(self.stream.subscribe())); + // Wait on readiness + subscription.wait_for().await; + } + /// Asynchronously write to the output stream. This method is the same as + /// [`AsyncWrite::write`], but doesn't require a `&mut self`. + /// + /// Awaits for write readiness, and then performs at most one write to the + /// output stream. Returns how much of the argument `buf` was written, or + /// a `std::io::Error` indicating either an error returned by the stream write + /// using the debug string provided by the WASI error, or else that the, + /// indicated by `std::io::ErrorKind::ConnectionReset`. + pub async fn write(&self, buf: &[u8]) -> std::io::Result { + // Loops at most twice. + loop { + match self.stream.check_write() { + Ok(0) => { + self.ready().await; + // Next loop guaranteed to have nonzero check_write, or error. + continue; + } + Ok(some) => { + let writable = some.try_into().unwrap_or(usize::MAX).min(buf.len()); + match self.stream.write(&buf[0..writable]) { + Ok(()) => return Ok(writable), + Err(StreamError::Closed) => { + return Err(std::io::Error::from(std::io::ErrorKind::ConnectionReset)); + } + Err(StreamError::LastOperationFailed(err)) => { + return Err(std::io::Error::other(err.to_debug_string())); + } + } + } + Err(StreamError::Closed) => { + return Err(std::io::Error::from(std::io::ErrorKind::ConnectionReset)); + } + Err(StreamError::LastOperationFailed(err)) => { + return Err(std::io::Error::other(err.to_debug_string())); + } + } + } + } + + /// Asynchronously write to the output stream. This method is the same as + /// [`AsyncWrite::write_all`], but doesn't require a `&mut self`. + pub async fn write_all(&self, buf: &[u8]) -> std::io::Result<()> { + let mut to_write = &buf[0..]; + loop { + let bytes_written = self.write(to_write).await?; + to_write = &to_write[bytes_written..]; + if to_write.is_empty() { + return Ok(()); + } + } + } + + /// Asyncronously flush the output stream. Initiates a flush, and then + /// awaits until the flush is complete and the output stream is ready for + /// writing again. + /// + /// This method is the same as [`AsyncWrite::flush`], but doesn't require + /// a `&mut self`. + /// + /// Fails with a `std::io::Error` indicating either an error returned by + /// the stream flush, using the debug string provided by the WASI error, + /// or else that the stream is closed, indicated by + /// `std::io::ErrorKind::ConnectionReset`. + pub async fn flush(&self) -> std::io::Result<()> { + match self.stream.flush() { + Ok(()) => { + self.ready().await; + Ok(()) + } + Err(StreamError::Closed) => { + Err(std::io::Error::from(std::io::ErrorKind::ConnectionReset)) + } + Err(StreamError::LastOperationFailed(err)) => { + Err(std::io::Error::other(err.to_debug_string())) + } + } + } +} + +impl AsyncWrite for AsyncOutputStream { + // Required methods + async fn write(&mut self, buf: &[u8]) -> std::io::Result { + Self::write(self, buf).await + } + async fn flush(&mut self) -> std::io::Result<()> { + Self::flush(self).await + } + + #[inline] + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + Some(self) + } +} diff --git a/src/sys/p2/mod.rs b/src/sys/p2/mod.rs new file mode 100644 index 0000000..b8a7c47 --- /dev/null +++ b/src/sys/p2/mod.rs @@ -0,0 +1,7 @@ +pub mod http; +pub mod io; +pub mod net; +pub mod random; +pub mod runtime; +pub mod stdio; +pub mod time; diff --git a/src/net/mod.rs b/src/sys/p2/net/mod.rs similarity index 100% rename from src/net/mod.rs rename to src/sys/p2/net/mod.rs diff --git a/src/net/tcp_listener.rs b/src/sys/p2/net/tcp_listener.rs similarity index 100% rename from src/net/tcp_listener.rs rename to src/sys/p2/net/tcp_listener.rs diff --git a/src/net/tcp_stream.rs b/src/sys/p2/net/tcp_stream.rs similarity index 100% rename from src/net/tcp_stream.rs rename to src/sys/p2/net/tcp_stream.rs diff --git a/src/rand/mod.rs b/src/sys/p2/random.rs similarity index 100% rename from src/rand/mod.rs rename to src/sys/p2/random.rs diff --git a/src/runtime/block_on.rs b/src/sys/p2/runtime/mod.rs similarity index 79% rename from src/runtime/block_on.rs rename to src/sys/p2/runtime/mod.rs index c7bbd31..713dc6f 100644 --- a/src/runtime/block_on.rs +++ b/src/sys/p2/runtime/mod.rs @@ -1,4 +1,14 @@ -use super::{REACTOR, Reactor}; +mod reactor; + +pub use ::async_task::Task; +pub use reactor::{AsyncPollable, Reactor, WaitFor}; +use std::cell::RefCell; + +// There are no threads in WASI 0.2, so this is just a safe way to thread a single reactor to all +// use sites in the background. +std::thread_local! { +pub(crate) static REACTOR: RefCell> = const { RefCell::new(None) }; +} use std::future::Future; use std::pin::pin; @@ -62,3 +72,14 @@ where } } } + +/// Spawn a `Future` as a `Task` on the current `Reactor`. +/// +/// Panics if called from outside `block_on`. +pub fn spawn(fut: F) -> Task +where + F: std::future::Future + 'static, + T: 'static, +{ + Reactor::current().spawn(fut) +} diff --git a/src/runtime/reactor.rs b/src/sys/p2/runtime/reactor.rs similarity index 100% rename from src/runtime/reactor.rs rename to src/sys/p2/runtime/reactor.rs diff --git a/src/sys/p2/stdio.rs b/src/sys/p2/stdio.rs new file mode 100644 index 0000000..fa183a3 --- /dev/null +++ b/src/sys/p2/stdio.rs @@ -0,0 +1,179 @@ +use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Result}; +use std::cell::LazyCell; +use wasip2::cli::terminal_input::TerminalInput; +use wasip2::cli::terminal_output::TerminalOutput; + +/// Use the program's stdin as an `AsyncInputStream`. +#[derive(Debug)] +pub struct Stdin { + stream: AsyncInputStream, + terminput: LazyCell>, +} + +/// Get the program's stdin for use as an `AsyncInputStream`. +pub fn stdin() -> Stdin { + let stream = AsyncInputStream::new(wasip2::cli::stdin::get_stdin()); + Stdin { + stream, + terminput: LazyCell::new(wasip2::cli::terminal_stdin::get_terminal_stdin), + } +} + +impl Stdin { + /// Check if stdin is a terminal. + pub fn is_terminal(&self) -> bool { + LazyCell::force(&self.terminput).is_some() + } + + /// Get the `AsyncInputStream` used to implement `Stdin` + pub fn into_inner(self) -> AsyncInputStream { + self.stream + } +} + +impl AsyncRead for Stdin { + #[inline] + async fn read(&mut self, buf: &mut [u8]) -> Result { + self.stream.read(buf).await + } + + #[inline] + async fn read_to_end(&mut self, buf: &mut Vec) -> Result { + self.stream.read_to_end(buf).await + } + + #[inline] + fn as_async_input_stream(&self) -> Option<&AsyncInputStream> { + Some(&self.stream) + } +} + +/// Use the program's stdout as an `AsyncOutputStream`. +#[derive(Debug)] +pub struct Stdout { + stream: AsyncOutputStream, + termoutput: LazyCell>, +} + +/// Get the program's stdout for use as an `AsyncOutputStream`. +pub fn stdout() -> Stdout { + let stream = AsyncOutputStream::new(wasip2::cli::stdout::get_stdout()); + Stdout { + stream, + termoutput: LazyCell::new(wasip2::cli::terminal_stdout::get_terminal_stdout), + } +} + +impl Stdout { + /// Check if stdout is a terminal. + pub fn is_terminal(&self) -> bool { + LazyCell::force(&self.termoutput).is_some() + } + + /// Get the `AsyncOutputStream` used to implement `Stdout` + pub fn into_inner(self) -> AsyncOutputStream { + self.stream + } +} + +impl AsyncWrite for Stdout { + #[inline] + async fn write(&mut self, buf: &[u8]) -> Result { + self.stream.write(buf).await + } + + #[inline] + async fn flush(&mut self) -> Result<()> { + self.stream.flush().await + } + + #[inline] + async fn write_all(&mut self, buf: &[u8]) -> Result<()> { + self.stream.write_all(buf).await + } + + #[inline] + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + self.stream.as_async_output_stream() + } +} + +/// Use the program's stdout as an `AsyncOutputStream`. +#[derive(Debug)] +pub struct Stderr { + stream: AsyncOutputStream, + termoutput: LazyCell>, +} + +/// Get the program's stdout for use as an `AsyncOutputStream`. +pub fn stderr() -> Stderr { + let stream = AsyncOutputStream::new(wasip2::cli::stderr::get_stderr()); + Stderr { + stream, + termoutput: LazyCell::new(wasip2::cli::terminal_stderr::get_terminal_stderr), + } +} + +impl Stderr { + /// Check if stderr is a terminal. + pub fn is_terminal(&self) -> bool { + LazyCell::force(&self.termoutput).is_some() + } + + /// Get the `AsyncOutputStream` used to implement `Stderr` + pub fn into_inner(self) -> AsyncOutputStream { + self.stream + } +} + +impl AsyncWrite for Stderr { + #[inline] + async fn write(&mut self, buf: &[u8]) -> Result { + self.stream.write(buf).await + } + + #[inline] + async fn flush(&mut self) -> Result<()> { + self.stream.flush().await + } + + #[inline] + async fn write_all(&mut self, buf: &[u8]) -> Result<()> { + self.stream.write_all(buf).await + } + + #[inline] + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + self.stream.as_async_output_stream() + } +} + +#[cfg(test)] +mod test { + use crate::io::AsyncWrite; + use crate::runtime::block_on; + #[test] + // No internal predicate. Run test with --nocapture and inspect output manually. + fn stdout_println_hello_world() { + block_on(async { + let mut stdout = super::stdout(); + let term = if stdout.is_terminal() { "is" } else { "is not" }; + stdout + .write_all(format!("hello, world! stdout {term} a terminal\n",).as_bytes()) + .await + .unwrap(); + }) + } + #[test] + // No internal predicate. Run test with --nocapture and inspect output manually. + fn stderr_println_hello_world() { + block_on(async { + let mut stdout = super::stdout(); + let term = if stdout.is_terminal() { "is" } else { "is not" }; + stdout + .write_all(format!("hello, world! stderr {term} a terminal\n",).as_bytes()) + .await + .unwrap(); + }) + } +} diff --git a/src/sys/p2/time.rs b/src/sys/p2/time.rs new file mode 100644 index 0000000..238a218 --- /dev/null +++ b/src/sys/p2/time.rs @@ -0,0 +1,49 @@ +use wasip2::clocks::{ + monotonic_clock::{self, subscribe_duration, subscribe_instant}, + wall_clock, +}; + +use crate::runtime::{AsyncPollable, Reactor}; + +/// A measurement of a monotonically nondecreasing clock. Opaque and useful only +/// with Duration. +pub type MonotonicInstant = monotonic_clock::Instant; + +/// A duration from the monotonic clock, in nanoseconds. +pub type MonotonicDuration = monotonic_clock::Duration; + +/// Return the current monotonic clock instant. +pub fn now() -> MonotonicInstant { + monotonic_clock::now() +} + +/// A measurement of the system clock, useful for talking to external entities +/// like the file system or other processes. May be converted losslessly to a +/// more useful `std::time::SystemTime` to provide more methods. +#[derive(Debug, Clone, Copy)] +#[allow(dead_code)] +pub struct SystemTime(wall_clock::Datetime); + +impl SystemTime { + pub fn now() -> Self { + Self(wall_clock::now()) + } +} + +impl From for std::time::SystemTime { + fn from(st: SystemTime) -> Self { + std::time::SystemTime::UNIX_EPOCH + + std::time::Duration::from_secs(st.0.seconds) + + std::time::Duration::from_nanos(st.0.nanoseconds.into()) + } +} + +/// Create a timer that fires at a specific monotonic clock instant. +pub fn subscribe_at(instant: MonotonicInstant) -> AsyncPollable { + Reactor::current().schedule(subscribe_instant(instant)) +} + +/// Create a timer that fires after a monotonic clock duration. +pub fn subscribe_after(duration: MonotonicDuration) -> AsyncPollable { + Reactor::current().schedule(subscribe_duration(duration)) +} diff --git a/src/sys/p3/http/body.rs b/src/sys/p3/http/body.rs new file mode 100644 index 0000000..eadaa66 --- /dev/null +++ b/src/sys/p3/http/body.rs @@ -0,0 +1,274 @@ +use crate::http::{Error, error::Context as _}; +use crate::io::AsyncInputStream; + +pub use ::http_body::{Body as HttpBody, Frame, SizeHint}; +pub use bytes::Bytes; +use http::header::CONTENT_LENGTH; +use http_body_util::{BodyExt, combinators::UnsyncBoxBody}; +use std::fmt; + +type HeaderMap = http::header::HeaderMap; + +pub mod util { + pub use http_body_util::*; +} + +#[derive(Debug)] +pub struct Body(BodyInner); + +#[derive(Debug)] +enum BodyInner { + Boxed(UnsyncBoxBody), + P3Stream(P3StreamBody), + Complete { + data: Bytes, + trailers: Option, + }, +} + +struct P3StreamBody { + reader: Option>, + size_hint: BodyHint, +} + +impl fmt::Debug for P3StreamBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("P3StreamBody").finish() + } +} + +impl Body { + pub(crate) fn from_p3_stream( + reader: wasip3::wit_bindgen::rt::async_support::StreamReader, + size_hint: BodyHint, + ) -> Self { + Body(BodyInner::P3Stream(P3StreamBody { + reader: Some(reader), + size_hint, + })) + } + + /// Convert this `Body` into an `UnsyncBoxBody`. + pub fn into_boxed_body(self) -> UnsyncBoxBody { + fn map_e(_: std::convert::Infallible) -> Error { + unreachable!() + } + match self.0 { + BodyInner::P3Stream(p3) => { + let stream = AsyncInputStream::new(p3.reader.unwrap()); + use futures_lite::stream::StreamExt; + http_body_util::StreamBody::new(stream.into_stream().map(|res| { + res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec))) + .map_err(Into::into) + })) + .boxed_unsync() + } + BodyInner::Complete { data, trailers } => http_body_util::Full::new(data) + .map_err(map_e) + .with_trailers(async move { Ok(trailers).transpose() }) + .boxed_unsync(), + BodyInner::Boxed(b) => b, + } + } + + pub async fn contents(&mut self) -> Result<&[u8], Error> { + match &mut self.0 { + BodyInner::Complete { data, .. } => Ok(&*data), + inner => { + let mut prev = BodyInner::Complete { + data: Bytes::new(), + trailers: None, + }; + std::mem::swap(inner, &mut prev); + + // For p3 streams, read directly using the async read method + if let BodyInner::P3Stream(p3) = prev { + let mut stream = AsyncInputStream::new(p3.reader.unwrap()); + let mut all_data = Vec::new(); + let mut buf = vec![0u8; 64 * 1024]; + loop { + match stream.read(&mut buf).await { + Ok(0) => break, + Ok(n) => all_data.extend_from_slice(&buf[..n]), + Err(e) => return Err(Error::from(e).context("reading p3 body stream")), + } + } + *inner = BodyInner::Complete { + data: Bytes::from(all_data), + trailers: None, + }; + return Ok(match inner { + BodyInner::Complete { data, .. } => &*data, + _ => unreachable!(), + }); + } + + let boxed_body = match prev { + BodyInner::Boxed(b) => b, + BodyInner::Complete { .. } => unreachable!(), + BodyInner::P3Stream(_) => unreachable!(), + }; + let collected = boxed_body.collect().await?; + let trailers = collected.trailers().cloned(); + *inner = BodyInner::Complete { + data: collected.to_bytes(), + trailers, + }; + Ok(match inner { + BodyInner::Complete { data, .. } => &*data, + _ => unreachable!(), + }) + } + } + } + + pub fn content_length(&self) -> Option { + match &self.0 { + BodyInner::Boxed(b) => b.size_hint().exact(), + BodyInner::Complete { data, .. } => Some(data.len() as u64), + BodyInner::P3Stream(p3) => p3.size_hint.content_length(), + } + } + + pub fn empty() -> Self { + Body(BodyInner::Complete { + data: Bytes::new(), + trailers: None, + }) + } + + pub async fn str_contents(&mut self) -> Result<&str, Error> { + let bs = self.contents().await?; + std::str::from_utf8(bs).context("decoding body contents as string") + } + + #[cfg(feature = "json")] + pub fn from_json(data: &T) -> Result { + Ok(Self::from(serde_json::to_vec(data)?)) + } + + #[cfg(feature = "json")] + pub async fn json serde::Deserialize<'a>>(&mut self) -> Result { + let str = self.str_contents().await?; + serde_json::from_str(str).context("decoding body contents as json") + } + + pub fn from_stream(stream: S) -> Self + where + S: futures_lite::Stream + Send + 'static, + ::Item: Into, + { + use futures_lite::StreamExt; + Self::from_http_body(http_body_util::StreamBody::new( + stream.map(|bs| Ok::<_, Error>(Frame::data(bs.into()))), + )) + } + + pub fn from_try_stream(stream: S) -> Self + where + S: futures_lite::Stream> + Send + 'static, + D: Into, + E: std::error::Error + Send + Sync + 'static, + { + use futures_lite::StreamExt; + Self::from_http_body(http_body_util::StreamBody::new( + stream.map(|bs| Ok::<_, Error>(Frame::data(bs?.into()))), + )) + } + + pub fn from_http_body(http_body: B) -> Self + where + B: HttpBody + Send + 'static, + ::Data: Into, + ::Error: Into, + { + use util::BodyExt; + Body(BodyInner::Boxed( + http_body + .map_frame(|f| f.map_data(Into::into)) + .map_err(Into::into) + .boxed_unsync(), + )) + } +} + +impl From<()> for Body { + fn from(_: ()) -> Body { + Body::empty() + } +} +impl From<&[u8]> for Body { + fn from(bytes: &[u8]) -> Body { + Body::from(bytes.to_owned()) + } +} +impl From> for Body { + fn from(bytes: Vec) -> Body { + Body::from(Bytes::from(bytes)) + } +} +impl From for Body { + fn from(data: Bytes) -> Body { + Body(BodyInner::Complete { + data, + trailers: None, + }) + } +} +impl From<&str> for Body { + fn from(data: &str) -> Body { + Body::from(data.as_bytes()) + } +} +impl From for Body { + fn from(data: String) -> Body { + Body::from(data.into_bytes()) + } +} + +impl From for Body { + fn from(r: crate::io::AsyncInputStream) -> Body { + use futures_lite::stream::StreamExt; + Body(BodyInner::Boxed(http_body_util::BodyExt::boxed_unsync( + http_body_util::StreamBody::new(r.into_stream().map(|res| { + res.map(|bytevec| Frame::data(Bytes::from_owner(bytevec))) + .map_err(Into::into) + })), + ))) + } +} + +#[derive(Clone, Copy, Debug)] +pub enum BodyHint { + ContentLength(u64), + Unknown, +} + +impl BodyHint { + pub fn from_headers(headers: &HeaderMap) -> Result { + if let Some(val) = headers.get(CONTENT_LENGTH) { + let len = std::str::from_utf8(val.as_ref()) + .map_err(|_| InvalidContentLength)? + .parse::() + .map_err(|_| InvalidContentLength)?; + Ok(BodyHint::ContentLength(len)) + } else { + Ok(BodyHint::Unknown) + } + } + fn content_length(&self) -> Option { + match self { + BodyHint::ContentLength(l) => Some(*l), + _ => None, + } + } +} + +#[derive(Debug)] +pub struct InvalidContentLength; +impl fmt::Display for InvalidContentLength { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "Invalid Content-Length header") + } +} +impl std::error::Error for InvalidContentLength {} diff --git a/src/sys/p3/http/client.rs b/src/sys/p3/http/client.rs new file mode 100644 index 0000000..52b4e9f --- /dev/null +++ b/src/sys/p3/http/client.rs @@ -0,0 +1,82 @@ +use super::request::try_into_wasi_request; +use super::response::try_from_wasi_response; +use crate::http::{Body, Error, Request, Response}; +use crate::time::Duration; + +/// An HTTP client. +#[derive(Debug, Clone)] +pub struct Client { + options: Option, +} + +impl Default for Client { + fn default() -> Self { + Self::new() + } +} + +impl Client { + /// Create a new instance of `Client` + pub fn new() -> Self { + Self { options: None } + } + + /// Send an HTTP request. + pub async fn send>(&self, req: Request) -> Result, Error> { + let parts = try_into_wasi_request(req, self.options.as_ref())?; + + // Send body data through the stream writer + if let Some(mut body_writer) = parts.body_writer { + let mut body = parts.body; + let body_bytes = body.contents().await?; + if !body_bytes.is_empty() { + let remaining = body_writer.write_all(body_bytes.to_vec()).await; + if !remaining.is_empty() { + return Err(anyhow::anyhow!("failed to write full request body")); + } + } + drop(body_writer); + } + + let wasi_resp = wasip3::http::client::send(parts.request).await?; + + // Create a completion future for consuming the response body + let (_completion_writer, completion_reader) = + wasip3::wit_future::new::>(|| Ok(())); + drop(_completion_writer); + + try_from_wasi_response(wasi_resp, completion_reader) + } + + /// Set timeout on connecting to HTTP server + pub fn set_connect_timeout(&mut self, d: impl Into) { + self.options_mut().connect_timeout = Some(d.into()); + } + + /// Set timeout on recieving first byte of the Response body + pub fn set_first_byte_timeout(&mut self, d: impl Into) { + self.options_mut().first_byte_timeout = Some(d.into()); + } + + /// Set timeout on recieving subsequent chunks of bytes in the Response body stream + pub fn set_between_bytes_timeout(&mut self, d: impl Into) { + self.options_mut().between_bytes_timeout = Some(d.into()); + } + + fn options_mut(&mut self) -> &mut RequestOptions { + match &mut self.options { + Some(o) => o, + uninit => { + *uninit = Some(RequestOptions::default()); + uninit.as_mut().unwrap() + } + } + } +} + +#[derive(Default, Debug, Clone)] +pub(crate) struct RequestOptions { + pub(crate) connect_timeout: Option, + pub(crate) first_byte_timeout: Option, + pub(crate) between_bytes_timeout: Option, +} diff --git a/src/sys/p3/http/fields.rs b/src/sys/p3/http/fields.rs new file mode 100644 index 0000000..da9069d --- /dev/null +++ b/src/sys/p3/http/fields.rs @@ -0,0 +1,27 @@ +pub use http::header::{HeaderMap, HeaderName, HeaderValue}; + +use crate::http::Error; +use crate::http::error::Context; +use wasip3::http::types::Fields; + +pub(crate) fn header_map_from_wasi(wasi_fields: Fields) -> Result { + let mut output = HeaderMap::new(); + for (key, value) in wasi_fields.copy_all() { + let key = + HeaderName::from_bytes(key.as_bytes()).with_context(|| format!("header name {key}"))?; + let value = + HeaderValue::from_bytes(&value).with_context(|| format!("header value for {key}"))?; + output.append(key, value); + } + Ok(output) +} + +pub(crate) fn header_map_to_wasi(header_map: &HeaderMap) -> Result { + let wasi_fields = Fields::new(); + for (key, value) in header_map { + wasi_fields + .append(key.as_str(), value.as_bytes()) + .with_context(|| format!("wasi rejected header `{key}: {value:?}`"))? + } + Ok(wasi_fields) +} diff --git a/src/sys/p3/http/method.rs b/src/sys/p3/http/method.rs new file mode 100644 index 0000000..4aa6b84 --- /dev/null +++ b/src/sys/p3/http/method.rs @@ -0,0 +1,34 @@ +use wasip3::http::types::Method as WasiMethod; + +pub use http::Method; +use http::method::InvalidMethod; + +pub(crate) fn to_wasi_method(value: Method) -> WasiMethod { + match value { + Method::GET => WasiMethod::Get, + Method::HEAD => WasiMethod::Head, + Method::POST => WasiMethod::Post, + Method::PUT => WasiMethod::Put, + Method::DELETE => WasiMethod::Delete, + Method::CONNECT => WasiMethod::Connect, + Method::OPTIONS => WasiMethod::Options, + Method::TRACE => WasiMethod::Trace, + Method::PATCH => WasiMethod::Patch, + other => WasiMethod::Other(other.as_str().to_owned()), + } +} + +pub(crate) fn from_wasi_method(value: WasiMethod) -> Result { + Ok(match value { + WasiMethod::Get => Method::GET, + WasiMethod::Head => Method::HEAD, + WasiMethod::Post => Method::POST, + WasiMethod::Put => Method::PUT, + WasiMethod::Delete => Method::DELETE, + WasiMethod::Connect => Method::CONNECT, + WasiMethod::Options => Method::OPTIONS, + WasiMethod::Trace => Method::TRACE, + WasiMethod::Patch => Method::PATCH, + WasiMethod::Other(s) => Method::from_bytes(s.as_bytes())?, + }) +} diff --git a/src/sys/p3/http/mod.rs b/src/sys/p3/http/mod.rs new file mode 100644 index 0000000..fa0f5af --- /dev/null +++ b/src/sys/p3/http/mod.rs @@ -0,0 +1,10 @@ +pub mod body; +pub(crate) mod client; +pub(crate) mod fields; +pub(crate) mod method; +pub mod request; +pub mod response; +pub(crate) mod scheme; +pub mod server; + +pub use wasip3::http::types::{ErrorCode, HeaderError}; diff --git a/src/sys/p3/http/request.rs b/src/sys/p3/http/request.rs new file mode 100644 index 0000000..85bd13a --- /dev/null +++ b/src/sys/p3/http/request.rs @@ -0,0 +1,155 @@ +use super::fields::{header_map_from_wasi, header_map_to_wasi}; +use super::method::{from_wasi_method, to_wasi_method}; +use super::scheme::{from_wasi_scheme, to_wasi_scheme}; +use crate::http::{ + Authority, HeaderMap, PathAndQuery, Uri, + body::{Body, BodyHint}, + error::{Context, Error, ErrorCode}, +}; + +use wasip3::http::types::{ + Request as WasiRequest, RequestOptions as WasiRequestOptions, Scheme as WasiScheme, +}; + +pub use http::request::{Builder, Request}; + +/// Result of converting an http::Request into a p3 WASI Request. +pub(crate) struct WasiRequestParts { + pub request: WasiRequest, + pub body: Body, + pub body_writer: Option>, + pub _completion: wasip3::wit_bindgen::rt::async_support::FutureReader>, +} + +/// Convert an http::Request into a p3 WASI Request for sending. +pub(crate) fn try_into_wasi_request>( + request: Request, + request_options: Option<&super::client::RequestOptions>, +) -> Result { + let headers = header_map_to_wasi(request.headers())?; + let (parts, body) = request.into_parts(); + let body: Body = body.into(); + + // Create trailers future (no trailers for now) + let (trailers_writer, trailers_reader) = wasip3::wit_future::new::< + Result, ErrorCode>, + >(|| Ok(None)); + drop(trailers_writer); + + // Create body stream — keep the writer for the caller to send body data + let (body_writer, body_reader) = if body.content_length() == Some(0) { + (None, None) + } else { + let (writer, reader) = wasip3::wit_stream::new::(); + (Some(writer), Some(reader)) + }; + + let options = WasiRequestOptions::new(); + if let Some(opts) = request_options { + if let Some(timeout) = opts.connect_timeout { + let _ = options.set_connect_timeout(Some(timeout.0)); + } + if let Some(timeout) = opts.first_byte_timeout { + let _ = options.set_first_byte_timeout(Some(timeout.0)); + } + if let Some(timeout) = opts.between_bytes_timeout { + let _ = options.set_between_bytes_timeout(Some(timeout.0)); + } + } + + let (wasi_req, completion) = + WasiRequest::new(headers, body_reader, trailers_reader, Some(options)); + + // Set the HTTP method + let method = to_wasi_method(parts.method); + wasi_req + .set_method(&method) + .map_err(|()| anyhow::anyhow!("method rejected by wasi-http: {method:?}"))?; + + // Set the url scheme + let scheme = parts + .uri + .scheme() + .map(to_wasi_scheme) + .unwrap_or(WasiScheme::Https); + wasi_req + .set_scheme(Some(&scheme)) + .map_err(|()| anyhow::anyhow!("scheme rejected by wasi-http: {scheme:?}"))?; + + // Set authority + let authority = parts.uri.authority().map(Authority::as_str); + wasi_req + .set_authority(authority) + .map_err(|()| anyhow::anyhow!("authority rejected by wasi-http {authority:?}"))?; + + // Set the url path + query string + if let Some(p_and_q) = parts.uri.path_and_query() { + wasi_req + .set_path_with_query(Some(p_and_q.as_str())) + .map_err(|()| anyhow::anyhow!("path and query rejected by wasi-http {p_and_q:?}"))?; + } + + Ok(WasiRequestParts { + request: wasi_req, + body, + body_writer, + _completion: completion, + }) +} + +/// Convert a p3 WASI Request into an http::Request (for the server handler). +#[doc(hidden)] +pub fn try_from_wasi_request( + incoming: WasiRequest, + completion: wasip3::wit_bindgen::rt::async_support::FutureReader>, +) -> Result, Error> { + let headers: HeaderMap = header_map_from_wasi(incoming.get_headers()) + .context("headers provided by wasi rejected by http::HeaderMap")?; + + let method = + from_wasi_method(incoming.get_method()).map_err(|_| ErrorCode::HttpRequestMethodInvalid)?; + let scheme = incoming + .get_scheme() + .map(|scheme| { + from_wasi_scheme(scheme).context("scheme provided by wasi rejected by http::Scheme") + }) + .transpose()?; + let authority = incoming + .get_authority() + .map(|authority| { + Authority::from_maybe_shared(authority) + .context("authority provided by wasi rejected by http::Authority") + }) + .transpose()?; + let path_and_query = incoming + .get_path_with_query() + .map(|path_and_query| { + PathAndQuery::from_maybe_shared(path_and_query) + .context("path and query provided by wasi rejected by http::PathAndQuery") + }) + .transpose()?; + + let hint = BodyHint::from_headers(&headers)?; + + // Consume the request body + let (body_stream, _trailers_future) = WasiRequest::consume_body(incoming, completion); + let body = Body::from_p3_stream(body_stream, hint); + + let mut uri = Uri::builder(); + if let Some(scheme) = scheme { + uri = uri.scheme(scheme); + } + if let Some(authority) = authority { + uri = uri.authority(authority); + } + if let Some(path_and_query) = path_and_query { + uri = uri.path_and_query(path_and_query); + } + let uri = uri.build().context("building uri from wasi")?; + + let mut request = Request::builder().method(method).uri(uri); + if let Some(headers_mut) = request.headers_mut() { + *headers_mut = headers; + } + request.body(body).context("building request from wasi") +} diff --git a/src/sys/p3/http/response.rs b/src/sys/p3/http/response.rs new file mode 100644 index 0000000..d57e573 --- /dev/null +++ b/src/sys/p3/http/response.rs @@ -0,0 +1,30 @@ +use http::StatusCode; + +use super::fields::{HeaderMap, header_map_from_wasi}; +use crate::http::body::{Body, BodyHint}; +use crate::http::error::{Error, ErrorCode}; + +use wasip3::http::types::Response as WasiResponse; + +pub use http::response::{Builder, Response}; + +pub(crate) fn try_from_wasi_response( + incoming: WasiResponse, + completion: wasip3::wit_bindgen::rt::async_support::FutureReader>, +) -> Result, Error> { + let headers: HeaderMap = header_map_from_wasi(incoming.get_headers())?; + let status = StatusCode::from_u16(incoming.get_status_code()) + .map_err(|err| anyhow::anyhow!("wasi provided invalid status code ({err})"))?; + + let hint = BodyHint::from_headers(&headers)?; + + // Consume the response body + let (body_stream, _trailers_future) = WasiResponse::consume_body(incoming, completion); + let body = Body::from_p3_stream(body_stream, hint); + + let mut builder = Response::builder().status(status); + *builder.headers_mut().expect("builder has not errored") = headers; + Ok(builder + .body(body) + .expect("response builder should not error")) +} diff --git a/src/sys/p3/http/scheme.rs b/src/sys/p3/http/scheme.rs new file mode 100644 index 0000000..6e88434 --- /dev/null +++ b/src/sys/p3/http/scheme.rs @@ -0,0 +1,20 @@ +use wasip3::http::types::Scheme as WasiScheme; + +pub use http::uri::{InvalidUri, Scheme}; +use std::str::FromStr; + +pub(crate) fn to_wasi_scheme(value: &Scheme) -> WasiScheme { + match value.as_str() { + "http" => WasiScheme::Http, + "https" => WasiScheme::Https, + other => WasiScheme::Other(other.to_owned()), + } +} + +pub(crate) fn from_wasi_scheme(value: WasiScheme) -> Result { + Ok(match value { + WasiScheme::Http => Scheme::HTTP, + WasiScheme::Https => Scheme::HTTPS, + WasiScheme::Other(other) => Scheme::from_str(&other)?, + }) +} diff --git a/src/sys/p3/http/server.rs b/src/sys/p3/http/server.rs new file mode 100644 index 0000000..7cbdb56 --- /dev/null +++ b/src/sys/p3/http/server.rs @@ -0,0 +1,82 @@ +//! HTTP servers (p3) +//! +//! In p3, the handler trait is `async fn handle(Request) -> Result`. +//! The `#[wstd::http_server]` macro generates the appropriate export code. + +use super::fields::header_map_to_wasi; +use crate::http::{Body, Error, Response, error::ErrorCode}; +use http::header::CONTENT_LENGTH; +use wasip3::http::types::{Response as WasiResponse, Trailers}; + +/// Convert a wstd Response into a p3 WASI Response for the handler. +#[doc(hidden)] +pub async fn response_to_wasi>( + response: Response, +) -> Result { + let headers = response.headers(); + let status = response.status().as_u16(); + + let wasi_headers = header_map_to_wasi(headers) + .map_err(|_| ErrorCode::InternalError(Some("header error".to_string())))?; + + let mut body: Body = response.into_body().into(); + + if let Some(len) = body.content_length() { + let mut buffer = itoa::Buffer::new(); + wasi_headers + .append(CONTENT_LENGTH.as_str(), buffer.format(len).as_bytes()) + .map_err(|_| { + ErrorCode::InternalError(Some("content-length header error".to_string())) + })?; + } + + // Create body stream and write body data. + // The write must be spawned as a separate task because the stream reader + // can only make progress once the response is returned to the runtime. + // Writing inline would deadlock: write waits for reader, reader waits + // for response, response waits for write. + let body_bytes = body + .contents() + .await + .map_err(|e| ErrorCode::InternalError(Some(format!("collecting body: {e:?}"))))? + .to_vec(); + + let body_reader = if body_bytes.is_empty() { + None + } else { + let (writer, reader) = wasip3::wit_stream::new::(); + wasip3::wit_bindgen::rt::async_support::spawn(async move { + let mut writer = writer; + let remaining = writer.write_all(body_bytes).await; + if !remaining.is_empty() { + #[cfg(debug_assertions)] + panic!( + "response body write incomplete: {} bytes remaining", + remaining.len() + ); + } + }); + Some(reader) + }; + + let (trailers_writer, trailers_reader) = + wasip3::wit_future::new::, ErrorCode>>(|| Ok(None)); + drop(trailers_writer); + + let (wasi_response, _completion) = + WasiResponse::new(wasi_headers, body_reader, trailers_reader); + wasi_response + .set_status_code(status) + .map_err(|()| ErrorCode::InternalError(Some("status code error".to_string())))?; + + Ok(wasi_response) +} + +/// Convert an error to a p3 ErrorCode. +#[doc(hidden)] +pub fn error_to_wasi(err: Error) -> ErrorCode { + match err.downcast_ref::() { + Some(e) => e.clone(), + None => ErrorCode::InternalError(Some(format!("{err:?}"))), + } +} diff --git a/src/sys/p3/io.rs b/src/sys/p3/io.rs new file mode 100644 index 0000000..6fdf64c --- /dev/null +++ b/src/sys/p3/io.rs @@ -0,0 +1,211 @@ +use crate::io::{AsyncRead, AsyncWrite}; +use std::pin::Pin; +use std::task::{Context, Poll}; +use wasip3::wit_bindgen::rt::async_support::{StreamReader, StreamResult, StreamWriter}; + +/// A wrapper for a p3 `StreamReader` that provides `AsyncRead`. +pub struct AsyncInputStream { + reader: StreamReader, +} + +impl std::fmt::Debug for AsyncInputStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncInputStream").finish() + } +} + +impl AsyncInputStream { + /// Construct an `AsyncInputStream` from a p3 `StreamReader`. + pub fn new(reader: StreamReader) -> Self { + Self { reader } + } + + /// Asynchronously read from the input stream. + pub async fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + let read_buf = Vec::with_capacity(buf.len()); + let (result, data) = self.reader.read(read_buf).await; + match result { + StreamResult::Complete(_n) => { + if data.is_empty() { + return Ok(0); + } + let len = data.len(); + buf[0..len].copy_from_slice(&data); + Ok(len) + } + StreamResult::Dropped => Ok(0), + StreamResult::Cancelled => Ok(0), + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result, std::io::Error>`. + pub fn into_stream(self) -> AsyncInputChunkStream { + AsyncInputChunkStream { + stream: self, + chunk_size: 8 * 1024, + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result, std::io::Error>`. The returned byte vectors + /// will be at most the `chunk_size` argument specified. + pub fn into_stream_of(self, chunk_size: usize) -> AsyncInputChunkStream { + AsyncInputChunkStream { + stream: self, + chunk_size, + } + } + + /// Use this `AsyncInputStream` as a `futures_lite::stream::Stream` with + /// items of `Result`. + pub fn into_bytestream(self) -> AsyncInputByteStream { + AsyncInputByteStream { + stream: self.into_stream(), + buffer: std::io::Read::bytes(std::io::Cursor::new(Vec::new())), + } + } +} + +impl AsyncRead for AsyncInputStream { + async fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + AsyncInputStream::read(self, buf).await + } + + #[inline] + fn as_async_input_stream(&self) -> Option<&AsyncInputStream> { + Some(self) + } +} + +/// Wrapper of `AsyncInputStream` that impls `futures_lite::stream::Stream` +pub struct AsyncInputChunkStream { + stream: AsyncInputStream, + chunk_size: usize, +} + +impl AsyncInputChunkStream { + pub fn into_inner(self) -> AsyncInputStream { + self.stream + } +} + +impl futures_lite::stream::Stream for AsyncInputChunkStream { + type Item = Result, std::io::Error>; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + let read_buf = Vec::with_capacity(this.chunk_size); + let mut fut = std::pin::pin!(this.stream.reader.read(read_buf)); + match fut.as_mut().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready((result, data)) => match result { + StreamResult::Complete(_) if data.is_empty() => Poll::Pending, + StreamResult::Complete(_) => Poll::Ready(Some(Ok(data))), + StreamResult::Dropped => Poll::Ready(None), + StreamResult::Cancelled => Poll::Ready(None), + }, + } + } +} + +pin_project_lite::pin_project! { + /// Wrapper of `AsyncInputStream` that impls + /// `futures_lite::stream::Stream` with item `Result`. + pub struct AsyncInputByteStream { + #[pin] + stream: AsyncInputChunkStream, + buffer: std::io::Bytes>>, + } +} + +impl AsyncInputByteStream { + pub fn into_inner(self) -> (AsyncInputStream, Vec) { + ( + self.stream.into_inner(), + self.buffer + .collect::, std::io::Error>>() + .expect("read of Cursor> is infallible"), + ) + } +} + +impl futures_lite::stream::Stream for AsyncInputByteStream { + type Item = Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + match this.buffer.next() { + Some(byte) => Poll::Ready(Some(Ok(byte.expect("cursor on Vec is infallible")))), + None => match futures_lite::stream::Stream::poll_next(this.stream, cx) { + Poll::Ready(Some(Ok(bytes))) => { + let mut bytes = std::io::Read::bytes(std::io::Cursor::new(bytes)); + match bytes.next() { + Some(Ok(byte)) => { + *this.buffer = bytes; + Poll::Ready(Some(Ok(byte))) + } + Some(Err(err)) => Poll::Ready(Some(Err(err))), + None => Poll::Ready(None), + } + } + Poll::Ready(Some(Err(err))) => Poll::Ready(Some(Err(err))), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + }, + } + } +} + +/// A wrapper for a p3 `StreamWriter` that provides `AsyncWrite`. +pub struct AsyncOutputStream { + writer: StreamWriter, +} + +impl std::fmt::Debug for AsyncOutputStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AsyncOutputStream").finish() + } +} + +impl AsyncOutputStream { + /// Construct an `AsyncOutputStream` from a p3 `StreamWriter`. + pub fn new(writer: StreamWriter) -> Self { + Self { writer } + } + + /// Asynchronously write to the output stream. + pub async fn write(&mut self, buf: &[u8]) -> std::io::Result { + let data = buf.to_vec(); + let remaining = self.writer.write_all(data).await; + Ok(buf.len() - remaining.len()) + } + + /// Asynchronously write all bytes to the output stream. + pub async fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> { + let data = buf.to_vec(); + let remaining = self.writer.write_all(data).await; + if remaining.is_empty() { + Ok(()) + } else { + Err(std::io::Error::from(std::io::ErrorKind::ConnectionReset)) + } + } + + /// Flush the output stream (no-op for p3 streams). + pub async fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +impl AsyncWrite for AsyncOutputStream { + async fn write(&mut self, buf: &[u8]) -> std::io::Result { + AsyncOutputStream::write(self, buf).await + } + async fn flush(&mut self) -> std::io::Result<()> { + AsyncOutputStream::flush(self).await + } + + #[inline] + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + Some(self) + } +} diff --git a/src/sys/p3/mod.rs b/src/sys/p3/mod.rs new file mode 100644 index 0000000..b8a7c47 --- /dev/null +++ b/src/sys/p3/mod.rs @@ -0,0 +1,7 @@ +pub mod http; +pub mod io; +pub mod net; +pub mod random; +pub mod runtime; +pub mod stdio; +pub mod time; diff --git a/src/sys/p3/net/mod.rs b/src/sys/p3/net/mod.rs new file mode 100644 index 0000000..d0bba06 --- /dev/null +++ b/src/sys/p3/net/mod.rs @@ -0,0 +1,27 @@ +//! Async network abstractions. + +use std::io::{self, ErrorKind}; +use wasip3::sockets::types::ErrorCode; + +mod tcp_listener; +mod tcp_stream; + +pub use tcp_listener::*; +pub use tcp_stream::*; + +fn to_io_err(err: ErrorCode) -> io::Error { + match err { + ErrorCode::AccessDenied => ErrorKind::PermissionDenied.into(), + ErrorCode::NotSupported => ErrorKind::Unsupported.into(), + ErrorCode::InvalidArgument => ErrorKind::InvalidInput.into(), + ErrorCode::OutOfMemory => ErrorKind::OutOfMemory.into(), + ErrorCode::Timeout => ErrorKind::TimedOut.into(), + ErrorCode::InvalidState => ErrorKind::InvalidData.into(), + ErrorCode::AddressInUse => ErrorKind::AddrInUse.into(), + ErrorCode::ConnectionRefused => ErrorKind::ConnectionRefused.into(), + ErrorCode::ConnectionReset => ErrorKind::ConnectionReset.into(), + ErrorCode::ConnectionAborted => ErrorKind::ConnectionAborted.into(), + ErrorCode::RemoteUnreachable => ErrorKind::HostUnreachable.into(), + _ => ErrorKind::Other.into(), + } +} diff --git a/src/sys/p3/net/tcp_listener.rs b/src/sys/p3/net/tcp_listener.rs new file mode 100644 index 0000000..7ab5596 --- /dev/null +++ b/src/sys/p3/net/tcp_listener.rs @@ -0,0 +1,126 @@ +use crate::io; +use crate::iter::AsyncIterator; +use std::net::SocketAddr; + +use super::{TcpStream, to_io_err}; +use wasip3::sockets::types::{IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, TcpSocket}; +use wasip3::wit_bindgen::rt::async_support::StreamReader; + +/// A TCP socket server, listening for connections. +pub struct TcpListener { + accept_stream: StreamReader, + socket: TcpSocket, +} + +impl std::fmt::Debug for TcpListener { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TcpListener").finish() + } +} + +impl TcpListener { + /// Creates a new TcpListener which will be bound to the specified address. + /// + /// The returned listener is ready for accepting connections. + pub async fn bind(addr: &str) -> io::Result { + let addr: SocketAddr = addr + .parse() + .map_err(|_| io::Error::other("failed to parse string to socket addr"))?; + let family = match addr { + SocketAddr::V4(_) => IpAddressFamily::Ipv4, + SocketAddr::V6(_) => IpAddressFamily::Ipv6, + }; + let socket = TcpSocket::create(family).map_err(to_io_err)?; + let local_address = sockaddr_to_wasi(addr); + socket.bind(local_address).map_err(to_io_err)?; + let accept_stream = socket.listen().map_err(to_io_err)?; + Ok(Self { + accept_stream, + socket, + }) + } + + /// Returns the local socket address of this listener. + pub fn local_addr(&self) -> io::Result { + self.socket + .get_local_address() + .map_err(to_io_err) + .map(sockaddr_from_wasi) + } + + /// Returns an iterator over the connections being received on this listener. + pub fn incoming(&mut self) -> Incoming<'_> { + Incoming { listener: self } + } +} + +/// An iterator that infinitely accepts connections on a TcpListener. +pub struct Incoming<'a> { + listener: &'a mut TcpListener, +} + +impl<'a> std::fmt::Debug for Incoming<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Incoming").finish() + } +} + +impl<'a> AsyncIterator for Incoming<'a> { + type Item = io::Result; + + async fn next(&mut self) -> Option { + self.listener + .accept_stream + .next() + .await + .map(TcpStream::from_connected_socket) + } +} + +fn sockaddr_from_wasi(addr: IpSocketAddress) -> std::net::SocketAddr { + use wasip3::sockets::types::Ipv6SocketAddress; + match addr { + IpSocketAddress::Ipv4(Ipv4SocketAddress { address, port }) => { + std::net::SocketAddr::V4(std::net::SocketAddrV4::new( + std::net::Ipv4Addr::new(address.0, address.1, address.2, address.3), + port, + )) + } + IpSocketAddress::Ipv6(Ipv6SocketAddress { + address, + port, + flow_info, + scope_id, + }) => std::net::SocketAddr::V6(std::net::SocketAddrV6::new( + std::net::Ipv6Addr::new( + address.0, address.1, address.2, address.3, address.4, address.5, address.6, + address.7, + ), + port, + flow_info, + scope_id, + )), + } +} + +fn sockaddr_to_wasi(addr: std::net::SocketAddr) -> IpSocketAddress { + use wasip3::sockets::types::Ipv6SocketAddress; + match addr { + std::net::SocketAddr::V4(addr) => { + let ip = addr.ip().octets(); + IpSocketAddress::Ipv4(Ipv4SocketAddress { + address: (ip[0], ip[1], ip[2], ip[3]), + port: addr.port(), + }) + } + std::net::SocketAddr::V6(addr) => { + let ip = addr.ip().segments(); + IpSocketAddress::Ipv6(Ipv6SocketAddress { + address: (ip[0], ip[1], ip[2], ip[3], ip[4], ip[5], ip[6], ip[7]), + port: addr.port(), + flow_info: addr.flowinfo(), + scope_id: addr.scope_id(), + }) + } + } +} diff --git a/src/sys/p3/net/tcp_stream.rs b/src/sys/p3/net/tcp_stream.rs new file mode 100644 index 0000000..f049892 --- /dev/null +++ b/src/sys/p3/net/tcp_stream.rs @@ -0,0 +1,137 @@ +use std::io::ErrorKind; +use std::net::{SocketAddr, ToSocketAddrs}; + +use super::to_io_err; +use crate::io::{self, AsyncInputStream, AsyncOutputStream}; +use wasip3::sockets::types::{IpAddressFamily, IpSocketAddress, Ipv4SocketAddress, TcpSocket}; + +/// A TCP stream between a local and a remote socket. +pub struct TcpStream { + input: AsyncInputStream, + output: AsyncOutputStream, +} + +impl std::fmt::Debug for TcpStream { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TcpStream").finish() + } +} + +impl TcpStream { + pub(crate) fn new(input: AsyncInputStream, output: AsyncOutputStream) -> Self { + TcpStream { input, output } + } + + /// Opens a TCP connection to a remote host. + pub async fn connect(addr: impl ToSocketAddrs) -> io::Result { + let addrs = addr.to_socket_addrs()?; + let mut last_err = None; + for addr in addrs { + match TcpStream::connect_addr(addr).await { + Ok(stream) => return Ok(stream), + Err(e) => last_err = Some(e), + } + } + + Err(last_err.unwrap_or_else(|| { + io::Error::new(ErrorKind::InvalidInput, "could not resolve to any address") + })) + } + + /// Establishes a connection to the specified `addr`. + pub async fn connect_addr(addr: SocketAddr) -> io::Result { + let family = match addr { + SocketAddr::V4(_) => IpAddressFamily::Ipv4, + SocketAddr::V6(_) => IpAddressFamily::Ipv6, + }; + let socket = TcpSocket::create(family).map_err(to_io_err)?; + + let remote_address = match addr { + SocketAddr::V4(addr) => { + let ip = addr.ip().octets(); + let address = (ip[0], ip[1], ip[2], ip[3]); + let port = addr.port(); + IpSocketAddress::Ipv4(Ipv4SocketAddress { port, address }) + } + SocketAddr::V6(_) => todo!("IPv6 not yet supported in `wstd::net::TcpStream`"), + }; + + // p3 connect is async + socket.connect(remote_address).await.map_err(to_io_err)?; + + Self::from_connected_socket(socket) + } + + /// Create a TcpStream from an already-connected socket. + pub(crate) fn from_connected_socket(socket: TcpSocket) -> io::Result { + // Get receive stream + let (recv_reader, _recv_completion) = socket.receive(); + let input = AsyncInputStream::new(recv_reader); + + // Create a send stream and wire it to the socket + let (send_writer, send_reader) = wasip3::wit_stream::new::(); + let _send_completion = socket.send(send_reader); + let output = AsyncOutputStream::new(send_writer); + + Ok(TcpStream::new(input, output)) + } + + pub fn split(&mut self) -> (ReadHalf<'_>, WriteHalf<'_>) { + let ptr = self as *mut TcpStream; + // Safety: ReadHalf only accesses input, WriteHalf only accesses output + #[allow(unsafe_code)] + unsafe { + (ReadHalf(&mut *ptr), WriteHalf(&mut *ptr)) + } + } +} + +impl io::AsyncRead for TcpStream { + async fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.input.read(buf).await + } + + fn as_async_input_stream(&self) -> Option<&AsyncInputStream> { + Some(&self.input) + } +} + +impl io::AsyncWrite for TcpStream { + async fn write(&mut self, buf: &[u8]) -> io::Result { + self.output.write(buf).await + } + + async fn flush(&mut self) -> io::Result<()> { + self.output.flush().await + } + + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + Some(&self.output) + } +} + +pub struct ReadHalf<'a>(&'a mut TcpStream); +impl<'a> io::AsyncRead for ReadHalf<'a> { + async fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.0.input.read(buf).await + } + + fn as_async_input_stream(&self) -> Option<&AsyncInputStream> { + Some(&self.0.input) + } +} + +pub struct WriteHalf<'a>(&'a mut TcpStream); +impl<'a> io::AsyncWrite for WriteHalf<'a> { + async fn write(&mut self, buf: &[u8]) -> io::Result { + self.0.output.write(buf).await + } + + async fn flush(&mut self) -> io::Result<()> { + self.0.output.flush().await + } + + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + Some(&self.0.output) + } +} diff --git a/src/sys/p3/random.rs b/src/sys/p3/random.rs new file mode 100644 index 0000000..9bddf37 --- /dev/null +++ b/src/sys/p3/random.rs @@ -0,0 +1,25 @@ +//! Random number generation. + +use wasip3::random; + +/// Fill the slice with cryptographically secure random bytes. +pub fn get_random_bytes(buf: &mut [u8]) { + match buf.len() { + 0 => {} + _ => { + let output = random::random::get_random_bytes(buf.len() as u64); + buf.copy_from_slice(&output[..]); + } + } +} + +/// Fill the slice with insecure random bytes. +pub fn get_insecure_random_bytes(buf: &mut [u8]) { + match buf.len() { + 0 => {} + _ => { + let output = random::insecure::get_insecure_random_bytes(buf.len() as u64); + buf.copy_from_slice(&output[..]); + } + } +} diff --git a/src/sys/p3/runtime.rs b/src/sys/p3/runtime.rs new file mode 100644 index 0000000..bf9e3ff --- /dev/null +++ b/src/sys/p3/runtime.rs @@ -0,0 +1,97 @@ +pub use ::async_task::Task; + +use async_task::{Runnable, Task as AsyncTask}; +use core::future::Future; +use std::cell::RefCell; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; + +// There are no threads in WASI, so this is just a safe way to thread a single reactor to all +// use sites in the background. +std::thread_local! { + pub(crate) static REACTOR: RefCell> = const { RefCell::new(None) }; +} + +/// Start the event loop. Blocks until the future completes. +/// +/// Delegates to wit-bindgen's block_on which integrates with the component +/// model's async runtime (waitable-set polling) for native p3 async support. +pub fn block_on(fut: F) -> F::Output +where + F: Future + 'static, + F::Output: 'static, +{ + // Set up the reactor for spawn support + let reactor = Reactor::new(); + let prev = REACTOR.replace(Some(reactor)); + if prev.is_some() { + panic!("cannot wstd::runtime::block_on inside an existing block_on!") + } + + let result = wasip3::wit_bindgen::rt::async_support::block_on(fut); + + REACTOR.replace(None); + result +} + +/// Spawn a `Future` as a `Task` on the current `Reactor`. +/// +/// Panics if called from outside `block_on`. +pub fn spawn(fut: F) -> Task +where + F: std::future::Future + 'static, + T: 'static, +{ + Reactor::current().spawn(fut) +} + +/// Manage async task scheduling for WASI 0.3 +#[derive(Debug, Clone)] +pub struct Reactor { + inner: Arc, +} + +#[derive(Debug)] +struct InnerReactor { + ready_list: Mutex>, +} + +impl Reactor { + /// Return a `Reactor` for the currently running `wstd::runtime::block_on`. + /// + /// # Panic + /// This will panic if called outside of `wstd::runtime::block_on`. + pub fn current() -> Self { + REACTOR.with(|r| { + r.borrow() + .as_ref() + .expect("Reactor::current must be called within a wstd runtime") + .clone() + }) + } + + /// Create a new instance of `Reactor` + pub(crate) fn new() -> Self { + Self { + inner: Arc::new(InnerReactor { + ready_list: Mutex::new(VecDeque::new()), + }), + } + } + + /// Spawn a `Task` on the `Reactor`. + pub fn spawn(&self, fut: F) -> AsyncTask + where + F: Future + 'static, + T: 'static, + { + let this = self.clone(); + let schedule = move |runnable| this.inner.ready_list.lock().unwrap().push_back(runnable); + + // Safety: 'static constraints satisfy the lifetime requirements + #[allow(unsafe_code)] + let (runnable, task) = unsafe { async_task::spawn_unchecked(fut, schedule) }; + self.inner.ready_list.lock().unwrap().push_back(runnable); + task + } +} diff --git a/src/sys/p3/stdio.rs b/src/sys/p3/stdio.rs new file mode 100644 index 0000000..25bdaad --- /dev/null +++ b/src/sys/p3/stdio.rs @@ -0,0 +1,157 @@ +use crate::io::{AsyncInputStream, AsyncOutputStream, AsyncRead, AsyncWrite, Result}; +use std::cell::LazyCell; +use wasip3::cli::terminal_input::TerminalInput; +use wasip3::cli::terminal_output::TerminalOutput; + +/// Use the program's stdin as an `AsyncInputStream`. +#[derive(Debug)] +pub struct Stdin { + stream: AsyncInputStream, + terminput: LazyCell>, +} + +/// Get the program's stdin for use as an `AsyncInputStream`. +pub fn stdin() -> Stdin { + let (reader, _completion) = wasip3::cli::stdin::read_via_stream(); + let stream = AsyncInputStream::new(reader); + Stdin { + stream, + terminput: LazyCell::new(wasip3::cli::terminal_stdin::get_terminal_stdin), + } +} + +impl Stdin { + /// Check if stdin is a terminal. + pub fn is_terminal(&self) -> bool { + LazyCell::force(&self.terminput).is_some() + } + + /// Get the `AsyncInputStream` used to implement `Stdin` + pub fn into_inner(self) -> AsyncInputStream { + self.stream + } +} + +impl AsyncRead for Stdin { + #[inline] + async fn read(&mut self, buf: &mut [u8]) -> Result { + self.stream.read(buf).await + } + + #[inline] + async fn read_to_end(&mut self, buf: &mut Vec) -> Result { + self.stream.read_to_end(buf).await + } + + #[inline] + fn as_async_input_stream(&self) -> Option<&AsyncInputStream> { + Some(&self.stream) + } +} + +/// Use the program's stdout as an `AsyncOutputStream`. +#[derive(Debug)] +pub struct Stdout { + stream: AsyncOutputStream, + termoutput: LazyCell>, +} + +/// Get the program's stdout for use as an `AsyncOutputStream`. +pub fn stdout() -> Stdout { + let (writer, reader) = wasip3::wit_stream::new::(); + // Wire the reader end to the WASI stdout sink. The returned future resolves + // when the stream is fully consumed; we intentionally leak it so the pipe + // stays open for the lifetime of the program. + let _completion = wasip3::cli::stdout::write_via_stream(reader); + let stream = AsyncOutputStream::new(writer); + Stdout { + stream, + termoutput: LazyCell::new(wasip3::cli::terminal_stdout::get_terminal_stdout), + } +} + +impl Stdout { + /// Check if stdout is a terminal. + pub fn is_terminal(&self) -> bool { + LazyCell::force(&self.termoutput).is_some() + } + + /// Get the `AsyncOutputStream` used to implement `Stdout` + pub fn into_inner(self) -> AsyncOutputStream { + self.stream + } +} + +impl AsyncWrite for Stdout { + #[inline] + async fn write(&mut self, buf: &[u8]) -> Result { + self.stream.write(buf).await + } + + #[inline] + async fn flush(&mut self) -> Result<()> { + self.stream.flush().await + } + + #[inline] + async fn write_all(&mut self, buf: &[u8]) -> Result<()> { + self.stream.write_all(buf).await + } + + #[inline] + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + self.stream.as_async_output_stream() + } +} + +/// Use the program's stderr as an `AsyncOutputStream`. +#[derive(Debug)] +pub struct Stderr { + stream: AsyncOutputStream, + termoutput: LazyCell>, +} + +/// Get the program's stderr for use as an `AsyncOutputStream`. +pub fn stderr() -> Stderr { + let (writer, reader) = wasip3::wit_stream::new::(); + let _completion = wasip3::cli::stderr::write_via_stream(reader); + let stream = AsyncOutputStream::new(writer); + Stderr { + stream, + termoutput: LazyCell::new(wasip3::cli::terminal_stderr::get_terminal_stderr), + } +} + +impl Stderr { + /// Check if stderr is a terminal. + pub fn is_terminal(&self) -> bool { + LazyCell::force(&self.termoutput).is_some() + } + + /// Get the `AsyncOutputStream` used to implement `Stderr` + pub fn into_inner(self) -> AsyncOutputStream { + self.stream + } +} + +impl AsyncWrite for Stderr { + #[inline] + async fn write(&mut self, buf: &[u8]) -> Result { + self.stream.write(buf).await + } + + #[inline] + async fn flush(&mut self) -> Result<()> { + self.stream.flush().await + } + + #[inline] + async fn write_all(&mut self, buf: &[u8]) -> Result<()> { + self.stream.write_all(buf).await + } + + #[inline] + fn as_async_output_stream(&self) -> Option<&AsyncOutputStream> { + self.stream.as_async_output_stream() + } +} diff --git a/src/sys/p3/time.rs b/src/sys/p3/time.rs new file mode 100644 index 0000000..629b4a2 --- /dev/null +++ b/src/sys/p3/time.rs @@ -0,0 +1,51 @@ +use wasip3::clocks::{monotonic_clock, system_clock}; + +/// A measurement of a monotonically nondecreasing clock. Opaque and useful only +/// with Duration. +pub type MonotonicInstant = monotonic_clock::Mark; + +/// A duration from the monotonic clock, in nanoseconds. +pub type MonotonicDuration = monotonic_clock::Duration; + +/// Return the current monotonic clock instant. +pub fn now() -> MonotonicInstant { + monotonic_clock::now() +} + +/// A measurement of the system clock, useful for talking to external entities +/// like the file system or other processes. May be converted losslessly to a +/// more useful `std::time::SystemTime` to provide more methods. +#[derive(Debug, Clone, Copy)] +#[allow(dead_code)] +pub struct SystemTime(system_clock::Instant); + +impl SystemTime { + pub fn now() -> Self { + Self(system_clock::now()) + } +} + +impl From for std::time::SystemTime { + fn from(st: SystemTime) -> Self { + // p3 system_clock::Instant has i64 seconds + if st.0.seconds >= 0 { + std::time::SystemTime::UNIX_EPOCH + + std::time::Duration::from_secs(st.0.seconds as u64) + + std::time::Duration::from_nanos(st.0.nanoseconds.into()) + } else { + std::time::SystemTime::UNIX_EPOCH + - std::time::Duration::from_secs((-st.0.seconds) as u64) + + std::time::Duration::from_nanos(st.0.nanoseconds.into()) + } + } +} + +/// Create a timer that fires at a specific monotonic clock instant. +pub async fn timer_wait_until(instant: MonotonicInstant) { + monotonic_clock::wait_until(instant).await +} + +/// Create a timer that fires after a monotonic clock duration. +pub async fn timer_wait_for(duration: MonotonicDuration) { + monotonic_clock::wait_for(duration).await +} diff --git a/src/time.rs b/src/time.rs new file mode 100644 index 0000000..9844aea --- /dev/null +++ b/src/time.rs @@ -0,0 +1,494 @@ +//! Async time interfaces. + +pub(crate) mod utils { + use std::io; + + pub(crate) fn timeout_err(msg: &'static str) -> io::Error { + io::Error::new(io::ErrorKind::TimedOut, msg) + } +} + +#[cfg(wstd_p2)] +use pin_project_lite::pin_project; +use std::future::{Future, IntoFuture}; +use std::ops::{Add, AddAssign, Sub, SubAssign}; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use crate::iter::AsyncIterator; + +#[cfg(wstd_p2)] +use crate::runtime::AsyncPollable; + +pub use crate::sys::time::SystemTime; + +// ---- Duration ---- + +/// A Duration type to represent a span of time, typically used for system +/// timeouts. +/// +/// This type wraps `std::time::Duration` so we can implement traits on it +/// without coherence issues, just like if we were implementing this in the +/// stdlib. +#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] +pub struct Duration(pub(crate) crate::sys::time::MonotonicDuration); +impl Duration { + /// Creates a new `Duration` from the specified number of whole seconds and + /// additional nanoseconds. + #[must_use] + #[inline] + pub fn new(secs: u64, nanos: u32) -> Duration { + std::time::Duration::new(secs, nanos).into() + } + + /// Creates a new `Duration` from the specified number of whole seconds. + #[must_use] + #[inline] + pub fn from_secs(secs: u64) -> Duration { + std::time::Duration::from_secs(secs).into() + } + + /// Creates a new `Duration` from the specified number of milliseconds. + #[must_use] + #[inline] + pub fn from_millis(millis: u64) -> Self { + std::time::Duration::from_millis(millis).into() + } + + /// Creates a new `Duration` from the specified number of microseconds. + #[must_use] + #[inline] + pub fn from_micros(micros: u64) -> Self { + std::time::Duration::from_micros(micros).into() + } + + /// Creates a new `Duration` from the specified number of nanoseconds. + #[must_use] + #[inline] + pub fn from_nanos(nanos: u64) -> Self { + std::time::Duration::from_nanos(nanos).into() + } + + /// Creates a new `Duration` from the specified number of seconds represented + /// as `f64`. + /// + /// # Panics + /// This constructor will panic if `secs` is not finite, negative or overflows `Duration`. + /// + /// # Examples + /// ```no_run + /// use wstd::time::Duration; + /// + /// let dur = Duration::from_secs_f64(2.7); + /// assert_eq!(dur, Duration::new(2, 700_000_000)); + /// ``` + #[must_use] + #[inline] + pub fn from_secs_f64(secs: f64) -> Duration { + std::time::Duration::from_secs_f64(secs).into() + } + + /// Creates a new `Duration` from the specified number of seconds represented + /// as `f32`. + /// + /// # Panics + /// This constructor will panic if `secs` is not finite, negative or overflows `Duration`. + #[must_use] + #[inline] + pub fn from_secs_f32(secs: f32) -> Duration { + std::time::Duration::from_secs_f32(secs).into() + } + + /// Returns the number of whole seconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_secs(&self) -> u64 { + self.0 / 1_000_000_000 + } + + /// Returns the number of whole milliseconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_millis(&self) -> u128 { + (self.0 / 1_000_000) as u128 + } + + /// Returns the number of whole microseconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_micros(&self) -> u128 { + (self.0 / 1_000) as u128 + } + + /// Returns the total number of nanoseconds contained by this `Duration`. + #[must_use] + #[inline] + pub const fn as_nanos(&self) -> u128 { + self.0 as u128 + } +} + +impl From for Duration { + fn from(inner: std::time::Duration) -> Self { + Self( + inner + .as_nanos() + .try_into() + .expect("only dealing with durations that can fit in u64"), + ) + } +} + +impl From for std::time::Duration { + fn from(duration: Duration) -> Self { + Self::from_nanos(duration.0) + } +} + +impl Add for Duration { + type Output = Self; + + fn add(self, rhs: Duration) -> Self::Output { + Self(self.0 + rhs.0) + } +} + +impl AddAssign for Duration { + fn add_assign(&mut self, rhs: Duration) { + *self = Self(self.0 + rhs.0) + } +} + +impl Sub for Duration { + type Output = Self; + + fn sub(self, rhs: Duration) -> Self::Output { + Self(self.0 - rhs.0) + } +} + +impl SubAssign for Duration { + fn sub_assign(&mut self, rhs: Duration) { + *self = Self(self.0 - rhs.0) + } +} + +impl IntoFuture for Duration { + type Output = Instant; + + type IntoFuture = Wait; + + fn into_future(self) -> Self::IntoFuture { + crate::task::sleep(self) + } +} + +// ---- Instant ---- + +/// A measurement of a monotonically nondecreasing clock. Opaque and useful only +/// with Duration. +/// +/// This type wraps `std::time::Duration` so we can implement traits on it +/// without coherence issues, just like if we were implementing this in the +/// stdlib. +#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] +pub struct Instant(pub(crate) crate::sys::time::MonotonicInstant); + +impl Instant { + /// Returns an instant corresponding to "now". + /// + /// # Examples + /// + /// ```no_run + /// use wstd::time::Instant; + /// + /// let now = Instant::now(); + /// ``` + #[must_use] + pub fn now() -> Self { + Instant(crate::sys::time::now()) + } + + /// Returns the amount of time elapsed from another instant to this one, or zero duration if + /// that instant is later than this one. + pub fn duration_since(&self, earlier: Instant) -> Duration { + Duration::from_nanos(self.0.saturating_sub(earlier.0)) + } + + /// Returns the amount of time elapsed since this instant. + pub fn elapsed(&self) -> Duration { + Instant::now().duration_since(*self) + } +} + +impl Add for Instant { + type Output = Self; + + fn add(self, rhs: Duration) -> Self::Output { + Self(self.0 + rhs.0) + } +} + +impl AddAssign for Instant { + fn add_assign(&mut self, rhs: Duration) { + *self = Self(self.0 + rhs.0) + } +} + +impl Sub for Instant { + type Output = Self; + + fn sub(self, rhs: Duration) -> Self::Output { + Self(self.0 - rhs.0) + } +} + +impl SubAssign for Instant { + fn sub_assign(&mut self, rhs: Duration) { + *self = Self(self.0 - rhs.0) + } +} + +impl IntoFuture for Instant { + type Output = Instant; + + type IntoFuture = Wait; + + fn into_future(self) -> Self::IntoFuture { + crate::task::sleep_until(self) + } +} + +// ---- Timer / Interval ---- + +/// An async iterator representing notifications at fixed interval. +pub fn interval(duration: Duration) -> Interval { + Interval { duration } +} + +/// An async iterator representing notifications at fixed interval. +/// +/// See the [`interval`] function for more. +#[derive(Debug)] +pub struct Interval { + duration: Duration, +} +impl AsyncIterator for Interval { + type Item = Instant; + + async fn next(&mut self) -> Option { + Some(Timer::after(self.duration).wait().await) + } +} + +// ---- Timer (p2) ---- + +#[cfg(wstd_p2)] +#[derive(Debug)] +pub struct Timer(Option); + +#[cfg(wstd_p2)] +impl Timer { + pub fn never() -> Timer { + Timer(None) + } + pub fn at(deadline: Instant) -> Timer { + let pollable = crate::sys::time::subscribe_at(deadline.0); + Timer(Some(pollable)) + } + pub fn after(duration: Duration) -> Timer { + let pollable = crate::sys::time::subscribe_after(duration.0); + Timer(Some(pollable)) + } + pub fn set_after(&mut self, duration: Duration) { + *self = Self::after(duration); + } + pub fn wait(&self) -> Wait { + let wait_for = self.0.as_ref().map(AsyncPollable::wait_for); + Wait { wait_for } + } +} + +#[cfg(wstd_p2)] +pin_project! { + /// Future created by [`Timer::wait`] + #[must_use = "futures do nothing unless polled or .awaited"] + pub struct Wait { + #[pin] + wait_for: Option + } +} + +#[cfg(wstd_p2)] +impl Future for Wait { + type Output = Instant; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.wait_for.as_pin_mut() { + None => Poll::Pending, + Some(f) => match f.poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready(Instant::now()), + }, + } + } +} + +// ---- Timer (p3) ---- + +#[cfg(wstd_p3)] +pub struct Timer { + kind: TimerKind, +} + +#[cfg(wstd_p3)] +enum TimerKind { + Never, + After(Duration), + At(Instant), +} + +#[cfg(wstd_p3)] +impl std::fmt::Debug for Timer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Timer").finish() + } +} + +#[cfg(wstd_p3)] +impl Timer { + pub fn never() -> Timer { + Timer { + kind: TimerKind::Never, + } + } + pub fn at(deadline: Instant) -> Timer { + Timer { + kind: TimerKind::At(deadline), + } + } + pub fn after(duration: Duration) -> Timer { + Timer { + kind: TimerKind::After(duration), + } + } + pub fn set_after(&mut self, duration: Duration) { + *self = Self::after(duration); + } + pub fn wait(&self) -> Wait { + let inner: Pin>> = match self.kind { + TimerKind::Never => Box::pin(std::future::pending()), + TimerKind::After(d) => Box::pin(crate::sys::time::timer_wait_for(d.0)), + TimerKind::At(deadline) => Box::pin(crate::sys::time::timer_wait_until(deadline.0)), + }; + Wait { inner } + } +} + +#[cfg(wstd_p3)] +#[must_use = "futures do nothing unless polled or .awaited"] +pub struct Wait { + inner: Pin>>, +} + +#[cfg(wstd_p3)] +impl Future for Wait { + type Output = Instant; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.inner.as_mut().poll(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(()) => Poll::Ready(Instant::now()), + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + async fn debug_duration(what: &str, f: impl Future) { + let start = Instant::now(); + let now = f.await; + let d = now.duration_since(start); + let d: std::time::Duration = d.into(); + println!("{what} awaited for {} s", d.as_secs_f32()); + } + + #[test] + fn timer_now() { + crate::runtime::block_on(debug_duration("timer_now", async { + Timer::at(Instant::now()).wait().await + })); + } + + #[test] + fn timer_after_100_milliseconds() { + crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { + Timer::after(Duration::from_millis(100)).wait().await + })); + } + + #[test] + fn test_duration_since() { + let x = Instant::now(); + let d = Duration::new(456, 789); + let y = x + d; + assert_eq!(y.duration_since(x), d); + } + + #[test] + fn test_new_from_as() { + assert_eq!(Duration::new(456, 864209753).as_secs(), 456); + assert_eq!(Duration::new(456, 864209753).as_millis(), 456864); + assert_eq!(Duration::new(456, 864209753).as_micros(), 456864209); + assert_eq!(Duration::new(456, 864209753).as_nanos(), 456864209753); + + assert_eq!(Duration::from_secs(9876543210).as_secs(), 9876543210); + assert_eq!( + Duration::from_secs(9876543210).as_millis(), + 9_876_543_210_000 + ); + assert_eq!( + Duration::from_secs(9876543210).as_micros(), + 9_876_543_210_000_000 + ); + assert_eq!( + Duration::from_secs(9876543210).as_nanos(), + 9_876_543_210_000_000_000 + ); + + assert_eq!(Duration::from_millis(9876543210).as_secs(), 9876543); + assert_eq!(Duration::from_millis(9876543210).as_millis(), 9876543210); + assert_eq!( + Duration::from_millis(9876543210).as_micros(), + 9_876_543_210_000 + ); + assert_eq!( + Duration::from_millis(9876543210).as_nanos(), + 9_876_543_210_000_000 + ); + + assert_eq!(Duration::from_micros(9876543210).as_secs(), 9876); + assert_eq!(Duration::from_micros(9876543210).as_millis(), 9876543); + assert_eq!(Duration::from_micros(9876543210).as_micros(), 9876543210); + assert_eq!( + Duration::from_micros(9876543210).as_nanos(), + 9_876_543_210_000 + ); + + assert_eq!(Duration::from_nanos(9876543210).as_secs(), 9); + assert_eq!(Duration::from_nanos(9876543210).as_millis(), 9876); + assert_eq!(Duration::from_nanos(9876543210).as_micros(), 9876543); + assert_eq!(Duration::from_nanos(9876543210).as_nanos(), 9876543210); + } + + #[test] + fn test_from_secs_float() { + assert_eq!(Duration::from_secs_f64(158.9).as_secs(), 158); + assert_eq!(Duration::from_secs_f32(158.9).as_secs(), 158); + assert_eq!(Duration::from_secs_f64(159.1).as_secs(), 159); + assert_eq!(Duration::from_secs_f32(159.1).as_secs(), 159); + } +} diff --git a/src/time/duration.rs b/src/time/duration.rs deleted file mode 100644 index 7f67ceb..0000000 --- a/src/time/duration.rs +++ /dev/null @@ -1,216 +0,0 @@ -use super::{Instant, Wait}; -use std::future::IntoFuture; -use std::ops::{Add, AddAssign, Sub, SubAssign}; -use wasip2::clocks::monotonic_clock; - -/// A Duration type to represent a span of time, typically used for system -/// timeouts. -/// -/// This type wraps `std::time::Duration` so we can implement traits on it -/// without coherence issues, just like if we were implementing this in the -/// stdlib. -#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] -pub struct Duration(pub(crate) monotonic_clock::Duration); -impl Duration { - /// Creates a new `Duration` from the specified number of whole seconds and - /// additional nanoseconds. - #[must_use] - #[inline] - pub fn new(secs: u64, nanos: u32) -> Duration { - std::time::Duration::new(secs, nanos).into() - } - - /// Creates a new `Duration` from the specified number of whole seconds. - #[must_use] - #[inline] - pub fn from_secs(secs: u64) -> Duration { - std::time::Duration::from_secs(secs).into() - } - - /// Creates a new `Duration` from the specified number of milliseconds. - #[must_use] - #[inline] - pub fn from_millis(millis: u64) -> Self { - std::time::Duration::from_millis(millis).into() - } - - /// Creates a new `Duration` from the specified number of microseconds. - #[must_use] - #[inline] - pub fn from_micros(micros: u64) -> Self { - std::time::Duration::from_micros(micros).into() - } - - /// Creates a new `Duration` from the specified number of nanoseconds. - #[must_use] - #[inline] - pub fn from_nanos(nanos: u64) -> Self { - std::time::Duration::from_nanos(nanos).into() - } - - /// Creates a new `Duration` from the specified number of seconds represented - /// as `f64`. - /// - /// # Panics - /// This constructor will panic if `secs` is not finite, negative or overflows `Duration`. - /// - /// # Examples - /// ```no_run - /// use wstd::time::Duration; - /// - /// let dur = Duration::from_secs_f64(2.7); - /// assert_eq!(dur, Duration::new(2, 700_000_000)); - /// ``` - #[must_use] - #[inline] - pub fn from_secs_f64(secs: f64) -> Duration { - std::time::Duration::from_secs_f64(secs).into() - } - - /// Creates a new `Duration` from the specified number of seconds represented - /// as `f32`. - /// - /// # Panics - /// This constructor will panic if `secs` is not finite, negative or overflows `Duration`. - #[must_use] - #[inline] - pub fn from_secs_f32(secs: f32) -> Duration { - std::time::Duration::from_secs_f32(secs).into() - } - - /// Returns the number of whole seconds contained by this `Duration`. - #[must_use] - #[inline] - pub const fn as_secs(&self) -> u64 { - self.0 / 1_000_000_000 - } - - /// Returns the number of whole milliseconds contained by this `Duration`. - #[must_use] - #[inline] - pub const fn as_millis(&self) -> u128 { - (self.0 / 1_000_000) as u128 - } - - /// Returns the number of whole microseconds contained by this `Duration`. - #[must_use] - #[inline] - pub const fn as_micros(&self) -> u128 { - (self.0 / 1_000) as u128 - } - - /// Returns the total number of nanoseconds contained by this `Duration`. - #[must_use] - #[inline] - pub const fn as_nanos(&self) -> u128 { - self.0 as u128 - } -} - -impl From for Duration { - fn from(inner: std::time::Duration) -> Self { - Self( - inner - .as_nanos() - .try_into() - .expect("only dealing with durations that can fit in u64"), - ) - } -} - -impl From for std::time::Duration { - fn from(duration: Duration) -> Self { - Self::from_nanos(duration.0) - } -} - -impl Add for Duration { - type Output = Self; - - fn add(self, rhs: Duration) -> Self::Output { - Self(self.0 + rhs.0) - } -} - -impl AddAssign for Duration { - fn add_assign(&mut self, rhs: Duration) { - *self = Self(self.0 + rhs.0) - } -} - -impl Sub for Duration { - type Output = Self; - - fn sub(self, rhs: Duration) -> Self::Output { - Self(self.0 - rhs.0) - } -} - -impl SubAssign for Duration { - fn sub_assign(&mut self, rhs: Duration) { - *self = Self(self.0 - rhs.0) - } -} - -impl IntoFuture for Duration { - type Output = Instant; - - type IntoFuture = Wait; - - fn into_future(self) -> Self::IntoFuture { - crate::task::sleep(self) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_new_from_as() { - assert_eq!(Duration::new(456, 864209753).as_secs(), 456); - assert_eq!(Duration::new(456, 864209753).as_millis(), 456864); - assert_eq!(Duration::new(456, 864209753).as_micros(), 456864209); - assert_eq!(Duration::new(456, 864209753).as_nanos(), 456864209753); - - assert_eq!(Duration::from_secs(9876543210).as_secs(), 9876543210); - assert_eq!(Duration::from_secs(9876543210).as_millis(), 9876543210_000); - assert_eq!( - Duration::from_secs(9876543210).as_micros(), - 9876543210_000000 - ); - assert_eq!( - Duration::from_secs(9876543210).as_nanos(), - 9876543210_000000000 - ); - - assert_eq!(Duration::from_millis(9876543210).as_secs(), 9876543); - assert_eq!(Duration::from_millis(9876543210).as_millis(), 9876543210); - assert_eq!( - Duration::from_millis(9876543210).as_micros(), - 9876543210_000 - ); - assert_eq!( - Duration::from_millis(9876543210).as_nanos(), - 9876543210_000000 - ); - - assert_eq!(Duration::from_micros(9876543210).as_secs(), 9876); - assert_eq!(Duration::from_micros(9876543210).as_millis(), 9876543); - assert_eq!(Duration::from_micros(9876543210).as_micros(), 9876543210); - assert_eq!(Duration::from_micros(9876543210).as_nanos(), 9876543210_000); - - assert_eq!(Duration::from_nanos(9876543210).as_secs(), 9); - assert_eq!(Duration::from_nanos(9876543210).as_millis(), 9876); - assert_eq!(Duration::from_nanos(9876543210).as_micros(), 9876543); - assert_eq!(Duration::from_nanos(9876543210).as_nanos(), 9876543210); - } - - #[test] - fn test_from_secs_float() { - assert_eq!(Duration::from_secs_f64(158.9).as_secs(), 158); - assert_eq!(Duration::from_secs_f32(158.9).as_secs(), 158); - assert_eq!(Duration::from_secs_f64(159.1).as_secs(), 159); - assert_eq!(Duration::from_secs_f32(159.1).as_secs(), 159); - } -} diff --git a/src/time/instant.rs b/src/time/instant.rs deleted file mode 100644 index 6e9cf97..0000000 --- a/src/time/instant.rs +++ /dev/null @@ -1,91 +0,0 @@ -use super::{Duration, Wait}; -use std::future::IntoFuture; -use std::ops::{Add, AddAssign, Sub, SubAssign}; -use wasip2::clocks::monotonic_clock; - -/// A measurement of a monotonically nondecreasing clock. Opaque and useful only -/// with Duration. -/// -/// This type wraps `std::time::Duration` so we can implement traits on it -/// without coherence issues, just like if we were implementing this in the -/// stdlib. -#[derive(Debug, PartialEq, PartialOrd, Ord, Eq, Hash, Clone, Copy)] -pub struct Instant(pub(crate) monotonic_clock::Instant); - -impl Instant { - /// Returns an instant corresponding to "now". - /// - /// # Examples - /// - /// ```no_run - /// use wstd::time::Instant; - /// - /// let now = Instant::now(); - /// ``` - #[must_use] - pub fn now() -> Self { - Instant(wasip2::clocks::monotonic_clock::now()) - } - - /// Returns the amount of time elapsed from another instant to this one, or zero duration if - /// that instant is later than this one. - pub fn duration_since(&self, earlier: Instant) -> Duration { - Duration::from_nanos(self.0.saturating_sub(earlier.0)) - } - - /// Returns the amount of time elapsed since this instant. - pub fn elapsed(&self) -> Duration { - Instant::now().duration_since(*self) - } -} - -impl Add for Instant { - type Output = Self; - - fn add(self, rhs: Duration) -> Self::Output { - Self(self.0 + rhs.0) - } -} - -impl AddAssign for Instant { - fn add_assign(&mut self, rhs: Duration) { - *self = Self(self.0 + rhs.0) - } -} - -impl Sub for Instant { - type Output = Self; - - fn sub(self, rhs: Duration) -> Self::Output { - Self(self.0 - rhs.0) - } -} - -impl SubAssign for Instant { - fn sub_assign(&mut self, rhs: Duration) { - *self = Self(self.0 - rhs.0) - } -} - -impl IntoFuture for Instant { - type Output = Instant; - - type IntoFuture = Wait; - - fn into_future(self) -> Self::IntoFuture { - crate::task::sleep_until(self) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_duration_since() { - let x = Instant::now(); - let d = Duration::new(456, 789); - let y = x + d; - assert_eq!(y.duration_since(x), d); - } -} diff --git a/src/time/mod.rs b/src/time/mod.rs deleted file mode 100644 index db0e1b3..0000000 --- a/src/time/mod.rs +++ /dev/null @@ -1,138 +0,0 @@ -//! Async time interfaces. - -pub(crate) mod utils; - -mod duration; -mod instant; -pub use duration::Duration; -pub use instant::Instant; - -use pin_project_lite::pin_project; -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use wasip2::clocks::{ - monotonic_clock::{subscribe_duration, subscribe_instant}, - wall_clock, -}; - -use crate::{ - iter::AsyncIterator, - runtime::{AsyncPollable, Reactor}, -}; - -/// A measurement of the system clock, useful for talking to external entities -/// like the file system or other processes. May be converted losslessly to a -/// more useful `std::time::SystemTime` to provide more methods. -#[derive(Debug, Clone, Copy)] -#[allow(dead_code)] -pub struct SystemTime(wall_clock::Datetime); - -impl SystemTime { - pub fn now() -> Self { - Self(wall_clock::now()) - } -} - -impl From for std::time::SystemTime { - fn from(st: SystemTime) -> Self { - std::time::SystemTime::UNIX_EPOCH - + std::time::Duration::from_secs(st.0.seconds) - + std::time::Duration::from_nanos(st.0.nanoseconds.into()) - } -} - -/// An async iterator representing notifications at fixed interval. -pub fn interval(duration: Duration) -> Interval { - Interval { duration } -} - -/// An async iterator representing notifications at fixed interval. -/// -/// See the [`interval`] function for more. -#[derive(Debug)] -pub struct Interval { - duration: Duration, -} -impl AsyncIterator for Interval { - type Item = Instant; - - async fn next(&mut self) -> Option { - Some(Timer::after(self.duration).wait().await) - } -} - -#[derive(Debug)] -pub struct Timer(Option); - -impl Timer { - pub fn never() -> Timer { - Timer(None) - } - pub fn at(deadline: Instant) -> Timer { - let pollable = Reactor::current().schedule(subscribe_instant(deadline.0)); - Timer(Some(pollable)) - } - pub fn after(duration: Duration) -> Timer { - let pollable = Reactor::current().schedule(subscribe_duration(duration.0)); - Timer(Some(pollable)) - } - pub fn set_after(&mut self, duration: Duration) { - *self = Self::after(duration); - } - pub fn wait(&self) -> Wait { - let wait_for = self.0.as_ref().map(AsyncPollable::wait_for); - Wait { wait_for } - } -} - -pin_project! { - /// Future created by [`Timer::wait`] - #[must_use = "futures do nothing unless polled or .awaited"] - pub struct Wait { - #[pin] - wait_for: Option - } -} - -impl Future for Wait { - type Output = Instant; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - match this.wait_for.as_pin_mut() { - None => Poll::Pending, - Some(f) => match f.poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(()) => Poll::Ready(Instant::now()), - }, - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - async fn debug_duration(what: &str, f: impl Future) { - let start = Instant::now(); - let now = f.await; - let d = now.duration_since(start); - let d: std::time::Duration = d.into(); - println!("{what} awaited for {} s", d.as_secs_f32()); - } - - #[test] - fn timer_now() { - crate::runtime::block_on(debug_duration("timer_now", async { - Timer::at(Instant::now()).wait().await - })); - } - - #[test] - fn timer_after_100_milliseconds() { - crate::runtime::block_on(debug_duration("timer_after_100_milliseconds", async { - Timer::after(Duration::from_millis(100)).wait().await - })); - } -} diff --git a/src/time/utils.rs b/src/time/utils.rs deleted file mode 100644 index e6e3993..0000000 --- a/src/time/utils.rs +++ /dev/null @@ -1,5 +0,0 @@ -use std::io; - -pub(crate) fn timeout_err(msg: &'static str) -> io::Error { - io::Error::new(io::ErrorKind::TimedOut, msg) -} diff --git a/tests/sleep.rs b/tests/sleep.rs index eb55ea0..3217a42 100644 --- a/tests/sleep.rs +++ b/tests/sleep.rs @@ -1,9 +1,25 @@ use std::error::Error; use wstd::task::sleep; -use wstd::time::Duration; +use wstd::time::{Duration, Instant}; #[wstd::test] async fn just_sleep() -> Result<(), Box> { sleep(Duration::from_secs(1)).await; Ok(()) } + +#[wstd::test] +async fn sleep_elapsed() -> Result<(), Box> { + let start = Instant::now(); + sleep(Duration::from_millis(100)).await; + let elapsed = start.elapsed(); + assert!( + elapsed >= Duration::from_millis(80), + "sleep: elapsed {elapsed:?} should be >= 80ms" + ); + assert!( + elapsed < Duration::from_secs(2), + "sleep: elapsed {elapsed:?} should be < 2s" + ); + Ok(()) +} diff --git a/tests/stdio.rs b/tests/stdio.rs new file mode 100644 index 0000000..a87ae3c --- /dev/null +++ b/tests/stdio.rs @@ -0,0 +1,18 @@ +use std::error::Error; +use wstd::io::{AsyncWrite, stderr, stdout}; + +#[wstd::test] +async fn write_stdout() -> Result<(), Box> { + let mut out = stdout(); + out.write_all(b"hello from stdout\n").await?; + out.flush().await?; + Ok(()) +} + +#[wstd::test] +async fn write_stderr() -> Result<(), Box> { + let mut err = stderr(); + err.write_all(b"hello from stderr\n").await?; + err.flush().await?; + Ok(()) +} diff --git a/tests/timer.rs b/tests/timer.rs new file mode 100644 index 0000000..fa3d692 --- /dev/null +++ b/tests/timer.rs @@ -0,0 +1,35 @@ +use std::error::Error; +use wstd::time::{Duration, Instant, Timer}; + +#[wstd::test] +async fn timer_after() -> Result<(), Box> { + let start = Instant::now(); + Timer::after(Duration::from_millis(50)).wait().await; + let elapsed = start.elapsed(); + assert!( + elapsed >= Duration::from_millis(40), + "timer_after: elapsed {elapsed:?} should be >= 40ms" + ); + Ok(()) +} + +#[wstd::test] +async fn timer_at() -> Result<(), Box> { + let start = Instant::now(); + let deadline = start + Duration::from_millis(50); + Timer::at(deadline).wait().await; + let elapsed = start.elapsed(); + assert!( + elapsed >= Duration::from_millis(40), + "timer_at: elapsed {elapsed:?} should be >= 40ms" + ); + Ok(()) +} + +#[wstd::test] +async fn instant_monotonic() -> Result<(), Box> { + let a = Instant::now(); + let b = Instant::now(); + assert!(b >= a, "monotonic clock should not go backwards"); + Ok(()) +}