diff --git a/.spelling b/.spelling index 43a963cc2..722ec3649 100644 --- a/.spelling +++ b/.spelling @@ -37,6 +37,7 @@ btree_map buildable bytesbuf callee +cancelled Cargo.toml C-BITFLAG C-CONV @@ -239,6 +240,7 @@ shareable skimmable SLAs smallvec +spawner startup stderr stdlib diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a9ad24dd..7465a7569 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ Please see each crate's change log below: +- [`anyspawn`](./crates/anyspawn/CHANGELOG.md) - [`bytesbuf`](./crates/bytesbuf/CHANGELOG.md) - [`bytesbuf_io`](./crates/bytesbuf_io/CHANGELOG.md) - [`data_privacy`](./crates/data_privacy/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index 87e78fbaf..f45ff5a97 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,6 +47,144 @@ version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +[[package]] +name = "anyspawn" +version = "0.1.0" +dependencies = [ + "criterion", + "futures", + "futures-channel", + "smol", + "static_assertions", + "tick", + "tokio", +] + +[[package]] +name = "async-channel" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "924ed96dd52d1b75e9c1a3e6275715fd320f5f9439fb5a4a11fa51f4221158d2" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.13.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "497c00e0fd83a72a79a39fcbd8e3e2f055d6f6c7e025f3b3d91f4f8e76527fb8" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "pin-project-lite", + "slab", +] + +[[package]] +name = "async-fs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8034a681df4aed8b8edbd7fbe472401ecf009251c8b40556b304567052e294c5" +dependencies = [ + "async-lock", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-io" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456b8a8feb6f42d237746d4b3e9a178494627745c3c56c6ea55d92ba50d026fc" +dependencies = [ + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "windows-sys 0.61.2", +] + +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-net" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b948000fad4873c1c9339d60f2623323a0cfd3816e5181033c6a5cb68b2accf7" +dependencies = [ + "async-io", + "blocking", + "futures-lite", +] + +[[package]] +name = "async-process" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc50921ec0055cdd8a16de48773bfeec5c972598674347252c0399676be7da75" +dependencies = [ + "async-channel", + "async-io", + "async-lock", + "async-signal", + "async-task", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "rustix", +] + +[[package]] +name = "async-signal" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43c070bbf59cd3570b6b2dd54cd772527c7c3620fce8be898406dd3ed6adc64c" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix", + "signal-hook-registry", + "slab", + "windows-sys 0.61.2", +] + +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.5.0" @@ -69,6 +207,19 @@ version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "812e12b5285cc515a9c72a5c1d3b6d46a19dac5acfef5265968c166106e31dd3" +[[package]] +name = "blocking" +version = "1.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e83f8d02be6967315521be875afa792a316e28d57b5a2d401897e2a7921b7f21" +dependencies = [ + "async-channel", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bumpalo" version = "3.19.1" @@ -215,6 +366,15 @@ version = "0.7.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3e64b0cc0439b12df2fa678eae89a1c56a529fd067a9115f7827f1fffd22b32" +[[package]] +name = "concurrent-queue" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ca0197aee26d1ae37445ee532fefce43251d24cc7c166799f4d46817f1d3973" +dependencies = [ + "crossbeam-utils", +] + [[package]] name = "convert_case" version = "0.10.0" @@ -274,6 +434,12 @@ dependencies = [ "itertools 0.13.0", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + [[package]] name = "crunchy" version = "0.2.4" @@ -408,6 +574,27 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "event-listener" +version = "5.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13b66accf52311f30a0db42147dadea9850cb48cd070028831ae5f5d4b856ab" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -514,6 +701,19 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f78e10609fe0e0b3f4157ffab1876319b5b0db102a2c60dc4626306dc46b44ad" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -609,6 +809,12 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "hermit-abi" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c" + [[package]] name = "http" version = "1.4.0" @@ -1026,6 +1232,12 @@ dependencies = [ "winapi", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.5" @@ -1138,6 +1350,31 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + +[[package]] +name = "polling" +version = "3.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d0e4f59085d47d8241c88ead0f274e8a0cb551f3625263c05eb8dd897c34218" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi", + "pin-project-lite", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "portable-atomic" version = "1.13.0" @@ -1500,6 +1737,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "similar" version = "2.7.0" @@ -1524,6 +1771,23 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "smol" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33bd3e260892199c3ccfc487c88b2da2265080acb316cd920da72fdfd7c599f" +dependencies = [ + "async-channel", + "async-executor", + "async-fs", + "async-io", + "async-lock", + "async-net", + "async-process", + "blocking", + "futures-lite", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" diff --git a/Cargo.toml b/Cargo.toml index 5a5db75c3..12977d59f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ repository = "https://github.com/microsoft/oxidizer" [workspace.dependencies] # local dependencies +anyspawn = { path = "crates/anyspawn", default-features = false, version = "0.1.0" } bytesbuf = { path = "crates/bytesbuf", default-features = false, version = "0.2.2" } bytesbuf_io = { path = "crates/bytesbuf_io", default-features = false, version = "0.2.0" } data_privacy = { path = "crates/data_privacy", default-features = false, version = "0.10.1" } @@ -54,6 +55,7 @@ duct = { version = "1.1.1", default-features = false } dynosaur = { version = "0.3.0", default-features = false } fastrand = { version = "2.3.0", default-features = false, features = ["std"] } futures = { version = "0.3.31", default-features = false } +futures-channel = { version = "0.3.31", default-features = false } futures-core = { version = "0.3.31", default-features = false } futures-util = { version = "0.3.31", default-features = false } http = { version = "1.2.0", default-features = false, features = ["std"] } diff --git a/README.md b/README.md index d87aac9de..50c253a5b 100644 --- a/README.md +++ b/README.md @@ -19,12 +19,13 @@ This repository contains a set of crates that help you build robust highly scala - [CI Workflows](#ci-workflows) - [Pull Request Gates](#pull-request-gates) - [Tool Versions](#tool-versions) -- [Trademarks](#trademarks) + - [Trademarks](#trademarks) ## Crates These are the primary crates built out of this repo: +- [`anyspawn`](./crates/anyspawn/README.md) - A generic task spawner compatible with any async runtime. - [`bytesbuf`](./crates/bytesbuf/README.md) - Types for creating and manipulating byte sequences. - [`bytesbuf_io`](./crates/bytesbuf_io/README.md) - Asynchronous I/O abstractions expressed via `bytesbuf` types. - [`data_privacy`](./crates/data_privacy/README.md) - Mechanisms to classify, manipulate, and redact sensitive data. diff --git a/crates/anyspawn/CHANGELOG.md b/crates/anyspawn/CHANGELOG.md new file mode 100644 index 000000000..d0cc418fe --- /dev/null +++ b/crates/anyspawn/CHANGELOG.md @@ -0,0 +1,8 @@ +# Changelog + +## 0.1.0 + +Initial release. + +- `Spawner` trait for abstracting async task spawning across runtimes +- `TokioSpawner` implementation for the Tokio runtime (requires `tokio` feature) diff --git a/crates/anyspawn/Cargo.toml b/crates/anyspawn/Cargo.toml new file mode 100644 index 000000000..9e0b5ca40 --- /dev/null +++ b/crates/anyspawn/Cargo.toml @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "anyspawn" +description = "A generic task spawner compatible with any async runtime." +version = "0.1.0" +readme = "README.md" +keywords = ["oxidizer", "async", "runtime", "futures"] +categories = ["asynchronous"] + +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[package.metadata.docs.rs] +all-features = true + +[features] +default = ["tokio", "custom"] +tokio = ["dep:tokio"] +custom = ["dep:futures-channel"] + +[dependencies] +futures-channel = { workspace = true, features = ["alloc"], optional = true } +tokio = { workspace = true, features = ["rt"], optional = true } + +[dev-dependencies] +criterion = { workspace = true } +futures = { workspace = true, features = ["executor"] } +smol = "2" +static_assertions.workspace = true +tick = { workspace = true, features = ["tokio"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "sync"] } + +[[bench]] +name = "spawner" +harness = false +required-features = ["tokio", "custom"] + +[[example]] +name = "custom" +required-features = ["tokio", "custom"] + +[[example]] +name = "tokio" +required-features = ["tokio"] + +[lints] +workspace = true diff --git a/crates/anyspawn/README.md b/crates/anyspawn/README.md new file mode 100644 index 000000000..336528c03 --- /dev/null +++ b/crates/anyspawn/README.md @@ -0,0 +1,66 @@ +
+ Anyspawn Logo + +# Anyspawn + +[![crate.io](https://img.shields.io/crates/v/anyspawn.svg)](https://crates.io/crates/anyspawn) +[![docs.rs](https://docs.rs/anyspawn/badge.svg)](https://docs.rs/anyspawn) +[![MSRV](https://img.shields.io/crates/msrv/anyspawn)](https://crates.io/crates/anyspawn) +[![CI](https://github.com/microsoft/oxidizer/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/microsoft/oxidizer/actions/workflows/main.yml) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../../LICENSE) +This crate was developed as part of the Oxidizer project + +
+ +A generic task spawner compatible with any async runtime. + +This crate provides a [`Spawner`][__link0] type that abstracts task spawning across +different async runtimes without generic infection. + +## Design Philosophy + +* **Concrete type**: No generics needed in your code +* **Simple**: Use built-in constructors or provide a closure +* **Flexible**: Works with any async runtime + +## Quick Start + +### Using Tokio + +```rust +use anyspawn::Spawner; + +let spawner = Spawner::new_tokio(); +let result = spawner.spawn(async { 1 + 1 }).await; +assert_eq!(result, 2); +``` + +### Custom Runtime + +```rust +use anyspawn::Spawner; + +let spawner = Spawner::new_custom(|fut| { + std::thread::spawn(move || futures::executor::block_on(fut)); +}); + +// Returns a JoinHandle that can be awaited or dropped +let handle = spawner.spawn(async { 42 }); +``` + +## Features + +* `tokio` (default): Enables the [`Spawner::new_tokio`][__link1] constructor +* `custom`: Enables the [`Spawner::new_custom`][__link2] constructor + + +
+ +This crate was developed as part of The Oxidizer Project. Browse this crate's source code. + + + [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEG9dpP_3aSShKG7QXWKrXDfsoG1WHsvhBMNw7G0Heldai66RkYWSBgmhhbnlzcGF3bmUwLjEuMA + [__link0]: https://docs.rs/anyspawn/0.1.0/anyspawn/?search=Spawner + [__link1]: https://docs.rs/anyspawn/0.1.0/anyspawn/?search=Spawner::new_tokio + [__link2]: https://docs.rs/anyspawn/0.1.0/anyspawn/?search=Spawner::new_custom diff --git a/crates/anyspawn/benches/spawner.rs b/crates/anyspawn/benches/spawner.rs new file mode 100644 index 000000000..b6ecb41f1 --- /dev/null +++ b/crates/anyspawn/benches/spawner.rs @@ -0,0 +1,45 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![allow( + missing_docs, + clippy::unwrap_used, + reason = "Benchmarks don't require documentation and should fail fast on errors" +)] + +use anyspawn::Spawner; +use criterion::{Criterion, criterion_group, criterion_main}; + +fn entry(c: &mut Criterion) { + let mut group = c.benchmark_group("spawner"); + + // Tokio benchmarks + let rt = tokio::runtime::Runtime::new().unwrap(); + let tokio_spawner = Spawner::new_tokio(); + + group.bench_function("tokio_direct", |b| { + b.iter(|| rt.block_on(async { tokio::spawn(async { 42 }).await.unwrap() })); + }); + + group.bench_function("tokio_via_spawner", |b| { + b.iter(|| rt.block_on(async { tokio_spawner.spawn(async { 42 }).await })); + }); + + // smol benchmarks + let smol_spawner = Spawner::new_custom(|fut| { + smol::spawn(fut).detach(); + }); + + group.bench_function("smol_direct", |b| { + b.iter(|| smol::block_on(async { smol::spawn(async { 42 }).await })); + }); + + group.bench_function("smol_via_spawner", |b| { + b.iter(|| smol::block_on(async { smol_spawner.spawn(async { 42 }).await })); + }); + + group.finish(); +} + +criterion_group!(benches, entry); +criterion_main!(benches); diff --git a/crates/anyspawn/examples/custom.rs b/crates/anyspawn/examples/custom.rs new file mode 100644 index 000000000..074d9064a --- /dev/null +++ b/crates/anyspawn/examples/custom.rs @@ -0,0 +1,35 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Spawning tasks with a custom spawner. + +use std::{ + thread::{sleep, spawn}, + time::Duration, +}; + +use anyspawn::Spawner; +use futures::executor::block_on; + +#[tokio::main] +async fn main() { + // Create a spawner that runs futures on background threads + let spawner = Spawner::new_custom(|fut| { + spawn(move || block_on(fut)); + }); + + // Fire-and-forget: spawn a task without waiting for its result + let () = spawner + .spawn(async { + println!("Background task completed!"); + }) + .await; + + // Retrieve a result by awaiting the JoinHandle + let handle = spawner.spawn(async { 1 + 1 }); + let value = handle.await; + println!("Got result: {value}"); + + // Wait for background task + sleep(Duration::from_millis(50)); +} diff --git a/crates/anyspawn/examples/tokio.rs b/crates/anyspawn/examples/tokio.rs new file mode 100644 index 000000000..cb2bf869f --- /dev/null +++ b/crates/anyspawn/examples/tokio.rs @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Spawning tasks with Tokio. + +use anyspawn::Spawner; + +#[tokio::main] +async fn main() { + let spawner = Spawner::new_tokio(); + + // Fire-and-forget: spawn a task without waiting for its result + let () = spawner + .spawn({ + async move { + println!("Background task completed!"); + } + }) + .await; + + // Retrieve a result by awaiting the JoinHandle + let value = spawner.spawn(async { 1 + 1 }).await; + println!("Got result: {value}"); +} diff --git a/crates/anyspawn/favicon.ico b/crates/anyspawn/favicon.ico new file mode 100644 index 000000000..42c826b92 --- /dev/null +++ b/crates/anyspawn/favicon.ico @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f2a7cb259fa5c491b39fd9e2c2e32aa25497b818198957c7d1f90b02c896e524 +size 15406 diff --git a/crates/anyspawn/logo.png b/crates/anyspawn/logo.png new file mode 100644 index 000000000..ae2ae5ced --- /dev/null +++ b/crates/anyspawn/logo.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:dfdc338b2229ac33c210aa8093128f2236c7a88d20eabce213b0f9b090fd24c8 +size 41426 diff --git a/crates/anyspawn/src/custom.rs b/crates/anyspawn/src/custom.rs new file mode 100644 index 000000000..398364b9c --- /dev/null +++ b/crates/anyspawn/src/custom.rs @@ -0,0 +1,29 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +use std::{fmt::Debug, pin::Pin, sync::Arc}; + +use futures_channel::oneshot; + +pub(crate) type BoxedFuture = Pin + Send>>; +type SpawnFn = dyn Fn(BoxedFuture) + Send + Sync; + +/// Internal wrapper for custom spawn functions. +#[derive(Clone)] +pub(crate) struct CustomSpawner(pub(crate) Arc); + +impl CustomSpawner { + pub(crate) fn call(&self, work: impl Future + Send + 'static) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + (self.0)(Box::pin(async move { + let _ = tx.send(work.await); + })); + rx + } +} + +impl Debug for CustomSpawner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CustomSpawner").finish_non_exhaustive() + } +} diff --git a/crates/anyspawn/src/handle.rs b/crates/anyspawn/src/handle.rs new file mode 100644 index 000000000..e64d49283 --- /dev/null +++ b/crates/anyspawn/src/handle.rs @@ -0,0 +1,49 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! [`JoinHandle`] for awaiting spawned task results. + +use std::{ + fmt::Debug, + pin::Pin, + task::{Context, Poll}, +}; + +#[cfg(feature = "custom")] +use futures_channel::oneshot; + +/// A handle to a spawned task that can be awaited to retrieve its result. +/// +/// This is returned by [`Spawner::spawn`](crate::Spawner::spawn) and implements +/// [`Future`] to allow awaiting the task's completion. +/// +/// # Panics +/// +/// Awaiting a `JoinHandle` will panic if the spawned task panicked. +pub struct JoinHandle(pub(crate) JoinHandleInner); + +pub(crate) enum JoinHandleInner { + #[cfg(feature = "tokio")] + Tokio(::tokio::task::JoinHandle), + #[cfg(feature = "custom")] + Custom(oneshot::Receiver), +} + +impl Future for JoinHandle { + type Output = T; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match &mut self.get_mut().0 { + #[cfg(feature = "tokio")] + JoinHandleInner::Tokio(jh) => Pin::new(jh).poll(cx).map(|res| res.expect("spawned task panicked")), + #[cfg(feature = "custom")] + JoinHandleInner::Custom(rx) => Pin::new(rx).poll(cx).map(|res| res.expect("spawned task panicked")), + } + } +} + +impl Debug for JoinHandle { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("JoinHandle").finish_non_exhaustive() + } +} diff --git a/crates/anyspawn/src/lib.rs b/crates/anyspawn/src/lib.rs new file mode 100644 index 000000000..8ce907163 --- /dev/null +++ b/crates/anyspawn/src/lib.rs @@ -0,0 +1,65 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![cfg_attr(coverage_nightly, feature(coverage_attribute))] +#![cfg_attr(docsrs, feature(doc_cfg))] +#![warn(missing_docs)] + +//! A generic task spawner compatible with any async runtime. +//! +//! This crate provides a [`Spawner`] type that abstracts task spawning across +//! different async runtimes without generic infection. +//! +//! # Design Philosophy +//! +//! - **Concrete type**: No generics needed in your code +//! - **Simple**: Use built-in constructors or provide a closure +//! - **Flexible**: Works with any async runtime +//! +//! # Quick Start +//! +//! ## Using Tokio +//! +//! ```rust +//! use anyspawn::Spawner; +//! +//! # #[tokio::main] +//! # async fn main() { +//! let spawner = Spawner::new_tokio(); +//! let result = spawner.spawn(async { 1 + 1 }).await; +//! assert_eq!(result, 2); +//! # } +//! ``` +//! +//! ## Custom Runtime +//! +//! ```rust,ignore +//! use anyspawn::Spawner; +//! +//! let spawner = Spawner::new_custom(|fut| { +//! std::thread::spawn(move || futures::executor::block_on(fut)); +//! }); +//! +//! // Returns a JoinHandle that can be awaited or dropped +//! let handle = spawner.spawn(async { 42 }); +//! ``` +//! +//! # Features +//! +//! - `tokio` (default): Enables the [`Spawner::new_tokio`] constructor +//! - `custom`: Enables the [`Spawner::new_custom`] constructor + +#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/anyspawn/logo.png")] +#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/anyspawn/favicon.ico")] + +#[cfg(feature = "custom")] +mod custom; +#[cfg(any(feature = "tokio", feature = "custom"))] +mod handle; +#[cfg(any(feature = "tokio", feature = "custom"))] +mod spawner; + +#[cfg(any(feature = "tokio", feature = "custom"))] +pub use handle::JoinHandle; +#[cfg(any(feature = "tokio", feature = "custom"))] +pub use spawner::Spawner; diff --git a/crates/anyspawn/src/spawner.rs b/crates/anyspawn/src/spawner.rs new file mode 100644 index 000000000..5b651b0b7 --- /dev/null +++ b/crates/anyspawn/src/spawner.rs @@ -0,0 +1,187 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! [`Spawner`] for plugging in runtime implementations. + +use std::fmt::Debug; +#[cfg(feature = "custom")] +use std::sync::Arc; + +use crate::handle::JoinHandle; +#[cfg(any(feature = "tokio", feature = "custom"))] +use crate::handle::JoinHandleInner; + +#[cfg(feature = "custom")] +use crate::custom::{BoxedFuture, CustomSpawner}; + +/// Runtime-agnostic task spawner. +/// +/// `Spawner` abstracts task spawning across different async runtimes. Use the +/// built-in constructors for common runtimes, or [`Spawner::new_custom`] for custom +/// implementations. +/// +/// # Examples +/// +/// Using Tokio: +/// +/// ```rust +/// use anyspawn::Spawner; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let spawner = Spawner::new_tokio(); +/// let handle = spawner.spawn(async { +/// println!("Task running!"); +/// }); +/// handle.await; // Wait for task to complete +/// # } +/// ``` +/// +/// ## Custom Runtime +/// +/// ```rust,ignore +/// use anyspawn::Spawner; +/// +/// let spawner = Spawner::new_custom(|fut| { +/// std::thread::spawn(move || futures::executor::block_on(fut)); +/// }); +/// +/// let handle = spawner.spawn(async { +/// println!("Running on custom runtime!"); +/// }); +/// // handle can be awaited or dropped (fire-and-forget) +/// ``` +/// +/// ## Getting Results +/// +/// Await the [`JoinHandle`](crate::JoinHandle) to retrieve a value from the task: +/// +/// ```rust +/// use anyspawn::Spawner; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let spawner = Spawner::new_tokio(); +/// let value = spawner.spawn(async { 1 + 1 }).await; +/// assert_eq!(value, 2); +/// # } +/// ``` +/// +/// ## Handling Errors +/// +/// Return a `Result` from the task to propagate errors: +/// +/// ```rust +/// use anyspawn::Spawner; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let spawner = Spawner::new_tokio(); +/// +/// let result = spawner +/// .spawn(async { +/// if true { Ok(42) } else { Err("something went wrong") } +/// }) +/// .await; +/// +/// match result { +/// Ok(value) => println!("Got {value}"), +/// Err(e) => eprintln!("Task failed: {e}"), +/// } +/// # } +/// ``` +#[derive(Debug, Clone)] +pub struct Spawner(SpawnerKind); + +#[derive(Debug, Clone)] +enum SpawnerKind { + #[cfg(feature = "tokio")] + Tokio, + #[cfg(feature = "custom")] + Custom(CustomSpawner), +} + +impl Spawner { + /// Creates a spawner that uses the Tokio runtime. + /// + /// # Panics + /// + /// Panics if called outside of a Tokio runtime context. + /// + /// # Examples + /// + /// ```rust + /// use anyspawn::Spawner; + /// + /// # #[tokio::main] + /// # async fn main() { + /// let spawner = Spawner::new_tokio(); + /// let result = spawner.spawn(async { 42 }).await; + /// assert_eq!(result, 42); + /// # } + /// ``` + #[must_use] + #[cfg(feature = "tokio")] + #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] + pub fn new_tokio() -> Self { + Self(SpawnerKind::Tokio) + } + + /// Creates a custom spawner from a closure. + /// + /// The closure receives a boxed, pinned future and is responsible for + /// spawning it on the appropriate runtime. + /// + /// # Examples + /// + /// ```rust,ignore + /// use anyspawn::Spawner; + /// + /// let spawner = Spawner::new_custom(|fut| { + /// std::thread::spawn(move || futures::executor::block_on(fut)); + /// }); + /// ``` + #[cfg(feature = "custom")] + #[cfg_attr(docsrs, doc(cfg(feature = "custom")))] + pub fn new_custom(f: F) -> Self + where + F: Fn(BoxedFuture) + Send + Sync + 'static, + { + Self(SpawnerKind::Custom(CustomSpawner(Arc::new(f)))) + } + + /// Spawns an async task on the runtime. + /// + /// Returns a [`JoinHandle`] that can be awaited to retrieve the task's result, + /// or dropped to run the task in fire-and-forget mode. + /// + /// # Panics + /// + /// Awaiting the returned `JoinHandle` will panic if the spawned task panics. + /// + /// # Examples + /// + /// ```rust + /// use anyspawn::Spawner; + /// + /// # #[tokio::main] + /// # async fn main() { + /// let spawner = Spawner::new_tokio(); + /// + /// // Await to get the result + /// let value = spawner.spawn(async { 1 + 1 }).await; + /// assert_eq!(value, 2); + /// + /// // Or fire-and-forget by dropping the handle + /// let _ = spawner.spawn(async { println!("background task") }); + /// # } + /// ``` + pub fn spawn(&self, work: impl Future + Send + 'static) -> JoinHandle { + match &self.0 { + #[cfg(feature = "tokio")] + SpawnerKind::Tokio => JoinHandle(JoinHandleInner::Tokio(::tokio::spawn(work))), + #[cfg(feature = "custom")] + SpawnerKind::Custom(c) => JoinHandle(JoinHandleInner::Custom(c.call(work))), + } + } +} diff --git a/crates/anyspawn/tests/handle.rs b/crates/anyspawn/tests/handle.rs new file mode 100644 index 000000000..a897989a2 --- /dev/null +++ b/crates/anyspawn/tests/handle.rs @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![allow(missing_docs, reason = "test code")] +#![cfg(any(feature = "tokio", feature = "custom"))] + +//! Tests for `JoinHandle` implementations. + +use anyspawn::Spawner; + +#[cfg(feature = "tokio")] +#[tokio::test] +async fn join_handle_debug() { + let spawner = Spawner::new_tokio(); + let handle = spawner.spawn(async { 42 }); + let debug_str = format!("{handle:?}"); + assert!(debug_str.contains("JoinHandle")); + let _ = handle.await; +} diff --git a/crates/anyspawn/tests/spawner.rs b/crates/anyspawn/tests/spawner.rs new file mode 100644 index 000000000..ef803210f --- /dev/null +++ b/crates/anyspawn/tests/spawner.rs @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +#![allow(missing_docs, reason = "test code")] +#![cfg(any(feature = "tokio", feature = "custom"))] + +//! Tests for `Spawner` implementations. + +use anyspawn::Spawner; + +static_assertions::assert_impl_all!(Spawner: Send, Sync); + +#[cfg(feature = "tokio")] +#[tokio::test] +async fn tokio_spawn_and_await() { + let spawner = Spawner::new_tokio(); + let result = spawner.spawn(async { 42 }).await; + assert_eq!(result, 42); +} + +#[cfg(feature = "tokio")] +#[tokio::test] +async fn tokio_spawn_fire_and_forget() { + let spawner = Spawner::new_tokio(); + let (tx, rx) = tokio::sync::oneshot::channel(); + + let () = spawner + .spawn(async move { + tx.send(42).unwrap(); + }) + .await; + + assert_eq!(rx.await.unwrap(), 42); +} + +#[cfg(feature = "custom")] +#[test] +fn custom_spawn_and_await() { + let spawner = Spawner::new_custom(|fut| { + std::thread::spawn(move || futures::executor::block_on(fut)); + }); + + let result = futures::executor::block_on(spawner.spawn(async { 42 })); + assert_eq!(result, 42); +} + +#[cfg(feature = "custom")] +#[tokio::test] +async fn custom_spawn_fire_and_forget() { + let spawner = Spawner::new_custom(|fut| { + std::thread::spawn(move || futures::executor::block_on(fut)); + }); + + let (tx, rx) = std::sync::mpsc::channel(); + + let () = spawner + .spawn(async move { + tx.send(42).unwrap(); + }) + .await; + + assert_eq!(rx.recv().unwrap(), 42); +} + +#[cfg(feature = "custom")] +#[test] +fn custom_spawner_debug() { + let spawner = Spawner::new_custom(|_| {}); + let debug_str = format!("{spawner:?}"); + assert!(debug_str.contains("CustomSpawner")); +}