Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 51 additions & 17 deletions crates/cold/src/task/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
//! them to the storage backend. Reads and writes use separate channels:
//!
//! - **Reads**: Processed concurrently (up to 64 in flight) via spawned tasks.
//! In-flight reads are drained before each write.
//! Each reader holds a permit on `read_semaphore` for the lifetime of the
//! handler; the semaphore is the backpressure mechanism. In-flight reads
//! are drained before each write.
//! - **Writes**: Processed sequentially (inline await) to maintain ordering.
//! Draining is implemented by acquiring all [`MAX_CONCURRENT_READERS`]
//! permits on `read_semaphore`, which unblocks only once every in-flight
//! reader has finished and released its permit.
//! - **Streams**: Log-streaming producers run independently, tracked for
//! graceful shutdown but not drained before writes. Backends must provide
//! their own read isolation (e.g. snapshot semantics).
Expand Down Expand Up @@ -264,8 +269,18 @@ pub struct ColdStorageTask<B: ColdStorage> {
read_receiver: mpsc::Receiver<ColdReadRequest>,
write_receiver: mpsc::Receiver<ColdWriteRequest>,
cancel_token: CancellationToken,
/// Task tracker for concurrent read handlers only.
/// Tracks spawned read handlers so the graceful-shutdown path can wait
/// for them to finish. Not used for backpressure — see `read_semaphore`.
task_tracker: TaskTracker,
/// Bounds in-flight read handlers and serves as the drain barrier for
/// writes.
///
/// A reader acquires one permit before being spawned; the permit is
/// released when the spawned task completes (or panics). Writes acquire
/// all [`MAX_CONCURRENT_READERS`] permits at once, which blocks until
/// every in-flight reader has finished, giving the write exclusive
/// backend access for the drain-before-write invariant.
read_semaphore: Arc<Semaphore>,
}

impl<B: ColdStorage> std::fmt::Debug for ColdStorageTask<B> {
Expand Down Expand Up @@ -293,6 +308,7 @@ impl<B: ColdStorage> ColdStorageTask<B> {
write_receiver,
cancel_token,
task_tracker: TaskTracker::new(),
read_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_READERS)),
};
let handle = ColdStorageHandle::new(read_sender, write_sender);
(task, handle)
Expand Down Expand Up @@ -360,14 +376,25 @@ impl<B: ColdStorage> ColdStorageTask<B> {
debug!("Cold storage write channel closed");
break;
};
// Drain in-flight read tasks before executing the write.
// Stream producers are NOT drained here — they rely on
// backend-level read isolation (snapshot semantics).
self.task_tracker.close();
self.task_tracker.wait().await;
self.task_tracker.reopen();
// Drain in-flight read tasks before executing the write by
// acquiring every read permit. This blocks until all
// in-flight readers have released their permits, giving the
// write exclusive backend access. Stream producers are NOT
// drained here — they rely on backend-level read isolation
// (snapshot semantics).
//
// `acquire_many_owned` only errors if the semaphore is
// closed; the semaphore lives for the lifetime of the run
// loop, so this is infallible here.
let _drain = self
.read_semaphore
.clone()
.acquire_many_owned(MAX_CONCURRENT_READERS as u32)
.await
.expect("read semaphore outlives the run loop");

self.handle_write(req).await;
// `_drain` drops here, restoring all permits.
}

maybe_read = self.read_receiver.recv() => {
Expand All @@ -376,19 +403,26 @@ impl<B: ColdStorage> ColdStorageTask<B> {
break;
};

// Apply backpressure: wait if we've hit the concurrent reader limit
while self.task_tracker.len() >= MAX_CONCURRENT_READERS {
tokio::select! {
_ = self.cancel_token.cancelled() => {
debug!("Cancellation while waiting for read task slot");
break;
}
_ = self.task_tracker.wait() => {}
// Apply backpressure: acquire a permit before spawning.
// When the semaphore is saturated (64 in-flight readers, or
// a write holding all permits to drain), this waits until
// a permit becomes available. Cancel-safe: a cancellation
// signal exits the run loop without spawning.
let permit = tokio::select! {
_ = self.cancel_token.cancelled() => {
debug!("Cancellation while waiting for read permit");
break;
}
}
permit = self.read_semaphore.clone().acquire_owned() => {
permit.expect("read semaphore outlives the run loop")
}
};

let inner = Arc::clone(&self.inner);
self.task_tracker.spawn(async move {
// Hold the permit for the lifetime of the handler —
// it is released on completion or panic.
let _permit = permit;
inner.handle_read(req).await;
});
}
Expand Down
104 changes: 104 additions & 0 deletions crates/cold/tests/concurrency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
//! Concurrency regression tests for the cold storage task.
//!
//! These tests exercise the read/write concurrency machinery in
//! [`signet_cold::ColdStorageTask`] directly, independent of any particular
//! backend. They use the in-memory backend as a fast, deterministic fixture.

use alloy::consensus::{Header, Sealable};
use signet_cold::{BlockData, ColdStorageTask, mem::MemColdBackend};
use std::time::Duration;
use tokio::time::timeout;
use tokio_util::sync::CancellationToken;

/// Upper bound for the whole test. Far larger than any correct execution;
/// small enough that a deadlock regression trips the timeout quickly.
/// (15s is generous even for a loaded CI runner — a wedged run loop never
/// completes, so the exact value only affects how fast the failure reports.)
const DEADLOCK_GUARD: Duration = Duration::from_secs(15);

fn block(n: u64) -> BlockData {
let header = Header { number: n, ..Default::default() };
BlockData::new(header.seal_slow(), vec![], vec![], vec![], None)
}

/// Regression test for the read-arm backpressure deadlock.
///
/// The pre-fix run loop throttled readers with `TaskTracker::wait()` once
/// `MAX_CONCURRENT_READERS` tasks were in flight. That future only resolves
/// when the tracker is closed *and* empty, and the read arm never closed it,
/// so the 65th concurrent read wedged the run loop permanently: later reads
/// queued in the mpsc channel forever and writes could not be dispatched.
///
/// Here we fire 256 concurrent reads — four times the in-flight cap — and
/// demand that all of them resolve. A regression deadlocks the task and the
/// [`DEADLOCK_GUARD`] timeout fires instead.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn reads_above_concurrency_cap_do_not_deadlock() {
    let backend = MemColdBackend::new();
    let cancel = CancellationToken::new();
    let handle = ColdStorageTask::spawn(backend, cancel.clone());

    // Seed a handful of blocks so the reads have something to find.
    handle.append_blocks((1..=8).map(block).collect()).await.unwrap();

    let completed = timeout(DEADLOCK_GUARD, async {
        let reader = handle.reader();
        let mut tasks = tokio::task::JoinSet::new();
        // 4× the 64-reader concurrency cap — enough to reach the previously
        // deadlocking path many times over.
        for i in 0..256u64 {
            let r = reader.clone();
            tasks.spawn(async move { r.get_header_by_number(1 + (i % 8)).await });
        }
        let mut done = 0usize;
        while let Some(joined) = tasks.join_next().await {
            joined.expect("read task panicked").expect("read returned an error");
            done += 1;
        }
        done
    })
    .await
    .expect("cold storage task deadlocked under concurrent reads");

    assert_eq!(completed, 256);

    cancel.cancel();
}

/// A write arriving while reads are saturated must still be processed.
///
/// Under the old backpressure scheme, 64 in-flight reads wedged the read arm
/// on `TaskTracker::wait()`; the outer `select!` was never re-polled, so the
/// write arm starved indefinitely. This test floods the task with reads,
/// interleaves a write, and requires that write to complete promptly.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn write_after_saturating_reads_makes_progress() {
    let backend = MemColdBackend::new();
    let cancel = CancellationToken::new();
    let handle = ColdStorageTask::spawn(backend, cancel.clone());

    handle.append_blocks((1..=8).map(block).collect()).await.unwrap();

    let outcome = timeout(DEADLOCK_GUARD, async {
        let reader = handle.reader();
        let mut tasks = tokio::task::JoinSet::new();
        for i in 0..256u64 {
            let r = reader.clone();
            tasks.spawn(async move { r.get_header_by_number(1 + (i % 8)).await });
        }

        // Issue a write while the readers are still queuing. With the fix
        // it completes as soon as in-flight readers drain; without it, it
        // never dispatches because the run loop is wedged.
        handle.append_blocks(vec![block(9)]).await.unwrap();

        while let Some(joined) = tasks.join_next().await {
            joined.expect("read task panicked").expect("read returned an error");
        }
    })
    .await;

    outcome.expect("write was starved by saturated readers");

    cancel.cancel();
}
Loading