
cold: unified handle architecture (supersedes #57)#58

Open
prestwich wants to merge 12 commits into main from prestwich/cold-unified-architecture

Conversation

@prestwich
Member

Summary

Collapse cold storage from channels+dispatcher+writer-task into a single ColdStorage<B: ColdStorageBackend> handle over Arc<Inner<B>>. All concurrency primitives (read_sem(64), write_sem(1), stream_sem(8), TaskTracker, CancellationToken) live on Inner; all backend work spawns into the shared tracker; spawned tasks hold permits for the real duration of the backend call.

Supersedes #57. Tracks Linear ENG-2198.

Fixes three PR #57 review issues:

  1. Semaphore acquisition was scattered across three places — now all in the handle.
  2. StreamLogs setup held the outer read permit across unbounded awaits — streams now acquire only stream_sem; setup reads are isolated from the read pool by design (a stream requesting "latest" should observe latest at setup).
  3. Dispatcher-side timeout did not cancel backend work so permits released early — timeouts are now mandatory in the ColdStorageBackend trait and enforced at the backend layer, so permits honestly reflect in-flight work.

Shape:

  • ColdStorage<B> is clone-cheap (one Arc refcount bump); replaces ColdStorageHandle + ColdStorageReadHandle + ColdStorageTask.
  • ColdStorageWrite now takes &self across all backends (Mem / MDBX / SQL updated in lockstep).
  • Reads acquire a read permit in the handle and spawn; cache-hit fast path returns in the caller's task without permit/spawn.
  • Writes acquire write_sem then the drain barrier (read_sem.acquire_many_owned(64)); destructive writes invalidate the cache inside the spawned body while holding the drain.
  • Streams acquire only stream_sem; producer runs in the shared tracker.
  • Hidden shutdown coordinator task watches the cancel token and closes all three semaphores + the tracker; handle calls then fail fast with TaskTerminated.

Backend timeouts (mandatory in trait contract):

  • MDBX: tokio::task::spawn_blocking for reads, tokio::task::block_in_place for writes, in-body deadline check between per-block iterations. Post-commit WARN on overrun (advisory — MDBX commits are uninterruptible).
  • SQL: SET LOCAL statement_timeout = <ms> at the start of every Postgres transaction; SQLite skips (no equivalent).
  • Both backends expose with_read_timeout / with_write_timeout builders on backend and connector. Defaults 500ms / 2s.

Observability:

  • New crates/cold/src/metrics.rs following the ajj pattern.
  • Gauges: cold.reads_in_flight, cold.writes_in_flight, cold.streams_active.
  • Histograms: cold.op_duration_us, cold.permit_wait_us, cold.stream_lifetime_ms.
  • Counter: cold.op_errors_total.
  • #[tracing::instrument] on every handle method with stable op field; spans propagate into spawned tasks via .in_current_span().
  • Alerting is external (Prometheus on gauges); no in-process watchdog.

Error variants removed: Timeout, Backpressure, Cancelled. Backend timeouts now map to Backend. TaskTerminated remains and is returned from every handle method when the semaphore is closed.

Rollout: 10 commits, one per phase, each building + testing clean in isolation:

  1. `b539c8f` refactor(cold): ColdStorageWrite takes &self; all backends updated in lockstep
  2. `31e1c13` refactor(cold): unify handle around Arc; remove channels and dispatcher
  3. `b0d3063` fix(cold): stream permit acquired in handle; streams do not hold a read permit
  4. `9881f03` feat(cold): drain barrier moves to handle write path
  5. `9426d88` feat(cold): shutdown coordinator closes semaphores on cancel
  6. `902aab6` refactor(cold-mdbx): spawn_blocking reads, block_in_place writes, in-body iterator deadline
  7. `727c054` feat(cold-sql): mandatory statement_timeout; read_timeout and write_timeout builders
  8. `978e2af` feat(cold): metrics and tracing spans across all operations
  9. `3339c87` docs(cold): trait impl guide documents mandatory timeouts
  10. `cb06741` test(cold): concurrency suite covers new architecture

Spec: `docs/superpowers/specs/2026-04-20-cold-unified-architecture-design.md` (local — `docs/` is gitignored).

Test plan

  • `cargo +nightly fmt -- --check`
  • `cargo clippy --workspace --all-targets --all-features -- -D warnings`
  • `cargo clippy --workspace --all-targets --no-default-features -- -D warnings`
  • `RUSTDOCFLAGS="-D warnings" cargo doc --workspace --no-deps`
  • `cargo t --workspace` — all suites green, including:
    • `tests/handle_shape.rs` (clone-cheap handle)
    • `tests/stream_isolation.rs` (stream setup not blocked by saturated reads)
    • `tests/drain_barrier.rs` (write waits for reads to drain)
    • `tests/shutdown.rs` (acquire fails fast after cancel)
    • `tests/concurrency.rs` (7 scenarios: 256-way reads, fairness, cancel during backpressure, cancel during drain, stream permit cancel, cache-through-truncate)
    • `crates/cold-mdbx/tests/timeout.rs` (iterator deadline trips; point lookups bypass)
    • `crates/cold-sql/tests/statement_timeout.rs` (gated on Postgres; skip locally without DATABASE_URL)
  • Reviewer validation path: cut a patch release, bump `init4tech/node-components`, rebuild `signet-sidecar:latest`, redeploy to dev mainnet, confirm no backpressure-induced crashes and no permit-wait regression on the new metrics.

🤖 Generated with Claude Code

prestwich and others added 10 commits April 21, 2026 12:33
…body iterator deadline

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Adds a `metrics` module under `crates/cold/src/metrics.rs` with const
metric names, help strings, a `LazyLock` describe block, and
`pub(crate)` helper functions for recording:

- `cold.reads_in_flight`, `cold.writes_in_flight`, `cold.streams_active`
  (gauges)
- `cold.op_duration_us` (histogram, labeled by op)
- `cold.permit_wait_us` (histogram, labeled by sem: read/write/drain/stream)
- `cold.op_errors_total` (counter, labeled by op and error kind)
- `cold.stream_lifetime_ms` (histogram)

Wires the helpers into every `ColdStorage<B>` handle method:
`spawn_read` and `spawn_write` time permit acquisition, bump in-flight,
measure op duration, record errors, and dec in-flight after the backend
call. Cache hits in `get_header`/`get_transaction`/`get_receipt` record
op duration only (no permit wait, no in-flight). `stream_logs`
instruments stream permit wait and records stream lifetime + gauge in
the spawned producer.

Adds `ColdStorageError::kind()` for the error metric label.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@prestwich
Member Author

[Claude Code]

Code review

Went deep on the concurrency model. One real regression worth addressing, plus a resource-lifetime nit.

Found 2 issues:

  1. Fast-path cache reads bypass the drain barrier. get_header / get_transaction / get_receipt check inner.cache.lock() before spawn_read, with no read permit. Writers hold write_sem + all 64 read permits and invalidate the cache inside the spawned body — but that spawned body first runs backend.truncate_above(block).await and only then cache.invalidate_above(block). A concurrent fast-path reader can land between those two points and return a cached entry for a block the backend has already dropped. The handle's own invariant comment — "no read is in flight while a write commits" — is only true for reads that go through the backend; the cache tier is not protected. In the old architecture (task/runner.rs) the drain was task_tracker.close(); task_tracker.wait() before handle_write ran, and cache lookups happened inside the tracked read task, so this window didn't exist. Easiest fix: invalidate the cache before the backend call, or route cache lookups through spawn_read (acquiring a read permit) so the drain actually covers them. The test cache_consistent_through_truncate at concurrency.rs#L261 is sequential and doesn't exercise this race.

```rust
#[tracing::instrument(skip(self, spec), fields(op = "get_header"))]
pub async fn get_header(&self, spec: HeaderSpecifier) -> ColdResult<Option<SealedHeader>> {
    let op_start = Instant::now();
    if let HeaderSpecifier::Number(n) = &spec
        && let Some(hit) = self.inner.cache.lock().await.get_header(n)
    {
        metrics::record_op_duration("get_header", op_start.elapsed());
        return Ok(Some(hit));
    }
    self.spawn_read("get_header", move |inner| async move {
        let result = inner.backend.get_header(spec).await;
        if let Ok(Some(ref h)) = result {
            inner.cache.lock().await.put_header(h.number, h.clone());
        }
        result
    })
    .await
}
```

```rust
#[tracing::instrument(skip(self), fields(op = "truncate_above"))]
pub async fn truncate_above(&self, block: BlockNumber) -> ColdResult<()> {
    self.spawn_write("truncate_above", move |inner| async move {
        let result = inner.backend.truncate_above(block).await;
        if result.is_ok() {
            inner.cache.lock().await.invalidate_above(block);
        }
        result
    })
    .await
}

/// Read and remove all blocks above the given block number.
///
/// Returns receipts for each block above `block` in ascending order,
/// then truncates. Index 0 = block+1, index 1 = block+2, etc.
#[tracing::instrument(skip(self), fields(op = "drain_above"))]
pub async fn drain_above(&self, block: BlockNumber) -> ColdResult<Vec<Vec<ColdReceipt>>> {
    self.spawn_write("drain_above", move |inner| async move {
        let result = inner.backend.drain_above(block).await;
        if result.is_ok() {
            inner.cache.lock().await.invalidate_above(block);
        }
        result
    })
    .await
}
```

  2. Shutdown coordinator keeps Inner alive indefinitely if cancel never fires. ColdStorage::new spawns an untracked tokio::spawn that moves Arc<Inner<B>> and awaits cancel.cancelled(). If a caller drops all ColdStorage clones without firing the token, the backend (MDBX env, Postgres pool, file handles) is pinned until process exit. This matters in tests and any transient-handle usage. A Weak::upgrade inside the coordinator would let Inner drop naturally and the task fall through.

```rust
    let inner_s = Arc::clone(&inner);
    // Shutdown coordinator: must NOT be tracked by `tracker`, otherwise
    // `tracker.wait()` would deadlock waiting on this task.
    tokio::spawn(async move {
        inner_s.cancel.cancelled().await;
        inner_s.read_sem.close();
        inner_s.write_sem.close();
        inner_s.stream_sem.close();
        inner_s.tracker.close();
    });
    Self { inner }
}
```

🤖 Generated with Claude Code


prestwich and others added 2 commits April 22, 2026 09:07
The coordinator task previously moved Arc<Inner<B>> into its body and
awaited the user's cancel token. If callers dropped all ColdStorage
clones without firing cancel, Inner (and the backend's file/DB handles)
stayed pinned until process exit.

Switch the coordinator to Weak<Inner>, and put a DropGuard on Inner that
fires a child cancel token. shutdown now fires on either user-side
cancel OR Inner drop; in the drop case upgrade() returns None and the
coordinator exits without pinning anything.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ColdStorageError::backend unconditionally wraps as Backend(Box<_>),
which hid MdbxColdError::TooManyLogs behind the generic backend variant
and broke the conformance suite's max_logs assertion. The
From<MdbxColdError> for ColdStorageError impl already translates
TooManyLogs correctly and wraps the rest. Route all spawn_blocking
result conversions through ::from so the translation runs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>