diff --git a/.gitignore b/.gitignore index 20b91f6ba..6368ac41f 100644 --- a/.gitignore +++ b/.gitignore @@ -35,4 +35,5 @@ _manifest ARROW # Agent files -.claude \ No newline at end of file +.claude +CLAUDE.md \ No newline at end of file diff --git a/.spelling b/.spelling index 722ec3649..2da87d285 100644 --- a/.spelling +++ b/.spelling @@ -1,3 +1,4 @@ +304 → 0.X.Y 100k @@ -82,6 +83,8 @@ C-SERDE C-SMART-PTR deallocate Debuggability +Deduplicate +deduplicating deduplication deque Deque @@ -279,6 +282,8 @@ unconfigured uncontended unhandleable unicode +uniflight +uniflight's Uninit unordered unredacted diff --git a/CHANGELOG.md b/CHANGELOG.md index 7465a7569..8c22ef8e7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,3 +20,4 @@ Please see each crate's change log below: - [`thread_aware_macros`](./crates/thread_aware_macros/CHANGELOG.md) - [`thread_aware_macros_impl`](./crates/thread_aware_macros_impl/CHANGELOG.md) - [`tick`](./crates/tick/CHANGELOG.md) +- [`uniflight`](./crates/uniflight/CHANGELOG.md) diff --git a/Cargo.lock b/Cargo.lock index f45ff5a97..e2ea9f216 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,18 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "ahash" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a15f179cd60c4584b8a8c596927aadc462e27f2ca70c04e0071964a73ba7a75" +dependencies = [ + "cfg-if", + "once_cell", + "version_check", + "zerocopy", +] + [[package]] name = "aho-corasick" version = "1.1.4" @@ -137,6 +149,12 @@ dependencies = [ "futures-lite", ] +[[package]] +name = "async-once-cell" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4288f83726785267c6f2ef073a3d83dc3f9b81464e9f99898240cced85fce35a" + [[package]] name = "async-process" version = "2.5.0" @@ -421,6 +439,7 @@ dependencies = [ "serde", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -446,6 +465,20 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data_privacy" version = "0.10.1" @@ -793,6 +826,12 @@ dependencies = [ "byteorder", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.16.1" @@ -856,7 +895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.16.1", ] [[package]] @@ -2219,6 +2258,27 @@ version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" +[[package]] +name = "uniflight" +version = "0.1.0" +dependencies = [ + "ahash", + "async-once-cell", + "criterion", + "dashmap", + "futures-util", + "mutants", + "thread_aware", + "tick", + "tokio", +] + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + [[package]] name = "walkdir" version = "2.5.0" diff --git a/Cargo.toml b/Cargo.toml index 12977d59f..f5c451837 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,13 +43,18 @@ thread_aware = { path = "crates/thread_aware", default-features = false, version thread_aware_macros = { path = "crates/thread_aware_macros", default-features = false, version = "0.6.1" } thread_aware_macros_impl = { path = "crates/thread_aware_macros_impl", default-features = false, version = "0.6.1" } tick = { path = "crates/tick", default-features = false, version = "0.1.2" } +uniflight = { path = "crates/uniflight", default-features = false, version = "0.1.0" } # external dependencies +ahash = { version = "0.8", default-features = false } alloc_tracker = { version = "0.5.9", default-features = false } +anyhow = { version = "1.0.100", default-features = false } +async-once-cell = { version = "0.5", default-features = false } bytes = { version = "1.11.0", default-features = false } chrono = { version = "0.4.40", default-features = false } chrono-tz = { version = "0.10.4", default-features = false } criterion = { version = "0.8.1", default-features = false } +dashmap = { version = "6.1", default-features = false } derive_more = { version = "2.0.1", default-features = false } duct = { version = "1.1.1", default-features = false } dynosaur = { version = "0.3.0", default-features = false } @@ -73,6 +78,7 @@ once_cell = { version = "1.21.3", default-features = false } opentelemetry = { version = "0.31.0", default-features = false } opentelemetry-stdout = { version = "0.31.0", default-features = false } opentelemetry_sdk = { version = "0.31.0", default-features = false } +parking_lot = { version = "0.12.5", default-features = false } pin-project-lite = { version = "0.2.13", default-features = false } pretty_assertions = { version = "1.4.1", default-features = false } prettyplease = { version = "0.2.37", default-features = false } @@ -101,6 +107,7 @@ trait-variant = { version = "0.1.2", default-features = false } trybuild = { version = "1.0.114", default-features = false } typeid = { version = "1.0.3", default-features = false } windows-sys = { version = "0.61.2", default-features = false } +xutex = { version = "0.2.0", default-features = false } xxhash-rust = { version = "0.8.15", default-features = false } [workspace.lints.rust] diff --git a/README.md b/README.md index 50c253a5b..4f6f51408 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ These are the primary crates built out of this repo: - [`seatbelt`](./crates/seatbelt/README.md) - Resilience and recovery mechanisms for fallible operations. - [`thread_aware`](./crates/thread_aware/README.md) - Facilities to support thread-isolated state. - [`tick`](./crates/tick/README.md) - Provides primitives to interact with and manipulate machine time. +- [`uniflight`](./crates/uniflight/README.md) - Coalesces duplicate async tasks into a single execution. ## About this Repo diff --git a/crates/uniflight/CHANGELOG.md b/crates/uniflight/CHANGELOG.md new file mode 100644 index 000000000..0906fd27f --- /dev/null +++ b/crates/uniflight/CHANGELOG.md @@ -0,0 +1,8 @@ +# Changelog + +## [0.1.0] - 2025-12-10 + +- 🧩 Miscellaneous + + - Initial commit of uniflight + diff --git a/crates/uniflight/Cargo.toml b/crates/uniflight/Cargo.toml new file mode 100644 index 000000000..7774671f7 --- /dev/null +++ b/crates/uniflight/Cargo.toml @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +[package] +name = "uniflight" +description = "Coalesces duplicate async tasks into a single execution." +version = "0.1.0" +readme = "README.md" +keywords = ["oxidizer", "coalescing", "stempede", "singleflight", "deduplication"] +categories = ["concurrency"] + +edition.workspace = true +rust-version.workspace = true +authors.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[package.metadata.cargo_check_external_types] +allowed_external_types = [ + "thread_aware::cell::builtin::PerProcess", + "thread_aware::cell::storage::Strategy", + "thread_aware::core::ThreadAware", +] + +[dependencies] +ahash = { workspace = true, default-features = false, features = ["std"] } +async-once-cell.workspace = true +dashmap.workspace = true +futures-util = { workspace = true, default-features = false, features = ["std", "alloc"] } +thread_aware.workspace = true + +[dev-dependencies] +criterion = { workspace = true, features = ["async_tokio"] } +futures-util = { workspace = true, features = ["alloc", "std"] } +mutants.workspace = true +tick = { workspace = true, features = ["tokio"] } +tokio = { workspace = true, features = [ + "macros", + "rt", + "time", + "rt-multi-thread", +] } + +[lints] +workspace = true + +[[bench]] +name = "performance" +harness = false + +[[example]] +name = "cache_population" diff --git a/crates/uniflight/README.md b/crates/uniflight/README.md new file mode 100644 index 000000000..4894a8b36 --- /dev/null +++ b/crates/uniflight/README.md @@ -0,0 +1,145 @@ +
+ Uniflight Logo + +# Uniflight + +[![crate.io](https://img.shields.io/crates/v/uniflight.svg)](https://crates.io/crates/uniflight) +[![docs.rs](https://docs.rs/uniflight/badge.svg)](https://docs.rs/uniflight) +[![MSRV](https://img.shields.io/crates/msrv/uniflight)](https://crates.io/crates/uniflight) +[![CI](https://github.com/microsoft/oxidizer/actions/workflows/main.yml/badge.svg?event=push)](https://github.com/microsoft/oxidizer/actions/workflows/main.yml) +[![Coverage](https://codecov.io/gh/microsoft/oxidizer/graph/badge.svg?token=FCUG0EL5TI)](https://codecov.io/gh/microsoft/oxidizer) +[![License](https://img.shields.io/badge/license-MIT-blue.svg)](../../LICENSE) +This crate was developed as part of the Oxidizer project + +
+ +Coalesces duplicate async tasks into a single execution. + +This crate provides [`Merger`][__link0], a mechanism for deduplicating concurrent async operations. +When multiple tasks request the same work (identified by a key), only the first task (the +“leader”) performs the actual work while subsequent tasks (the “followers”) wait and receive +a clone of the result. + +## When to Use + +Use `Merger` when you have expensive or rate-limited operations that may be requested +concurrently with the same parameters: + +* **Cache population**: Prevent thundering herd when a cache entry expires +* **API calls**: Deduplicate concurrent requests to the same endpoint +* **Database queries**: Coalesce identical queries issued simultaneously +* **File I/O**: Avoid reading the same file multiple times concurrently + +## Example + +```rust +use uniflight::Merger; + +let group: Merger = Merger::new(); + +// Multiple concurrent calls with the same key will share a single execution. +// Note: you can pass &str directly when the key type is String. +let result = group.execute("user:123", || async { + // This expensive operation runs only once, even if called concurrently + "expensive_result".to_string() +}).await.expect("leader should not panic"); +``` + +## Flexible Key Types + +The [`Merger::execute`][__link1] method accepts keys using [`Borrow`][__link2] semantics, allowing you to pass +borrowed forms of the key type. For example, with `Merger`, you can pass `&str` +directly without allocating: + +```rust +let merger: Merger = Merger::new(); + +// Pass &str directly - no need to call .to_string() +let result = merger.execute("my-key", || async { 42 }).await; +assert_eq!(result, Ok(42)); +``` + +## Thread-Aware Scoping + +`Merger` supports thread-aware scoping via a [`Strategy`][__link3] +type parameter. This controls how the internal state is partitioned across threads/NUMA nodes: + +* [`PerProcess`][__link4] (default): Single global state, maximum deduplication +* [`PerNuma`][__link5]: Separate state per NUMA node, NUMA-local memory access +* [`PerCore`][__link6]: Separate state per core, no deduplication (useful for already-partitioned work) + +```rust +use uniflight::Merger; +use thread_aware::PerNuma; + +// NUMA-aware merger - each NUMA node gets its own deduplication scope +let merger: Merger = Merger::new_per_numa(); +``` + +## Cancellation and Panic Handling + +`Merger` handles task cancellation and panics explicitly: + +* If the leader task is cancelled or dropped, a follower becomes the new leader +* If the leader task panics, followers receive [`LeaderPanicked`][__link7] error with the panic message +* Followers that join before the leader completes receive the value the leader returns + +When a panic occurs, followers are notified via the error type rather than silently +retrying. The panic message is captured and available via [`LeaderPanicked::message`][__link8]: + +```rust +let merger: Merger = Merger::new(); +match merger.execute("key", || async { "result".to_string() }).await { + Ok(value) => println!("got {value}"), + Err(err) => { + println!("leader panicked: {}", err.message()); + // Decide whether to retry + } +} +``` + +## Memory Management + +Completed entries are automatically removed from the internal map when the last caller +finishes. This ensures no stale entries accumulate over time. + +## Type Requirements + +The value type `T` must implement [`Clone`][__link9] because followers receive a clone of the +leader’s result. The key type `K` must implement [`Hash`][__link10] and [`Eq`][__link11]. + +## Thread Safety + +[`Merger`][__link12] is `Send` and `Sync`, and can be shared across threads. The returned futures +are `Send` when the closure, future, key, and value types are `Send`. + +## Performance + +Run benchmarks with `cargo bench -p uniflight`. The suite covers: + +* `single_call`: Baseline latency with no contention +* `high_contention_100`: 100 concurrent tasks on the same key +* `distributed_10x10`: 10 keys with 10 tasks each + +Use `--save-baseline` and `--baseline` flags to track regressions over time. + + +
+ +This crate was developed as part of The Oxidizer Project. Browse this crate's source code. + + + [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEGxgwNFq9VUtfG5xaBNm6U4VGG97W2YkyKkPjG4KVgSbTgdOrYWSCgmx0aHJlYWRfYXdhcmVlMC42LjGCaXVuaWZsaWdodGUwLjEuMA + [__link0]: https://docs.rs/uniflight/0.1.0/uniflight/struct.Merger.html + [__link1]: https://docs.rs/uniflight/0.1.0/uniflight/?search=Merger::execute + [__link10]: https://doc.rust-lang.org/stable/std/?search=hash::Hash + [__link11]: https://doc.rust-lang.org/stable/std/cmp/trait.Eq.html + [__link12]: https://docs.rs/uniflight/0.1.0/uniflight/struct.Merger.html + [__link2]: https://doc.rust-lang.org/stable/std/?search=borrow::Borrow + [__link3]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=storage::Strategy + [__link4]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerProcess + [__link5]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerNuma + [__link6]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerCore + [__link7]: https://docs.rs/uniflight/0.1.0/uniflight/struct.LeaderPanicked.html + [__link8]: https://docs.rs/uniflight/0.1.0/uniflight/?search=LeaderPanicked::message + [__link9]: https://doc.rust-lang.org/stable/std/clone/trait.Clone.html diff --git a/crates/uniflight/benches/performance.rs b/crates/uniflight/benches/performance.rs new file mode 100644 index 000000000..e210dd4fb --- /dev/null +++ b/crates/uniflight/benches/performance.rs @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Performance benchmarks for uniflight. +//! +//! Run with: cargo bench -p uniflight +//! Save baseline: cargo bench -p uniflight -- --save-baseline main +//! Compare to baseline: cargo bench -p uniflight -- --baseline main + +#![allow(missing_docs, reason = "benchmark code")] + +use std::sync::{ + Arc, + atomic::{AtomicU64, Ordering}, +}; + +use criterion::{Criterion, criterion_group, criterion_main}; +use uniflight::Merger; + +static KEY_COUNTER: AtomicU64 = AtomicU64::new(0); + +fn unique_key() -> String { + format!("key_{}", KEY_COUNTER.fetch_add(1, Ordering::Relaxed)) +} + +/// Baseline: single call, no contention. +/// This measures the fixed overhead of the merger. +fn bench_single_call(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + let merger = Arc::new(Merger::::new()); + + c.bench_function("single_call", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&merger); + async move { merger.execute(&unique_key(), || async { "value".to_string() }).await } + }); + }); +} + +/// Stress test: 100 concurrent tasks on the same key. +/// This hammers the synchronization primitives. +fn bench_high_contention(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + let merger = Arc::new(Merger::::new()); + + c.bench_function("high_contention_100", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&merger); + async move { + let key = unique_key(); + let tasks: Vec<_> = (0..100) + .map(|_| { + let merger = Arc::clone(&merger); + let key = key.clone(); + tokio::spawn(async move { merger.execute(&key, || async { "value".to_string() }).await }) + }) + .collect(); + + for task in tasks { + task.await.expect("Task panicked").expect("Leader panicked"); + } + } + }); + }); +} + +/// Distributed load: 10 keys with 10 concurrent tasks each. +/// This exercises the hash map under concurrent access. +fn bench_distributed_keys(c: &mut Criterion) { + let rt = tokio::runtime::Runtime::new().expect("Failed to create runtime"); + let merger = Arc::new(Merger::::new()); + + c.bench_function("distributed_10x10", |b| { + b.to_async(&rt).iter(|| { + let merger = Arc::clone(&merger); + async move { + let prefix = KEY_COUNTER.fetch_add(1, Ordering::Relaxed); + let tasks: Vec<_> = (0..10) + .flat_map(|key_id| { + let merger = Arc::clone(&merger); + (0..10).map(move |_| { + let merger = Arc::clone(&merger); + let key = format!("key_{prefix}_{key_id}"); + tokio::spawn(async move { merger.execute(&key, || async { "value".to_string() }).await }) + }) + }) + .collect(); + + for task in tasks { + task.await.expect("Task panicked").expect("Leader panicked"); + } + } + }); + }); +} + +criterion_group!(benches, bench_single_call, bench_high_contention, bench_distributed_keys,); + +criterion_main!(benches); diff --git a/crates/uniflight/examples/cache_population.rs b/crates/uniflight/examples/cache_population.rs new file mode 100644 index 000000000..b1c6592eb --- /dev/null +++ b/crates/uniflight/examples/cache_population.rs @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Demonstrates using `Merger` to prevent thundering herd when populating a cache. +//! +//! Multiple concurrent requests for the same cache key share a single execution. +//! The first request (leader) performs the work while others (followers) wait and +//! receive a clone of the result. + +use std::{ + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::Duration, +}; + +use uniflight::Merger; + +#[tokio::main] +async fn main() { + let merger = Arc::new(Merger::::new()); + let execution_count = Arc::new(AtomicUsize::new(0)); + + // Spawn 5 concurrent requests for the same key + let handles: Vec<_> = (0..5) + .map(|_| { + let merger = Arc::clone(&merger); + let counter = Arc::clone(&execution_count); + tokio::spawn(async move { + merger + .execute("user:123", || async { + counter.fetch_add(1, Ordering::SeqCst); + tokio::time::sleep(Duration::from_millis(100)).await; + "UserData { name: Alice }".to_string() + }) + .await + }) + }) + .collect(); + + // All requests complete with the same result + for handle in handles { + let result = handle.await.expect("task panicked"); + assert_eq!(result, Ok("UserData { name: Alice }".to_string())); + } + + // Work executed only once despite 5 concurrent requests + assert_eq!(execution_count.load(Ordering::SeqCst), 1); +} diff --git a/crates/uniflight/favicon.ico b/crates/uniflight/favicon.ico new file mode 100644 index 000000000..f1a3f34c1 --- /dev/null +++ b/crates/uniflight/favicon.ico @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:ed616c6fc5b1c95300147e226f28d1b88f193babdf3bb1669422a93f55339304 +size 15406 diff --git a/crates/uniflight/logo.png b/crates/uniflight/logo.png new file mode 100644 index 000000000..1562ae9f1 --- /dev/null +++ b/crates/uniflight/logo.png @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:6bf624b54edbaeb8bf0d961e895a86e09b18502fe6c761d00748317883dd09b8 +size 62560 diff --git a/crates/uniflight/src/lib.rs b/crates/uniflight/src/lib.rs new file mode 100644 index 000000000..6c74b6e72 --- /dev/null +++ b/crates/uniflight/src/lib.rs @@ -0,0 +1,627 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Coalesces duplicate async tasks into a single execution. +//! +//! This crate provides [`Merger`], a mechanism for deduplicating concurrent async operations. +//! When multiple tasks request the same work (identified by a key), only the first task (the +//! "leader") performs the actual work while subsequent tasks (the "followers") wait and receive +//! a clone of the result. +//! +//! # When to Use +//! +//! Use `Merger` when you have expensive or rate-limited operations that may be requested +//! concurrently with the same parameters: +//! +//! - **Cache population**: Prevent thundering herd when a cache entry expires +//! - **API calls**: Deduplicate concurrent requests to the same endpoint +//! - **Database queries**: Coalesce identical queries issued simultaneously +//! - **File I/O**: Avoid reading the same file multiple times concurrently +//! +//! # Example +//! +//! ``` +//! use uniflight::Merger; +//! +//! # async fn example() { +//! let group: Merger = Merger::new(); +//! +//! // Multiple concurrent calls with the same key will share a single execution. +//! // Note: you can pass &str directly when the key type is String. +//! let result = group.execute("user:123", || async { +//! // This expensive operation runs only once, even if called concurrently +//! "expensive_result".to_string() +//! }).await.expect("leader should not panic"); +//! # } +//! ``` +//! +//! # Flexible Key Types +//! +//! The [`Merger::execute`] method accepts keys using [`Borrow`] semantics, allowing you to pass +//! borrowed forms of the key type. For example, with `Merger`, you can pass `&str` +//! directly without allocating: +//! +//! ``` +//! # use uniflight::Merger; +//! # async fn example() { +//! let merger: Merger = Merger::new(); +//! +//! // Pass &str directly - no need to call .to_string() +//! let result = merger.execute("my-key", || async { 42 }).await; +//! assert_eq!(result, Ok(42)); +//! # } +//! ``` +//! +//! # Thread-Aware Scoping +//! +//! `Merger` supports thread-aware scoping via a [`Strategy`] +//! type parameter. This controls how the internal state is partitioned across threads/NUMA nodes: +//! +//! - [`PerProcess`] (default): Single global state, maximum deduplication +//! - [`PerNuma`]: Separate state per NUMA node, NUMA-local memory access +//! - [`PerCore`]: Separate state per core, no deduplication (useful for already-partitioned work) +//! +//! ``` +//! use uniflight::Merger; +//! use thread_aware::PerNuma; +//! +//! # async fn example() { +//! // NUMA-aware merger - each NUMA node gets its own deduplication scope +//! let merger: Merger = Merger::new_per_numa(); +//! # } +//! ``` +//! +//! # Cancellation and Panic Handling +//! +//! `Merger` handles task cancellation and panics explicitly: +//! +//! - If the leader task is cancelled or dropped, a follower becomes the new leader +//! - If the leader task panics, followers receive [`LeaderPanicked`] error with the panic message +//! - Followers that join before the leader completes receive the value the leader returns +//! +//! When a panic occurs, followers are notified via the error type rather than silently +//! retrying. The panic message is captured and available via [`LeaderPanicked::message`]: +//! +//! ``` +//! # use uniflight::Merger; +//! # async fn example() { +//! let merger: Merger = Merger::new(); +//! match merger.execute("key", || async { "result".to_string() }).await { +//! Ok(value) => println!("got {value}"), +//! Err(err) => { +//! println!("leader panicked: {}", err.message()); +//! // Decide whether to retry +//! } +//! } +//! # } +//! ``` +//! +//! # Memory Management +//! +//! Completed entries are automatically removed from the internal map when the last caller +//! finishes. This ensures no stale entries accumulate over time. +//! +//! # Type Requirements +//! +//! The value type `T` must implement [`Clone`] because followers receive a clone of the +//! leader's result. The key type `K` must implement [`Hash`] and [`Eq`]. +//! +//! # Thread Safety +//! +//! [`Merger`] is `Send` and `Sync`, and can be shared across threads. The returned futures +//! are `Send` when the closure, future, key, and value types are `Send`. +//! +//! # Performance +//! +//! Run benchmarks with `cargo bench -p uniflight`. The suite covers: +//! +//! - `single_call`: Baseline latency with no contention +//! - `high_contention_100`: 100 concurrent tasks on the same key +//! - `distributed_10x10`: 10 keys with 10 tasks each +//! +//! Use `--save-baseline` and `--baseline` flags to track regressions over time. + +#![doc(html_logo_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/logo.png")] +#![doc(html_favicon_url = "https://media.githubusercontent.com/media/microsoft/oxidizer/refs/heads/main/crates/uniflight/favicon.ico")] + +use std::{ + borrow::Borrow, + fmt::Debug, + hash::Hash, + panic::AssertUnwindSafe, + sync::{Arc, Weak}, +}; + +use ahash::RandomState; +use async_once_cell::OnceCell; +use dashmap::{ + DashMap, + Entry::{Occupied, Vacant}, +}; +use futures_util::FutureExt; // catch_unwind, map +use thread_aware::{ + Arc as TaArc, PerCore, PerNuma, PerProcess, ThreadAware, + affinity::{MemoryAffinity, PinnedAffinity}, + storage::Strategy, +}; + +/// Suppresses duplicate async operations identified by a key. +/// +/// The `S` type parameter controls the thread-aware scoping strategy: +/// - [`PerProcess`]: Single global scope (default, maximum deduplication) +/// - [`PerNuma`]: Per-NUMA-node scope (NUMA-local memory access) +/// - [`PerCore`]: Per-core scope (no deduplication) +pub struct Merger { + inner: TaArc>, RandomState>, S>, +} + +impl Debug for Merger { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Merger").field("inner", &format_args!("DashMap<...>")).finish() + } +} + +impl Clone for Merger { + fn clone(&self) -> Self { + Self { inner: self.inner.clone() } + } +} + +impl Default for Merger +where + K: Hash + Eq + Send + Sync + 'static, + T: Send + Sync + 'static, + S: Strategy, +{ + fn default() -> Self { + Self { + inner: TaArc::new(|| DashMap::with_hasher(RandomState::new())), + } + } +} + +impl Merger +where + K: Hash + Eq + Send + Sync + 'static, + T: Send + Sync + 'static, + S: Strategy, +{ + /// Creates a new `Merger` instance. + /// + /// The scoping strategy is determined by the type parameter `S`: + /// - [`PerProcess`] (default): Process-wide scope, maximum deduplication + /// - [`PerNuma`]: Per-NUMA-node scope, NUMA-local memory access + /// - [`PerCore`]: Per-core scope, no cross-core deduplication + /// + /// # Examples + /// + /// ``` + /// use uniflight::Merger; + /// use thread_aware::{PerNuma, PerCore}; + /// + /// // Default (PerProcess) - type can be inferred + /// let global: Merger = Merger::new(); + /// + /// // NUMA-local scope + /// let numa: Merger = Merger::new(); + /// + /// // Per-core scope + /// let core: Merger = Merger::new(); + /// ``` + #[inline] + #[must_use] + pub fn new() -> Self { + Self::default() + } +} + +impl Merger +where + K: Hash + Eq + Send + Sync + 'static, + T: Send + Sync + 'static, +{ + /// Creates a new `Merger` with process-wide scoping (default). + /// + /// All threads share a single deduplication scope, providing maximum + /// work deduplication across the entire process. + /// + /// # Example + /// + /// ``` + /// use uniflight::Merger; + /// + /// let merger = Merger::::new_per_process(); + /// ``` + #[inline] + #[must_use] + #[cfg_attr(test, mutants::skip)] // Equivalent mutant: delegates to Default + pub fn new_per_process() -> Self { + Self::default() + } +} + +impl Merger +where + K: Hash + Eq + Send + Sync + 'static, + T: Send + Sync + 'static, +{ + /// Creates a new `Merger` with per-NUMA-node scoping. + /// + /// Each NUMA node gets its own deduplication scope, ensuring memory + /// locality for cached results while still deduplicating within each node. + /// + /// # Example + /// + /// ``` + /// use uniflight::Merger; + /// + /// let merger = Merger::::new_per_numa(); + /// ``` + #[inline] + #[must_use] + #[cfg_attr(test, mutants::skip)] // Equivalent mutant: delegates to Default + pub fn new_per_numa() -> Self { + Self::default() + } +} + +impl Merger +where + K: Hash + Eq + Send + Sync + 'static, + T: Send + Sync + 'static, +{ + /// Creates a new `Merger` with per-core scoping. + /// + /// Each core gets its own deduplication scope. This is useful when work + /// is already partitioned by core and cross-core deduplication is not needed. + /// + /// # Example + /// + /// ``` + /// use uniflight::Merger; + /// + /// let merger = Merger::::new_per_core(); + /// ``` + #[inline] + #[must_use] + #[cfg_attr(test, mutants::skip)] // Equivalent mutant: delegates to Default + pub fn new_per_core() -> Self { + Self::default() + } +} + +impl Merger +where + K: Hash + Eq, +{ + /// Returns the number of in-flight operations. + #[cfg(test)] + fn len(&self) -> usize { + self.inner.len() + } + + /// Returns `true` if there are no in-flight operations. + #[cfg(test)] + fn is_empty(&self) -> bool { + self.inner.is_empty() + } +} + +impl ThreadAware for Merger +where + S: Strategy, +{ + fn relocated(self, source: MemoryAffinity, destination: PinnedAffinity) -> Self { + Self { + inner: self.inner.relocated(source, destination), + } + } +} + +impl Merger +where + K: Hash + Eq + Send + Sync, + T: Send + Sync, + S: Strategy + Send + Sync, +{ + /// Execute and return the value for a given function, making sure that only one + /// operation is in-flight at a given moment. If a duplicate call comes in, + /// that caller will wait until the leader completes and return the same value. + /// + /// # Errors + /// + /// Returns [`LeaderPanicked`] if the leader task panicked during execution. + /// Callers can retry by calling `execute` again if desired. + /// + /// # Example + /// + /// The key can be passed as any borrowed form of `K`. For example, if `K` is `String`, + /// you can pass `&str` directly: + /// + /// ``` + /// # use uniflight::Merger; + /// # async fn example() { + /// let merger: Merger = Merger::new(); + /// let result = merger.execute("my-key", || async { 42 }).await; + /// assert_eq!(result, Ok(42)); + /// # } + /// ``` + pub fn execute(&self, key: &Q, func: F) -> impl Future> + Send + use + where + K: Borrow, + Q: Hash + Eq + ToOwned + ?Sized, + F: FnOnce() -> Fut + Send, + Fut: Future + Send, + T: Clone, + { + // Clone the TaArc - the async block owns this clone + let inner = self.inner.clone(); + let cell = Self::get_or_create_cell(&inner, key); + let owned_key = key.to_owned(); + async move { + let result = cell.get_or_init(func()).await.clone(); + drop(cell); // Release our Arc before cleanup check + // Remove entry if no one else is using it (weak can't upgrade) + inner.remove_if(owned_key.borrow(), |_, weak| weak.upgrade().is_none()); + result + } + } + + /// Gets an existing cell for the key, or creates a new one. + fn get_or_create_cell(map: &DashMap>, RandomState>, key: &Q) -> Arc> + where + K: Borrow, + Q: Hash + Eq + ToOwned + ?Sized, + { + // Fast path: check if entry exists and is still valid + if let Some(entry) = map.get(key) + && let Some(cell) = entry.value().upgrade() + { + return cell; + } + + // Slow path: need to insert or replace expired entry + Self::insert_or_get_existing(map, key) + } + + /// Inserts a new cell or returns an existing live cell (handling races). + /// + /// This is the slow path of `get_or_create_cell`, separated for testability. + /// It handles the case where another thread may have inserted a cell between + /// our fast-path check and this insertion attempt. + fn insert_or_get_existing(map: &DashMap>, RandomState>, key: &Q) -> Arc> + where + K: Borrow, + Q: Hash + Eq + ToOwned + ?Sized, + { + let cell = Arc::new(PanicAwareCell::new()); + let weak = Arc::downgrade(&cell); + + // Use Entry enum to atomically check-and-return or insert + match map.entry(key.to_owned()) { + Occupied(mut entry) => { + // Entry exists - check if still alive + if let Some(existing) = entry.get().upgrade() { + // Another thread's cell is still alive - use it + return existing; + } + // Expired - replace with ours + entry.insert(weak); + } + Vacant(entry) => { + entry.insert(weak); + } + } + + // We inserted our cell, return it + cell + } +} + +/// Error returned when the leader task panicked during execution. +/// +/// When a leader task panics, followers receive this error instead of +/// silently retrying. Callers can decide whether to retry by calling +/// `execute` again. +/// +/// The panic message is captured and available via [`std::fmt::Display`] or [`LeaderPanicked::message`]. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct LeaderPanicked { + message: Arc, +} + +impl LeaderPanicked { + /// Returns the panic message from the leader task. + #[must_use] + pub fn message(&self) -> &str { + &self.message + } +} + +impl std::fmt::Display for LeaderPanicked { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "leader task panicked: {}", self.message) + } +} + +impl std::error::Error for LeaderPanicked {} + +/// Extracts a message from a panic payload. +/// +/// Tries to downcast to `&str` or `String`, falling back to a default message. +fn extract_panic_message(payload: &(dyn std::any::Any + Send)) -> Arc { + if let Some(s) = payload.downcast_ref::<&str>() { + return Arc::from(*s); + } + if let Some(s) = payload.downcast_ref::() { + return Arc::from(s.as_str()); + } + Arc::from("unknown panic") +} + +struct PanicAwareCell { + inner: OnceCell>, +} + +impl PanicAwareCell { + fn new() -> Self { + Self { inner: OnceCell::new() } + } + + #[expect(clippy::future_not_send, reason = "Send bounds enforced by Merger::execute")] + async fn get_or_init(&self, f: F) -> &Result + where + F: Future, + { + // Use map combinator instead of async block to avoid extra state machine + self.inner + .get_or_init(AssertUnwindSafe(f).catch_unwind().map(|result| { + result.map_err(|payload| LeaderPanicked { + message: extract_panic_message(&*payload), + }) + })) + .await + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use thread_aware::affinity::pinned_affinities; + + #[test] + fn relocated_delegates_to_inner() { + let affinities = pinned_affinities(&[2]); + let source = affinities[0].into(); + let destination = affinities[1]; + + let merger: Merger = Merger::new(); + let relocated = merger.relocated(source, destination); + + // Verify the relocated merger still works + assert!(relocated.is_empty()); + } + + #[test] + fn fast_path_returns_existing() { + let map: DashMap>, RandomState> = DashMap::with_hasher(RandomState::new()); + let existing_cell = Arc::new(PanicAwareCell::new()); + map.insert("key".to_string(), Arc::downgrade(&existing_cell)); + + let result = Merger::::get_or_create_cell(&map, "key"); + + assert!(Arc::ptr_eq(&result, &existing_cell)); + } + + #[test] + fn replaces_expired_entry() { + let map: DashMap>, RandomState> = DashMap::with_hasher(RandomState::new()); + let expired_weak = Arc::downgrade(&Arc::new(PanicAwareCell::::new())); + map.insert("key".to_string(), expired_weak); + + let result = Merger::::get_or_create_cell(&map, "key"); + + let entry = map.get("key").unwrap(); + assert!(Arc::ptr_eq(&result, &entry.value().upgrade().unwrap())); + } + + /// Simulates a race where another thread inserted between fast-path check and `entry()`. + #[test] + fn race_returns_existing() { + let map: DashMap>, RandomState> = DashMap::with_hasher(RandomState::new()); + let other_cell = Arc::new(PanicAwareCell::new()); + map.insert("key".to_string(), Arc::downgrade(&other_cell)); + + let result = Merger::::insert_or_get_existing(&map, "key"); + + assert!(Arc::ptr_eq(&result, &other_cell)); + } + + #[tokio::test] + async fn cleanup_after_completion() { + let group: Merger = Merger::new(); + assert!(group.is_empty()); + + // Single call should clean up after completion + let result = group.execute("key1", || async { "Result".to_string() }).await; + assert_eq!(result, Ok("Result".to_string())); + assert!(group.is_empty(), "Map should be empty after single call completes"); + + // Multiple concurrent calls should clean up after all complete + let futures: Vec<_> = (0..10) + .map(|_| { + group.execute("key2", || async { + tokio::time::sleep(Duration::from_millis(50)).await; + "Result".to_string() + }) + }) + .collect(); + + // While in flight, map should have an entry + assert_eq!(group.len(), 1); + + for fut in futures { + assert_eq!(fut.await, Ok("Result".to_string())); + } + + assert!(group.is_empty(), "Map should be empty after all concurrent calls complete"); + + // Multiple different keys should all be cleaned up + let fut1 = group.execute("a", || async { "A".to_string() }); + let fut2 = group.execute("b", || async { "B".to_string() }); + let fut3 = group.execute("c", || async { "C".to_string() }); + + assert_eq!(group.len(), 3); + + let (r1, r2, r3) = tokio::join!(fut1, fut2, fut3); + assert_eq!(r1, Ok("A".to_string())); + assert_eq!(r2, Ok("B".to_string())); + assert_eq!(r3, Ok("C".to_string())); + + assert!(group.is_empty(), "Map should be empty after all keys complete"); + } + + #[tokio::test] + async fn catch_unwind_works() { + // Verify that catch_unwind actually catches panics in async code + let result = AssertUnwindSafe(async { + panic!("test panic"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + 42i32 + }) + .catch_unwind() + .await; + + assert!(result.is_err(), "catch_unwind should catch the panic"); + } + + #[tokio::test] + async fn panic_aware_cell_catches_panic() { + let cell = PanicAwareCell::::new(); + let result = cell + .get_or_init(async { + panic!("test panic"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }) + .await; + + let err = result.as_ref().unwrap_err(); + assert_eq!(err.message(), "test panic"); + } + + #[test] + fn extract_panic_message_from_string() { + let payload: Box = Box::new(String::from("owned string panic")); + let message = extract_panic_message(&*payload); + assert_eq!(&*message, "owned string panic"); + } + + #[test] + fn extract_panic_message_unknown_type() { + let payload: Box = Box::new(42i32); + let message = extract_panic_message(&*payload); + assert_eq!(&*message, "unknown panic"); + } +} diff --git a/crates/uniflight/tests/work.rs b/crates/uniflight/tests/work.rs new file mode 100644 index 000000000..5ecdf5f5b --- /dev/null +++ b/crates/uniflight/tests/work.rs @@ -0,0 +1,416 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +//! Integration tests for [`Merger::execute()`]. + +use std::{ + sync::{ + Arc, + atomic::{ + AtomicUsize, + Ordering::{AcqRel, Acquire}, + }, + }, + time::Duration, +}; + +use futures_util::{StreamExt, stream::FuturesUnordered}; +use uniflight::Merger; + +fn unreachable_future() -> std::future::Pending { + std::future::pending() +} + +#[tokio::test] +async fn direct_call() { + let group = Merger::::new_per_process(); + let result = group + .execute("key", || async { + tokio::time::sleep(Duration::from_millis(10)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, Ok("Result".to_string())); +} + +#[tokio::test] +async fn parallel_call() { + let call_counter = AtomicUsize::default(); + + let group = Merger::::new_per_process(); + let futures = FuturesUnordered::new(); + for _ in 0..10 { + futures.push(group.execute("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + assert!(futures.all(|out| async move { out == Ok("Result".to_string()) }).await); + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn parallel_call_seq_await() { + let call_counter = AtomicUsize::default(); + + let group = Merger::::new_per_process(); + let mut futures = Vec::new(); + for _ in 0..10 { + futures.push(group.execute("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + })); + } + + for fut in futures { + assert_eq!(fut.await, Ok("Result".to_string())); + } + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn call_with_static_str_key() { + let group = Merger::::new_per_process(); + let result = group + .execute("key", || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, Ok("Result".to_string())); +} + +#[tokio::test] +async fn call_with_static_string_key() { + let group = Merger::::new_per_process(); + let result = group + .execute("key", || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, Ok("Result".to_string())); +} + +#[tokio::test] +async fn call_with_custom_key() { + #[derive(Clone, PartialEq, Eq, Hash)] + struct K(i32); + let group = Merger::::new_per_process(); + let result = group + .execute(&K(1), || async { + tokio::time::sleep(Duration::from_millis(1)).await; + "Result".to_string() + }) + .await; + assert_eq!(result, Ok("Result".to_string())); +} + +#[tokio::test] +async fn late_wait() { + let group = Merger::::new_per_process(); + let fut_early = group.execute("key", || async { + tokio::time::sleep(Duration::from_millis(20)).await; + "Result".to_string() + }); + let fut_late = group.execute("key", unreachable_future); + assert_eq!(fut_early.await, Ok("Result".to_string())); + tokio::time::sleep(Duration::from_millis(50)).await; + assert_eq!(fut_late.await, Ok("Result".to_string())); +} + +#[tokio::test] +async fn cancel() { + let group = Merger::::new_per_process(); + + // The executor was cancelled; the other awaiter will create a new future and execute. + let fut_cancel = group.execute(&"key".to_string(), unreachable_future); + let _ = tokio::time::timeout(Duration::from_millis(10), fut_cancel).await; + let fut_late = group.execute("key", || async { "Result2".to_string() }); + assert_eq!(fut_late.await, Ok("Result2".to_string())); + + // the first executer is slow but not dropped, so the result will be the first ones. + let begin = tokio::time::Instant::now(); + let fut_1 = group.execute("key", || async { + tokio::time::sleep(Duration::from_millis(2000)).await; + "Result1".to_string() + }); + let fut_2 = group.execute(&"key".to_string(), unreachable_future); + let (v1, v2) = tokio::join!(fut_1, fut_2); + assert_eq!(v1, Ok("Result1".to_string())); + assert_eq!(v2, Ok("Result1".to_string())); + assert!(begin.elapsed() > Duration::from_millis(1500)); +} + +#[tokio::test] +async fn leader_panic_returns_error_to_all() { + let group: Arc> = Arc::new(Merger::new()); + + // First task will panic (caught by catch_unwind) + let group_clone = Arc::clone(&group); + let leader_handle = tokio::spawn(async move { + group_clone + .execute("key", || async { + tokio::time::sleep(Duration::from_millis(50)).await; + panic!("leader panicked"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }) + .await + }); + + // Give time for the spawned task to register and start + tokio::time::sleep(Duration::from_millis(10)).await; + + // Second task joins as a follower + let group_clone = Arc::clone(&group); + let follower_handle = tokio::spawn(async move { + group_clone + .execute("key", || async { + // This should never run - we're a follower + "follower result".to_string() + }) + .await + }); + + // Leader gets LeaderPanicked error (panic is caught, not propagated) + let leader_result = leader_handle.await.expect("task should not panic - panic is caught"); + let Err(leader_err) = leader_result else { + panic!("expected Err, got Ok"); + }; + assert_eq!(leader_err.message(), "leader panicked"); + + // Follower also gets LeaderPanicked error with same message + let follower_result = follower_handle.await.expect("follower task should not panic"); + let Err(follower_err) = follower_result else { + panic!("expected Err, got Ok"); + }; + assert_eq!(follower_err.message(), "leader panicked"); +} + +#[tokio::test] +async fn debug_impl() { + let group: Merger = Merger::new(); + + // Test Debug on empty group + let debug_str = format!("{group:?}"); + assert!(debug_str.contains("Merger")); + + // Create a pending work item to populate the mapping + let fut = group.execute("key", || async { + tokio::time::sleep(Duration::from_millis(100)).await; + "Result".to_string() + }); + + // Debug should still work with entries in the mapping + let debug_str = format!("{group:?}"); + assert!(debug_str.contains("Merger")); + // The inner storage is a DashMap + assert!(debug_str.contains("DashMap")); + + // Complete the work + assert_eq!(fut.await, Ok("Result".to_string())); +} + +#[tokio::test] +async fn per_process_strategy() { + let group = Merger::::new_per_process(); + let result = group.execute("key", || async { "Result".to_string() }).await; + assert_eq!(result, Ok("Result".to_string())); +} + +#[tokio::test] +async fn per_numa_strategy() { + let group = Merger::::new_per_numa(); + let result = group.execute("key", || async { "Result".to_string() }).await; + assert_eq!(result, Ok("Result".to_string())); +} + +#[tokio::test] +async fn per_core_strategy() { + let group = Merger::::new_per_core(); + let result = group.execute("key", || async { "Result".to_string() }).await; + assert_eq!(result, Ok("Result".to_string())); +} + +#[tokio::test] +async fn clone_shares_state() { + let group1 = Merger::::new_per_process(); + let group2 = group1.clone(); + + let call_counter = AtomicUsize::default(); + + // Start work on clone 1 + let fut1 = group1.execute("key", || async { + tokio::time::sleep(Duration::from_millis(50)).await; + call_counter.fetch_add(1, AcqRel); + "Result".to_string() + }); + + // Clone 2 should join the same work + let fut2 = group2.execute("key", || async { + call_counter.fetch_add(1, AcqRel); + "Unreachable".to_string() + }); + + let (r1, r2) = tokio::join!(fut1, fut2); + assert_eq!(r1, Ok("Result".to_string())); + assert_eq!(r2, Ok("Result".to_string())); + // Work should only execute once + assert_eq!(call_counter.load(Acquire), 1); +} + +#[tokio::test] +async fn leader_panicked_error_traits() { + // Create an error by triggering a panic + let group: Merger = Merger::new(); + let result = group + .execute("key", || async { + panic!("test message"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }) + .await; + let Err(error) = result else { + panic!("expected Err"); + }; + + // Test message() + assert_eq!(error.message(), "test message"); + + // Test Display - includes the panic message + let display = format!("{error}"); + assert!(display.contains("leader task panicked")); + assert!(display.contains("test message")); + + // Test Debug + let debug_str = format!("{error:?}"); + assert!(debug_str.contains("LeaderPanicked")); + + // Test Clone + let cloned = error.clone(); + assert_eq!(cloned.message(), error.message()); + + // Test PartialEq and Eq + assert_eq!(error, cloned); + + // Test Error trait (can be used as a source error) + let std_error: &dyn std::error::Error = &error; + assert!(std_error.source().is_none()); +} + +#[tokio::test] +async fn retry_after_panic_succeeds() { + let group: Merger = Merger::new(); + + // First call panics + let result = group + .execute("key", || async { + panic!("intentional panic"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }) + .await; + let Err(err) = result else { + panic!("expected Err"); + }; + assert_eq!(err.message(), "intentional panic"); + + // Retry with the same key should succeed + let result = group.execute("key", || async { "success".to_string() }).await; + assert_eq!(result, Ok("success".to_string())); +} + +#[tokio::test] +async fn default_impl() { + // Test that Default::default() works the same as new() + let group1: Merger = Merger::default(); + let group2: Merger = Merger::new(); + + let result1 = group1.execute("key", || async { "value".to_string() }).await; + let result2 = group2.execute("key", || async { "value".to_string() }).await; + + assert_eq!(result1, Ok("value".to_string())); + assert_eq!(result2, Ok("value".to_string())); +} + +#[tokio::test] +async fn mixed_panic_and_success() { + let group: Merger = Merger::new(); + + // Start multiple keys concurrently - some panic, some succeed + let panic_fut = group.execute("panic_key", || async { + tokio::time::sleep(Duration::from_millis(10)).await; + panic!("intentional panic"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }); + + let success_fut = group.execute("success_key", || async { + tokio::time::sleep(Duration::from_millis(10)).await; + "success".to_string() + }); + + let (panic_result, success_result) = tokio::join!(panic_fut, success_fut); + + // Panic key returns error with message + let Err(err) = panic_result else { + panic!("expected Err"); + }; + assert_eq!(err.message(), "intentional panic"); + + // Success key returns value + assert_eq!(success_result, Ok("success".to_string())); +} + +#[tokio::test] +async fn follower_closure_not_called_on_panic() { + let group: Arc> = Arc::new(Merger::new()); + let follower_called = Arc::new(AtomicUsize::new(0)); + + // Leader will panic + let group_clone = Arc::clone(&group); + let leader_handle = tokio::spawn(async move { + group_clone + .execute("key", || async { + tokio::time::sleep(Duration::from_millis(50)).await; + panic!("leader panic"); + #[expect(unreachable_code, reason = "Required to satisfy return type after panic")] + "never".to_string() + }) + .await + }); + + // Give leader time to start + tokio::time::sleep(Duration::from_millis(10)).await; + + // Follower joins - its closure should NOT be called + let group_clone = Arc::clone(&group); + let follower_called_clone = Arc::clone(&follower_called); + let follower_handle = tokio::spawn(async move { + group_clone + .execute("key", || async { + follower_called_clone.fetch_add(1, Acquire); + "follower result".to_string() + }) + .await + }); + + let (leader_result, follower_result) = tokio::join!(leader_handle, follower_handle); + + let Err(leader_err) = leader_result.expect("task join") else { + panic!("expected Err"); + }; + assert_eq!(leader_err.message(), "leader panic"); + + let Err(follower_err) = follower_result.expect("task join") else { + panic!("expected Err"); + }; + assert_eq!(follower_err.message(), "leader panic"); + + // Follower's closure was never called + assert_eq!(follower_called.load(Acquire), 0); +}