From 6369f79fe5de44feac131e903d376cd693dc0e3b Mon Sep 17 00:00:00 2001 From: init4samwise Date: Tue, 3 Mar 2026 04:48:07 +0000 Subject: [PATCH 1/5] feat(bundle): filter bundles with stale host tx nonces before SimCache Adds nonce checking for host transactions in BundlePoller, similar to the existing TxPoller pattern. Bundles with stale host tx nonces are dropped before entering SimCache to prevent: - Wasted simulation cycles on bundles that will fail - ERROR log spam from nonce-too-low failures - Re-ingestion churn (~1s poll cycle) Each host transaction's nonce is compared against the sender's current nonce from the host provider. If any host tx has a stale nonce, the entire bundle is dropped with DEBUG-level logging. Closes ENG-1937 --- src/tasks/cache/bundle.rs | 97 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 92 insertions(+), 5 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index a45fff03..3bd6bb67 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,5 +1,12 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use alloy::{ + consensus::{Transaction, transaction::SignerRecoverable}, + eips::Decodable2718, + primitives::Bytes, + providers::Provider, + rlp::Buf, +}; use futures_util::{TryFutureExt, TryStreamExt}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; @@ -8,7 +15,7 @@ use tokio::{ task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, debug, trace, trace_span, warn}; +use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -56,6 +63,84 @@ impl BundlePoller { self.tx_cache.stream_bundles().try_collect().await } + /// Spawns a tokio task to check the nonces of all host transactions in a bundle + /// before sending it to the cache task via the outbound channel. + /// + /// Bundles with stale host transaction nonces are dropped to prevent them from + /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { + tokio::spawn(async move { + let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + + // If no host transactions, forward directly + if bundle.bundle.host_txs.is_empty() { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + return; + } + + let Ok(host_provider) = + crate::config().connect_host_provider().instrument(span.clone()).await + else { + span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); + return; + }; + + // Check each host transaction's nonce + for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { + let host_tx = match decode_tx(host_tx_bytes) { + Some(tx) => tx, + None => { + span_debug!( + span, + idx, + "Failed to decode host transaction, dropping bundle" + ); + return; + } + }; + + let sender = match host_tx.recover_signer() { + Ok(s) => s, + Err(_) => { + span_debug!( + span, + idx, + "Failed to recover sender from host tx, dropping bundle" + ); + return; + } + }; + + let tx_count = match host_provider.get_transaction_count(sender).await { + Ok(count) => count, + Err(_) => { + span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); + return; + } + }; + + if host_tx.nonce() < tx_count { + debug!( + parent: &span, + %sender, + tx_nonce = %host_tx.nonce(), + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return; + } + } + + // All host txs have valid nonces, forward the bundle + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed, stopping nonce check task"); + } + }); + } + async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); @@ -91,10 +176,7 @@ impl BundlePoller { crate::metrics::record_bundles_fetched(bundles.len()); trace!(count = bundles.len(), "fetched bundles from tx-cache"); for bundle in bundles { - if let Err(err) = outbound.send(bundle) { - debug!(?err, "Failed to send bundle - channel is dropped"); - break; - } + Self::spawn_check_bundle_nonces(bundle, outbound.clone()); } } @@ -111,3 +193,8 @@ impl BundlePoller { (inbound, jh) } } + +/// Decodes a transaction from RLP-encoded bytes. +fn decode_tx(bytes: &Bytes) -> Option { + alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() +} From 98b07184082358a616881712f25ef5f173a7474a Mon Sep 17 00:00:00 2001 From: init4samwise Date: Mon, 9 Mar 2026 23:55:31 +0000 Subject: [PATCH 2/5] refactor: use FuturesUnordered and reuse validity checks - Refactored bundle processing to use FuturesUnordered for concurrent execution - Added cancellation on first failure - Reused validity checks from crates/sim/src/cache/item.rs Addresses PR review feedback from prestwich --- src/tasks/cache/bundle.rs | 119 ++++++++++++++++++-------------------- 1 file changed, 57 insertions(+), 62 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 3bd6bb67..bcde4ab5 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,13 +1,7 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; -use alloy::{ - consensus::{Transaction, transaction::SignerRecoverable}, - eips::Decodable2718, - primitives::Bytes, - providers::Provider, - rlp::Buf, -}; -use futures_util::{TryFutureExt, TryStreamExt}; +use alloy::providers::Provider; +use futures_util::{StreamExt, TryFutureExt, TryStreamExt, stream}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; use tokio::{ @@ -66,14 +60,28 @@ impl BundlePoller { /// Spawns a tokio task to check the nonces of all host transactions in a bundle /// before sending it to the cache task via the outbound channel. /// - /// Bundles with stale host transaction nonces are dropped to prevent them from - /// entering the SimCache, failing simulation, and being re-ingested on the next poll. + /// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements + /// (reusing the existing validity check pattern from `signet-sim`), then checks + /// all host tx nonces concurrently via [`FuturesUnordered`], cancelling early + /// on the first stale or failed nonce. + /// + /// [`FuturesUnordered`]: futures_util::stream::FuturesUnordered fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { tokio::spawn(async move { let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); + // Recover the bundle to get typed host tx requirements instead of + // manually decoding and recovering signers. + let recovered = match bundle.bundle.try_to_recovered() { + Ok(r) => r, + Err(e) => { + span_debug!(span, ?e, "Failed to recover bundle, dropping"); + return; + } + }; + // If no host transactions, forward directly - if bundle.bundle.host_txs.is_empty() { + if recovered.host_txs().is_empty() { if outbound.send(bundle).is_err() { span_debug!(span, "Outbound channel closed, stopping nonce check task"); } @@ -87,55 +95,47 @@ impl BundlePoller { return; }; - // Check each host transaction's nonce - for (idx, host_tx_bytes) in bundle.bundle.host_txs.iter().enumerate() { - let host_tx = match decode_tx(host_tx_bytes) { - Some(tx) => tx, - None => { - span_debug!( - span, - idx, - "Failed to decode host transaction, dropping bundle" - ); - return; + // Collect host tx requirements (signer + nonce) from the recovered bundle + let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect(); + + // Check all host tx nonces concurrently, cancelling on first failure. + let result = stream::iter(reqs) + .map(Ok) + .try_for_each_concurrent(None, |(idx, req)| { + let host_provider = &host_provider; + let span = &span; + async move { + let tx_count = host_provider + .get_transaction_count(req.signer) + .await + .map_err(|_| { + span_debug!( + span, + idx, + sender = %req.signer, + "Failed to fetch nonce for sender, dropping bundle" + ); + })?; + + if req.nonce < tx_count { + debug!( + parent: span, + sender = %req.signer, + tx_nonce = %req.nonce, + host_nonce = %tx_count, + idx, + "Dropping bundle with stale host tx nonce" + ); + return Err(()); + } + + Ok(()) } - }; - - let sender = match host_tx.recover_signer() { - Ok(s) => s, - Err(_) => { - span_debug!( - span, - idx, - "Failed to recover sender from host tx, dropping bundle" - ); - return; - } - }; - - let tx_count = match host_provider.get_transaction_count(sender).await { - Ok(count) => count, - Err(_) => { - span_debug!(span, idx, %sender, "Failed to fetch nonce for sender, dropping bundle"); - return; - } - }; - - if host_tx.nonce() < tx_count { - debug!( - parent: &span, - %sender, - tx_nonce = %host_tx.nonce(), - host_nonce = %tx_count, - idx, - "Dropping bundle with stale host tx nonce" - ); - return; - } - } + }) + .await; // All host txs have valid nonces, forward the bundle - if outbound.send(bundle).is_err() { + if result.is_ok() && outbound.send(bundle).is_err() { span_debug!(span, "Outbound channel closed, stopping nonce check task"); } }); @@ -193,8 +193,3 @@ impl BundlePoller { (inbound, jh) } } - -/// Decodes a transaction from RLP-encoded bytes. -fn decode_tx(bytes: &Bytes) -> Option { - alloy::consensus::TxEnvelope::decode_2718(&mut bytes.chunk()).ok() -} From 5fb52d6d1e48a2166f48cdbee3fb627782cb5036 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Wed, 25 Mar 2026 17:54:08 +0000 Subject: [PATCH 3/5] address review feedback --- src/tasks/cache/bundle.rs | 105 ++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 49 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index bcde4ab5..f4216045 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,15 +1,16 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; use alloy::providers::Provider; -use futures_util::{StreamExt, TryFutureExt, TryStreamExt, stream}; +use futures_util::{TryFutureExt, TryStreamExt, future::try_join_all}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; +use std::collections::{BTreeMap, BTreeSet}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, debug, debug_span, trace, trace_span, warn}; +use tracing::{Instrument, debug_span, trace, trace_span, warn}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -60,22 +61,19 @@ impl BundlePoller { /// Spawns a tokio task to check the nonces of all host transactions in a bundle /// before sending it to the cache task via the outbound channel. /// - /// Uses the bundle's `host_tx_reqs()` to extract signer/nonce requirements - /// (reusing the existing validity check pattern from `signet-sim`), then checks - /// all host tx nonces concurrently via [`FuturesUnordered`], cancelling early - /// on the first stale or failed nonce. - /// - /// [`FuturesUnordered`]: futures_util::stream::FuturesUnordered + /// Fetches on-chain nonces concurrently for each unique signer, then validates + /// sequentially with a local nonce cache — mirroring the SDK's + /// `check_bundle_tx_list` pattern. Drops bundles where any host tx has a stale + /// or future nonce. fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { + let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); tokio::spawn(async move { - let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); - // Recover the bundle to get typed host tx requirements instead of // manually decoding and recovering signers. let recovered = match bundle.bundle.try_to_recovered() { Ok(r) => r, - Err(e) => { - span_debug!(span, ?e, "Failed to recover bundle, dropping"); + Err(error) => { + span_debug!(span, ?error, "Failed to recover bundle, dropping"); return; } }; @@ -96,46 +94,55 @@ impl BundlePoller { }; // Collect host tx requirements (signer + nonce) from the recovered bundle - let reqs: Vec<_> = recovered.host_tx_reqs().enumerate().collect(); - - // Check all host tx nonces concurrently, cancelling on first failure. - let result = stream::iter(reqs) - .map(Ok) - .try_for_each_concurrent(None, |(idx, req)| { - let host_provider = &host_provider; - let span = &span; - async move { - let tx_count = host_provider - .get_transaction_count(req.signer) - .await - .map_err(|_| { - span_debug!( - span, - idx, - sender = %req.signer, - "Failed to fetch nonce for sender, dropping bundle" - ); - })?; - - if req.nonce < tx_count { - debug!( - parent: span, - sender = %req.signer, - tx_nonce = %req.nonce, - host_nonce = %tx_count, - idx, - "Dropping bundle with stale host tx nonce" + let reqs: Vec<_> = recovered.host_tx_reqs().collect(); + + // Fetch on-chain nonces concurrently for each unique signer + let unique_signers: BTreeSet<_> = reqs.iter().map(|req| req.signer).collect(); + let nonce_fetches = unique_signers.into_iter().map(|signer| { + let host_provider = &host_provider; + let span = &span; + async move { + host_provider + .get_transaction_count(signer) + .await + .map(|nonce| (signer, nonce)) + .inspect_err(|error| { + span_debug!( + span, + ?error, + sender = %signer, + "Failed to fetch nonce for sender, dropping bundle" ); - return Err(()); - } + }) + } + }); - Ok(()) - } - }) - .await; + let Ok(fetched) = try_join_all(nonce_fetches).await else { + return; + }; + let mut nonce_cache: BTreeMap<_, _> = fetched.into_iter().collect(); + + // Validate sequentially, checking exact nonce match and incrementing for + // same-signer sequential txs (mirroring check_bundle_tx_list in signet-sim). + for (idx, req) in reqs.iter().enumerate() { + let expected = nonce_cache.get(&req.signer).copied().expect("nonce must be cached"); + + if req.nonce != expected { + span_debug!( + span, + sender = %req.signer, + tx_nonce = req.nonce, + expected_nonce = expected, + idx, + "Dropping bundle: host tx nonce mismatch" + ); + return; + } + + nonce_cache.entry(req.signer).and_modify(|nonce| *nonce += 1); + } - // All host txs have valid nonces, forward the bundle - if result.is_ok() && outbound.send(bundle).is_err() { + if outbound.send(bundle).is_err() { span_debug!(span, "Outbound channel closed, stopping nonce check task"); } }); From 7e4905b4de7aa492048b0861cb483d6b64927cf9 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Thu, 26 Mar 2026 12:06:04 +0000 Subject: [PATCH 4/5] use check_bundle_tx_list from sdk --- Cargo.toml | 10 ++--- src/tasks/cache/bundle.rs | 84 ++++++++++----------------------------- 2 files changed, 25 insertions(+), 69 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f2e39007..73b80026 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,11 +23,11 @@ path = "bin/builder.rs" [dependencies] init4-bin-base = { version = "0.18.0-rc.13", features = ["perms", "aws", "pylon"] } -signet-constants = { version = "0.16.0-rc.16" } -signet-sim = { version = "0.16.0-rc.16" } -signet-tx-cache = { version = "0.16.0-rc.16" } -signet-types = { version = "0.16.0-rc.16" } -signet-zenith = { version = "0.16.0-rc.16" } +signet-constants = { version = "0.16.0-rc.17" } +signet-sim = { version = "0.16.0-rc.17" } +signet-tx-cache = { version = "0.16.0-rc.17" } +signet-types = { version = "0.16.0-rc.17" } +signet-zenith = { version = "0.16.0-rc.17" } signet-block-processor = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } signet-genesis = { git = "https://github.com/init4tech/node-components", tag = "v0.16.0-rc.10" } diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index f4216045..092792d6 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,10 +1,9 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; -use alloy::providers::Provider; -use futures_util::{TryFutureExt, TryStreamExt, future::try_join_all}; +use futures_util::{TryFutureExt, TryStreamExt}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; +use signet_sim::{ProviderStateSource, SimItemValidity, check_bundle_tx_list}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; -use std::collections::{BTreeMap, BTreeSet}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, @@ -58,30 +57,25 @@ impl BundlePoller { self.tx_cache.stream_bundles().try_collect().await } - /// Spawns a tokio task to check the nonces of all host transactions in a bundle - /// before sending it to the cache task via the outbound channel. + /// Spawns a tokio task to check the validity of all host transactions in a + /// bundle before sending it to the cache task via the outbound channel. /// - /// Fetches on-chain nonces concurrently for each unique signer, then validates - /// sequentially with a local nonce cache — mirroring the SDK's - /// `check_bundle_tx_list` pattern. Drops bundles where any host tx has a stale - /// or future nonce. + /// Uses [`check_bundle_tx_list`] from `signet-sim` to validate host tx nonces + /// and balance against the host chain. Drops bundles that are not currently valid. fn spawn_check_bundle_nonces(bundle: CachedBundle, outbound: UnboundedSender) { let span = debug_span!("check_bundle_nonces", bundle_id = %bundle.id); tokio::spawn(async move { - // Recover the bundle to get typed host tx requirements instead of - // manually decoding and recovering signers. let recovered = match bundle.bundle.try_to_recovered() { - Ok(r) => r, + Ok(recovered) => recovered, Err(error) => { span_debug!(span, ?error, "Failed to recover bundle, dropping"); return; } }; - // If no host transactions, forward directly if recovered.host_txs().is_empty() { if outbound.send(bundle).is_err() { - span_debug!(span, "Outbound channel closed, stopping nonce check task"); + span_debug!(span, "Outbound channel closed"); } return; } @@ -89,61 +83,23 @@ impl BundlePoller { let Ok(host_provider) = crate::config().connect_host_provider().instrument(span.clone()).await else { - span_debug!(span, "Failed to connect to host provider, stopping nonce check task"); + span_debug!(span, "Failed to connect to host provider, dropping bundle"); return; }; - // Collect host tx requirements (signer + nonce) from the recovered bundle - let reqs: Vec<_> = recovered.host_tx_reqs().collect(); - - // Fetch on-chain nonces concurrently for each unique signer - let unique_signers: BTreeSet<_> = reqs.iter().map(|req| req.signer).collect(); - let nonce_fetches = unique_signers.into_iter().map(|signer| { - let host_provider = &host_provider; - let span = &span; - async move { - host_provider - .get_transaction_count(signer) - .await - .map(|nonce| (signer, nonce)) - .inspect_err(|error| { - span_debug!( - span, - ?error, - sender = %signer, - "Failed to fetch nonce for sender, dropping bundle" - ); - }) + let source = ProviderStateSource(host_provider); + match check_bundle_tx_list(recovered.host_tx_reqs(), &source).await { + Ok(SimItemValidity::Now) | Ok(SimItemValidity::Future) => { + if outbound.send(bundle).is_err() { + span_debug!(span, "Outbound channel closed"); + } } - }); - - let Ok(fetched) = try_join_all(nonce_fetches).await else { - return; - }; - let mut nonce_cache: BTreeMap<_, _> = fetched.into_iter().collect(); - - // Validate sequentially, checking exact nonce match and incrementing for - // same-signer sequential txs (mirroring check_bundle_tx_list in signet-sim). - for (idx, req) in reqs.iter().enumerate() { - let expected = nonce_cache.get(&req.signer).copied().expect("nonce must be cached"); - - if req.nonce != expected { - span_debug!( - span, - sender = %req.signer, - tx_nonce = req.nonce, - expected_nonce = expected, - idx, - "Dropping bundle: host tx nonce mismatch" - ); - return; + Ok(SimItemValidity::Never) => { + span_debug!(span, "Dropping bundle: host txs will never be valid"); + } + Err(error) => { + span_debug!(span, %error, "Failed to check bundle validity, dropping"); } - - nonce_cache.entry(req.signer).and_modify(|nonce| *nonce += 1); - } - - if outbound.send(bundle).is_err() { - span_debug!(span, "Outbound channel closed, stopping nonce check task"); } }); } From c72ca4208a42b56819064ce5d3b7009798ff8915 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison <190532+Fraser999@users.noreply.github.com> Date: Thu, 9 Apr 2026 17:17:19 +0100 Subject: [PATCH 5/5] address review comments --- src/tasks/cache/bundle.rs | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 092792d6..4397046c 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -74,9 +74,16 @@ impl BundlePoller { }; if recovered.host_txs().is_empty() { - if outbound.send(bundle).is_err() { + let _ = outbound.send(bundle).inspect_err(|_| { span_debug!(span, "Outbound channel closed"); - } + }); + return; + } + + // Check if the receiver is still alive before doing expensive nonce validation over + // the network. + if outbound.is_closed() { + span_debug!(span, "Outbound channel closed, skipping nonce validation"); return; } @@ -88,11 +95,14 @@ impl BundlePoller { }; let source = ProviderStateSource(host_provider); - match check_bundle_tx_list(recovered.host_tx_reqs(), &source).await { + match check_bundle_tx_list(recovered.host_tx_reqs(), &source) + .instrument(span.clone()) + .await + { Ok(SimItemValidity::Now) | Ok(SimItemValidity::Future) => { - if outbound.send(bundle).is_err() { + let _ = outbound.send(bundle).inspect_err(|_| { span_debug!(span, "Outbound channel closed"); - } + }); } Ok(SimItemValidity::Never) => { span_debug!(span, "Dropping bundle: host txs will never be valid");