diff --git a/crates/cold/src/task/runner.rs b/crates/cold/src/task/runner.rs index c247d90..da213be 100644 --- a/crates/cold/src/task/runner.rs +++ b/crates/cold/src/task/runner.rs @@ -4,8 +4,13 @@ //! them to the storage backend. Reads and writes use separate channels: //! //! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks. -//! In-flight reads are drained before each write. +//! Each reader holds a permit on `read_semaphore` for the lifetime of the +//! handler; the semaphore is the backpressure mechanism. In-flight reads +//! are drained before each write. //! - **Writes**: Processed sequentially (inline await) to maintain ordering. +//! Draining is implemented by acquiring all [`MAX_CONCURRENT_READERS`] +//! permits on `read_semaphore`, which unblocks only once every in-flight +//! reader has finished and released its permit. //! - **Streams**: Log-streaming producers run independently, tracked for //! graceful shutdown but not drained before writes. Backends must provide //! their own read isolation (e.g. snapshot semantics). @@ -264,8 +269,18 @@ pub struct ColdStorageTask { read_receiver: mpsc::Receiver, write_receiver: mpsc::Receiver, cancel_token: CancellationToken, - /// Task tracker for concurrent read handlers only. + /// Tracks spawned read handlers so the graceful-shutdown path can wait + /// for them to finish. Not used for backpressure — see `read_semaphore`. task_tracker: TaskTracker, + /// Bounds in-flight read handlers and serves as the drain barrier for + /// writes. + /// + /// A reader acquires one permit before being spawned; the permit is + /// released when the spawned task completes (or panics). Writes acquire + /// all [`MAX_CONCURRENT_READERS`] permits at once, which blocks until + /// every in-flight reader has finished, giving the write exclusive + /// backend access for the drain-before-write invariant. 
+ read_semaphore: Arc<Semaphore>, } impl std::fmt::Debug for ColdStorageTask { @@ -293,6 +308,7 @@ impl ColdStorageTask { write_receiver, cancel_token, task_tracker: TaskTracker::new(), + read_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_READERS)), }; let handle = ColdStorageHandle::new(read_sender, write_sender); (task, handle) } @@ -360,14 +376,25 @@ impl ColdStorageTask { debug!("Cold storage write channel closed"); break; }; - // Drain in-flight read tasks before executing the write. - // Stream producers are NOT drained here — they rely on - // backend-level read isolation (snapshot semantics). - self.task_tracker.close(); - self.task_tracker.wait().await; - self.task_tracker.reopen(); + // Drain in-flight read tasks before executing the write by + // acquiring every read permit. This blocks until all + // in-flight readers have released their permits, giving the + // write exclusive backend access. Stream producers are NOT + // drained here — they rely on backend-level read isolation + // (snapshot semantics). + // + // `acquire_many_owned` only errors if the semaphore is + // closed; the semaphore lives for the lifetime of the run + // loop, so this is infallible here. + let _drain = self + .read_semaphore + .clone() + .acquire_many_owned(MAX_CONCURRENT_READERS as u32) + .await + .expect("read semaphore outlives the run loop"); self.handle_write(req).await; + // `_drain` drops here, restoring all permits. } maybe_read = self.read_receiver.recv() => { @@ -376,19 +403,26 @@ impl ColdStorageTask { break; }; - // Apply backpressure: wait if we've hit the concurrent reader limit - while self.task_tracker.len() >= MAX_CONCURRENT_READERS { - tokio::select! { - _ = self.cancel_token.cancelled() => { - debug!("Cancellation while waiting for read task slot"); - break; - } - _ = self.task_tracker.wait() => {} + // Apply backpressure: acquire a permit before spawning. 
+ // When the semaphore is saturated (64 in-flight readers, or + // a write holding all permits to drain), this waits until + // a permit becomes available. Cancel-safe: a cancellation + // signal exits the run loop without spawning. + let permit = tokio::select! { + _ = self.cancel_token.cancelled() => { + debug!("Cancellation while waiting for read permit"); + break; } - } + permit = self.read_semaphore.clone().acquire_owned() => { + permit.expect("read semaphore outlives the run loop") + } + }; let inner = Arc::clone(&self.inner); self.task_tracker.spawn(async move { + // Hold the permit for the lifetime of the handler — + // it is released on completion or panic. + let _permit = permit; inner.handle_read(req).await; }); } diff --git a/crates/cold/tests/concurrency.rs b/crates/cold/tests/concurrency.rs new file mode 100644 index 0000000..9e5d950 --- /dev/null +++ b/crates/cold/tests/concurrency.rs @@ -0,0 +1,104 @@ +//! Concurrency regression tests for the cold storage task. +//! +//! These tests exercise the read/write concurrency machinery in +//! [`signet_cold::ColdStorageTask`] directly, independent of any particular +//! backend. They use the in-memory backend as a fast, deterministic fixture. + +use alloy::consensus::{Header, Sealable}; +use signet_cold::{BlockData, ColdStorageTask, mem::MemColdBackend}; +use std::time::Duration; +use tokio::time::timeout; +use tokio_util::sync::CancellationToken; + +/// Upper bound for the whole test. Far larger than any correct execution; +/// small enough that a deadlock regression trips the timeout quickly. +const DEADLOCK_GUARD: Duration = Duration::from_secs(15); + +fn block(n: u64) -> BlockData { + let header = Header { number: n, ..Default::default() }; + BlockData::new(header.seal_slow(), vec![], vec![], vec![], None) +} + +/// Regression test for the read-arm backpressure deadlock. 
+/// +/// Prior to the semaphore-based backpressure fix, the run loop used +/// `TaskTracker::wait()` to throttle readers once `MAX_CONCURRENT_READERS` +/// tasks were in flight. `TaskTracker::wait()` only resolves when the +/// tracker is both closed and empty, and the read arm never closes the +/// tracker — so the whole run loop wedged the moment the 65th concurrent +/// read arrived. All subsequent reads piled up in the mpsc channel and +/// writes could not be dispatched either. +/// +/// This test issues 256 concurrent reads (4× the in-flight cap) and asserts +/// that every one of them completes. Without the fix the task deadlocks and +/// this test hits [`DEADLOCK_GUARD`]. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn reads_above_concurrency_cap_do_not_deadlock() { + let backend = MemColdBackend::new(); + let cancel = CancellationToken::new(); + let handle = ColdStorageTask::spawn(backend, cancel.clone()); + + // Seed a handful of blocks so the reads have something to find. + handle.append_blocks((1..=8).map(block).collect()).await.unwrap(); + + let result = timeout(DEADLOCK_GUARD, async { + let reader = handle.reader(); + let mut set = tokio::task::JoinSet::new(); + // 4× the 64-reader concurrency cap — enough to reach the previously + // deadlocking path many times over. + for i in 0..256u64 { + let reader = reader.clone(); + set.spawn(async move { reader.get_header_by_number(1 + (i % 8)).await }); + } + let mut completed = 0usize; + while let Some(res) = set.join_next().await { + res.expect("read task panicked").expect("read returned an error"); + completed += 1; + } + completed + }) + .await + .expect("cold storage task deadlocked under concurrent reads"); + + assert_eq!(result, 256); + + cancel.cancel(); +} + +/// A write arriving while reads are saturated must still be processed. 
+/// +/// With the old backpressure scheme, once 64 reads were in flight the read +/// arm would wedge on `TaskTracker::wait()`, so the outer `select!` never +/// re-polled and the write arm was starved. This test interleaves a write +/// into a flood of reads and requires it to complete promptly. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn write_after_saturating_reads_makes_progress() { + let backend = MemColdBackend::new(); + let cancel = CancellationToken::new(); + let handle = ColdStorageTask::spawn(backend, cancel.clone()); + + handle.append_blocks((1..=8).map(block).collect()).await.unwrap(); + + let result = timeout(DEADLOCK_GUARD, async { + let reader = handle.reader(); + let mut set = tokio::task::JoinSet::new(); + for i in 0..256u64 { + let reader = reader.clone(); + set.spawn(async move { reader.get_header_by_number(1 + (i % 8)).await }); + } + + // Issue a write while the readers are still queuing. With the fix + // it completes as soon as in-flight readers drain; without it, it + // never dispatches because the run loop is wedged. + handle.append_blocks(vec![block(9)]).await.unwrap(); + + while let Some(res) = set.join_next().await { + res.expect("read task panicked").expect("read returned an error"); + } + }) + .await; + + result.expect("write was starved by saturated readers"); + + cancel.cancel(); +}