-
Notifications
You must be signed in to change notification settings - Fork 17
feat: Add uniflight crate for duplicate request coalescing #118
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
54 commits
Select commit
Hold shift + click to select a range
e5fbae3
Initial commit of uniflight
schgoo 9eb29c0
Update docs
schgoo dc9c254
Merge branch 'main' into uniflight
schgoo e29c3f4
Fix clippy
schgoo eac8971
Merge branch 'uniflight' of https://github.com/schgoo/oxidizer into u…
schgoo 3f508a5
Add logo
schgoo b692e0e
Merge with main
schgoo ff628d0
Fix formatting
schgoo 8aa3ffb
Update favicon and code coverage
schgoo 6ce6e11
Update crates/uniflight/Cargo.toml
schgoo 76e0ede
Update crates/uniflight/Cargo.toml
schgoo e6e7458
Move tests to separate file
schgoo 1ca1d3e
Merge with main
schgoo bf25a1a
Use tick
schgoo e9697fa
Add N leaders functionality (for redundancy)
schgoo 1aa66c4
Merge branch 'uniflight' of https://github.com/schgoo/oxidizer into u…
schgoo af7f96c
Refactoring to allow followers to unlock in parallel
schgoo 9445788
Update to use async-once and dashmap. Update benchmarks
schgoo 0837809
Merge with main
schgoo 8efdde2
Fix comments
schgoo 9ef7039
Make it thread_aware
schgoo 85c5770
Merge with main
schgoo aeb1cb1
Fix merge issue
schgoo 4cc0a76
Fix external types check
schgoo 74cbc11
Fix pr issues
schgoo da0eebe
Fix pr issues
schgoo e0f88e5
cargo fmt
schgoo 575baf0
Fix clippy issues
schgoo e60a4df
Merge branch 'main' into uniflight
schgoo beaacdb
Fix PR issues
schgoo 4b81fe3
Merge branch 'uniflight' of https://github.com/schgoo/oxidizer into u…
schgoo bb97d17
Remove excessive comments. Improve code coverage
schgoo 693d224
Use tick::Clock::delay instead of tokio::time::sleep
schgoo dd3bd07
Merge branch 'main' into uniflight
schgoo fcde8fa
Merge branch 'uniflight' of https://github.com/schgoo/oxidizer into u…
schgoo e6b76f5
clippy
schgoo 7f28cfe
Rename work to execute. Remove unnecessary attribution
schgoo 61f8ce6
better logo
schgoo 455c7d2
Merge with main
schgoo dd104ea
Update readme
schgoo 5345752
Undo ToC changes to README.md
schgoo ab92d59
Improve benchmarks for incremental development. Remove thread_aware e…
schgoo f19b674
Update readme
schgoo 168acbb
Add thread aware types to external type check
schgoo 3792000
Clippy, fmt
schgoo 80d0766
Merge branch 'main' into uniflight
schgoo ae9ce04
Clippy, readme
schgoo d6dd361
Merge branch 'main' of https://github.com/microsoft/oxidizer into uni…
schgoo cb49236
Merge branch 'uniflight' of https://github.com/schgoo/oxidizer into u…
schgoo 321c353
Clippy, readme, surface task panics back to caller and all followers …
schgoo 760479c
Merge with main
schgoo 75cea13
CI failures
schgoo 828fc07
Merge with main
schgoo a4019d5
Merge branch 'main' into uniflight
schgoo File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,4 +35,5 @@ _manifest | |
| ARROW | ||
|
|
||
| # Agent files | ||
| .claude | ||
| .claude | ||
| CLAUDE.md | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # Changelog | ||
|
|
||
| ## [0.1.0] - 2025-12-10 | ||
|
|
||
| - 🧩 Miscellaneous | ||
|
|
||
| - Initial commit of uniflight | ||
|
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,53 @@ | ||
| # Copyright (c) Microsoft Corporation. | ||
| # Licensed under the MIT License. | ||
|
|
||
| [package] | ||
| name = "uniflight" | ||
| description = "Coalesces duplicate async tasks into a single execution." | ||
| version = "0.1.0" | ||
| readme = "README.md" | ||
| keywords = ["oxidizer", "coalescing", "stempede", "singleflight", "deduplication"] | ||
| categories = ["concurrency"] | ||
|
|
||
| edition.workspace = true | ||
| rust-version.workspace = true | ||
| authors.workspace = true | ||
| license.workspace = true | ||
| homepage.workspace = true | ||
| repository.workspace = true | ||
|
|
||
| [package.metadata.cargo_check_external_types] | ||
| allowed_external_types = [ | ||
| "thread_aware::cell::builtin::PerProcess", | ||
| "thread_aware::cell::storage::Strategy", | ||
| "thread_aware::core::ThreadAware", | ||
| ] | ||
|
|
||
| [dependencies] | ||
| ahash = { workspace = true, default-features = false, features = ["std"] } | ||
| async-once-cell.workspace = true | ||
| dashmap.workspace = true | ||
| futures-util = { workspace = true, default-features = false, features = ["std", "alloc"] } | ||
| thread_aware.workspace = true | ||
|
|
||
| [dev-dependencies] | ||
| criterion = { workspace = true, features = ["async_tokio"] } | ||
| futures-util = { workspace = true, features = ["alloc", "std"] } | ||
| mutants.workspace = true | ||
| tick = { workspace = true, features = ["tokio"] } | ||
| tokio = { workspace = true, features = [ | ||
| "macros", | ||
| "rt", | ||
| "time", | ||
| "rt-multi-thread", | ||
| ] } | ||
|
|
||
| [lints] | ||
| workspace = true | ||
|
|
||
| [[bench]] | ||
| name = "performance" | ||
| harness = false | ||
|
|
||
| [[example]] | ||
| name = "cache_population" | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,145 @@ | ||
| <div align="center"> | ||
| <img src="./logo.png" alt="Uniflight Logo" width="96"> | ||
|
|
||
| # Uniflight | ||
|
|
||
| [](https://crates.io/crates/uniflight) | ||
| [](https://docs.rs/uniflight) | ||
| [](https://crates.io/crates/uniflight) | ||
| [](https://github.com/microsoft/oxidizer/actions/workflows/main.yml) | ||
| [](https://codecov.io/gh/microsoft/oxidizer) | ||
| [](../../LICENSE) | ||
| <a href="../.."><img src="../../logo.svg" alt="This crate was developed as part of the Oxidizer project" width="20"></a> | ||
|
|
||
| </div> | ||
|
|
||
| Coalesces duplicate async tasks into a single execution. | ||
|
|
||
| This crate provides [`Merger`][__link0], a mechanism for deduplicating concurrent async operations. | ||
| When multiple tasks request the same work (identified by a key), only the first task (the | ||
| “leader”) performs the actual work while subsequent tasks (the “followers”) wait and receive | ||
| a clone of the result. | ||
|
|
||
| ## When to Use | ||
|
|
||
| Use `Merger` when you have expensive or rate-limited operations that may be requested | ||
| concurrently with the same parameters: | ||
|
|
||
| * **Cache population**: Prevent thundering herd when a cache entry expires | ||
| * **API calls**: Deduplicate concurrent requests to the same endpoint | ||
| * **Database queries**: Coalesce identical queries issued simultaneously | ||
| * **File I/O**: Avoid reading the same file multiple times concurrently | ||
|
|
||
| ## Example | ||
|
|
||
| ```rust | ||
| use uniflight::Merger; | ||
|
|
||
| let group: Merger<String, String> = Merger::new(); | ||
|
|
||
| // Multiple concurrent calls with the same key will share a single execution. | ||
| // Note: you can pass &str directly when the key type is String. | ||
| let result = group.execute("user:123", || async { | ||
| // This expensive operation runs only once, even if called concurrently | ||
| "expensive_result".to_string() | ||
| }).await.expect("leader should not panic"); | ||
| ``` | ||
|
|
||
| ## Flexible Key Types | ||
|
|
||
| The [`Merger::execute`][__link1] method accepts keys using [`Borrow`][__link2] semantics, allowing you to pass | ||
| borrowed forms of the key type. For example, with `Merger<String, T>`, you can pass `&str` | ||
| directly without allocating: | ||
|
|
||
| ```rust | ||
| let merger: Merger<String, i32> = Merger::new(); | ||
|
|
||
| // Pass &str directly - no need to call .to_string() | ||
| let result = merger.execute("my-key", || async { 42 }).await; | ||
| assert_eq!(result, Ok(42)); | ||
| ``` | ||
|
|
||
| ## Thread-Aware Scoping | ||
|
|
||
| `Merger` supports thread-aware scoping via a [`Strategy`][__link3] | ||
| type parameter. This controls how the internal state is partitioned across threads/NUMA nodes: | ||
|
|
||
| * [`PerProcess`][__link4] (default): Single global state, maximum deduplication | ||
| * [`PerNuma`][__link5]: Separate state per NUMA node, NUMA-local memory access | ||
| * [`PerCore`][__link6]: Separate state per core, no deduplication (useful for already-partitioned work) | ||
|
|
||
| ```rust | ||
| use uniflight::Merger; | ||
| use thread_aware::PerNuma; | ||
|
|
||
| // NUMA-aware merger - each NUMA node gets its own deduplication scope | ||
| let merger: Merger<String, String, PerNuma> = Merger::new_per_numa(); | ||
| ``` | ||
|
|
||
| ## Cancellation and Panic Handling | ||
|
|
||
| `Merger` handles task cancellation and panics explicitly: | ||
|
|
||
| * If the leader task is cancelled or dropped, a follower becomes the new leader | ||
| * If the leader task panics, followers receive [`LeaderPanicked`][__link7] error with the panic message | ||
| * Followers that join before the leader completes receive the value the leader returns | ||
|
|
||
| When a panic occurs, followers are notified via the error type rather than silently | ||
| retrying. The panic message is captured and available via [`LeaderPanicked::message`][__link8]: | ||
|
|
||
| ```rust | ||
| let merger: Merger<String, String> = Merger::new(); | ||
| match merger.execute("key", || async { "result".to_string() }).await { | ||
| Ok(value) => println!("got {value}"), | ||
| Err(err) => { | ||
| println!("leader panicked: {}", err.message()); | ||
| // Decide whether to retry | ||
| } | ||
| } | ||
| ``` | ||
|
|
||
| ## Memory Management | ||
|
|
||
| Completed entries are automatically removed from the internal map when the last caller | ||
| finishes. This ensures no stale entries accumulate over time. | ||
|
|
||
| ## Type Requirements | ||
|
|
||
| The value type `T` must implement [`Clone`][__link9] because followers receive a clone of the | ||
| leader’s result. The key type `K` must implement [`Hash`][__link10] and [`Eq`][__link11]. | ||
|
|
||
| ## Thread Safety | ||
|
|
||
| [`Merger`][__link12] is `Send` and `Sync`, and can be shared across threads. The returned futures | ||
| are `Send` when the closure, future, key, and value types are `Send`. | ||
|
|
||
| ## Performance | ||
|
|
||
| Run benchmarks with `cargo bench -p uniflight`. The suite covers: | ||
|
|
||
| * `single_call`: Baseline latency with no contention | ||
| * `high_contention_100`: 100 concurrent tasks on the same key | ||
| * `distributed_10x10`: 10 keys with 10 tasks each | ||
|
|
||
| Use `--save-baseline` and `--baseline` flags to track regressions over time. | ||
|
|
||
|
|
||
| <hr/> | ||
| <sub> | ||
| This crate was developed as part of <a href="../..">The Oxidizer Project</a>. Browse this crate's <a href="https://github.com/microsoft/oxidizer/tree/main/crates/uniflight">source code</a>. | ||
| </sub> | ||
|
|
||
| [__cargo_doc2readme_dependencies_info]: ggGkYW0CYXSEGy4k8ldDFPOhG2VNeXtD5nnKG6EPY6OfW5wBG8g18NOFNdxpYXKEGxgwNFq9VUtfG5xaBNm6U4VGG97W2YkyKkPjG4KVgSbTgdOrYWSCgmx0aHJlYWRfYXdhcmVlMC42LjGCaXVuaWZsaWdodGUwLjEuMA | ||
| [__link0]: https://docs.rs/uniflight/0.1.0/uniflight/struct.Merger.html | ||
| [__link1]: https://docs.rs/uniflight/0.1.0/uniflight/?search=Merger::execute | ||
| [__link10]: https://doc.rust-lang.org/stable/std/?search=hash::Hash | ||
| [__link11]: https://doc.rust-lang.org/stable/std/cmp/trait.Eq.html | ||
| [__link12]: https://docs.rs/uniflight/0.1.0/uniflight/struct.Merger.html | ||
| [__link2]: https://doc.rust-lang.org/stable/std/?search=borrow::Borrow | ||
| [__link3]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=storage::Strategy | ||
| [__link4]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerProcess | ||
| [__link5]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerNuma | ||
| [__link6]: https://docs.rs/thread_aware/0.6.1/thread_aware/?search=PerCore | ||
| [__link7]: https://docs.rs/uniflight/0.1.0/uniflight/struct.LeaderPanicked.html | ||
| [__link8]: https://docs.rs/uniflight/0.1.0/uniflight/?search=LeaderPanicked::message | ||
| [__link9]: https://doc.rust-lang.org/stable/std/clone/trait.Clone.html |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.