diff --git a/Cargo.lock b/Cargo.lock index e527210a9..865157fd4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -197,6 +197,15 @@ version = "1.0.102" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f202df86484c868dbad7eaa557ef785d5c66295e41b460ef922eca0723b842c" +[[package]] +name = "arc-swap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" +dependencies = [ + "rustversion", +] + [[package]] name = "arrayref" version = "0.3.9" @@ -2269,7 +2278,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.3", + "socket2 0.5.10", "system-configuration", "tokio", "tower-service", @@ -2844,9 +2853,8 @@ checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" [[package]] name = "miden-agglayer" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4302fc29d77db3d2c6323d1b211e503aafb91db2d572ef30c68829347fe79352" +version = "0.15.0" +source = "git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "alloy-sol-types", "fs-err", @@ -2920,9 +2928,8 @@ dependencies = [ [[package]] name = "miden-block-prover" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde56bcea3cebe307786a856e204d84e7987c318e5a2909bcbb655d16286ce31" +version = "0.15.0" +source = "git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "miden-protocol", "thiserror 2.0.18", @@ -2970,8 +2977,7 @@ dependencies = [ [[package]] name = "miden-crypto" version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ed0a034a460e27723dcfdf25effffab84331c3b46b13e7a1bd674197cc71bfe" +source = 
"git+https://github.com/0xmiden/crypto?branch=sergerad-largesmt-reader-trait#4441d9c2ab43ae3e9c8dcf1544d3ad4bb2001407" dependencies = [ "blake3", "cc", @@ -3013,8 +3019,7 @@ dependencies = [ [[package]] name = "miden-crypto-derive" version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8bf6ebde028e79bcc61a3632d2f375a5cc64caa17d014459f75015238cb1e08" +source = "git+https://github.com/0xmiden/crypto?branch=sergerad-largesmt-reader-trait#4441d9c2ab43ae3e9c8dcf1544d3ad4bb2001407" dependencies = [ "quote", "syn 2.0.117", @@ -3041,8 +3046,7 @@ dependencies = [ [[package]] name = "miden-field" version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38011348f4fb4c9e5ce1f471203d024721c00e3b60a91aa91aaefe6738d8b5ea" +source = "git+https://github.com/0xmiden/crypto?branch=sergerad-largesmt-reader-trait#4441d9c2ab43ae3e9c8dcf1544d3ad4bb2001407" dependencies = [ "miden-serde-utils", "num-bigint", @@ -3360,6 +3364,7 @@ name = "miden-node-store" version = "0.15.0" dependencies = [ "anyhow", + "arc-swap", "assert_matches", "build-rs", "criterion", @@ -3541,9 +3546,8 @@ dependencies = [ [[package]] name = "miden-protocol" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e860cc978d3467297de076e9bd22f0573b82ef73a3d223d6bb957731a45b8164" +version = "0.15.0" +source = "git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "bech32", "fs-err", @@ -3571,9 +3575,8 @@ dependencies = [ [[package]] name = "miden-protocol-macros" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4daec4a5a6f050a670a8639e78e017ab11ef0bf2e253b012505f25e6247c13e7" +version = "0.15.0" +source = "git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "proc-macro2", "quote", @@ -3659,8 +3662,7 @@ 
dependencies = [ [[package]] name = "miden-serde-utils" version = "0.23.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff78082e9b4ca89863e68da01b35f8a4029ee6fd912e39fa41fde4273a7debab" +source = "git+https://github.com/0xmiden/crypto?branch=sergerad-largesmt-reader-trait#4441d9c2ab43ae3e9c8dcf1544d3ad4bb2001407" dependencies = [ "p3-field", "p3-goldilocks", @@ -3668,9 +3670,8 @@ dependencies = [ [[package]] name = "miden-standards" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f455a087f41c30636b45ead961d1e66114d2d20661887b307cede05307eeb942" +version = "0.15.0" +source = "git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "fs-err", "miden-assembly", @@ -3686,9 +3687,8 @@ dependencies = [ [[package]] name = "miden-testing" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a84430e84c6dee90d9bd92568be1c3082113f0b4b36f9db7933380f0295207f9" +version = "0.15.0" +source = "git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "anyhow", "itertools 0.14.0", @@ -3709,9 +3709,8 @@ dependencies = [ [[package]] name = "miden-tx" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d788795041ce5e6f947a3256314373171e4877c11b86fafeabcec4d8b8628d9" +version = "0.15.0" +source = "git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "miden-processor", "miden-protocol", @@ -3723,9 +3722,8 @@ dependencies = [ [[package]] name = "miden-tx-batch-prover" -version = "0.14.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce059e2d599266b00708f6f1bff6af5cf82683e76df3ec812c2d1c72e880f943" +version = "0.15.0" +source = 
"git+https://github.com/0xmiden/protocol?branch=sergerad-clone#b82601603cc329f42d9da782e980e5afeaba9a48" dependencies = [ "miden-protocol", "miden-tx", @@ -4804,7 +4802,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.13.0", "log", "multimap", "petgraph 0.8.3", @@ -4825,7 +4823,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.13.0", "proc-macro2", "quote", "syn 2.0.117", @@ -4947,7 +4945,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls 0.23.37", - "socket2 0.6.3", + "socket2 0.5.10", "thiserror 2.0.18", "tokio", "tracing", @@ -4985,7 +4983,7 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.6.3", + "socket2 0.5.10", "tracing", "windows-sys 0.60.2", ] diff --git a/Cargo.toml b/Cargo.toml index c13e492a8..c8647e299 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,13 +61,13 @@ miden-remote-prover-client = { path = "crates/remote-prover-client", versio miden-node-rocksdb-cxx-linkage-fix = { path = "crates/rocksdb-cxx-linkage-fix", version = "0.15" } # miden-protocol dependencies. These should be updated in sync. 
-miden-agglayer = { version = "0.14" } -miden-block-prover = { version = "0.14" } -miden-protocol = { default-features = false, version = "0.14" } -miden-standards = { version = "0.14" } -miden-testing = { version = "0.14" } -miden-tx = { default-features = false, version = "0.14" } -miden-tx-batch-prover = { version = "0.14" } +miden-agglayer = { version = "0.15" } +miden-block-prover = { version = "0.15" } +miden-protocol = { default-features = false, version = "0.15" } +miden-standards = { version = "0.15" } +miden-testing = { version = "0.15" } +miden-tx = { default-features = false, version = "0.15" } +miden-tx-batch-prover = { version = "0.15" } # Other miden dependencies. These should align with those expected by miden-protocol. miden-crypto = { version = "0.23" } @@ -143,3 +143,16 @@ should_panic_without_expect = "allow" # We don't care about the specific panic # Configure `cargo-typos` [workspace.metadata.typos] files.extend-exclude = ["*.svg"] # Ignore SVG files. + +[patch.crates-io] +miden-crypto = { branch = "sergerad-largesmt-reader-trait", git = "https://github.com/0xmiden/crypto" } +miden-serde-utils = { branch = "sergerad-largesmt-reader-trait", git = "https://github.com/0xmiden/crypto" } + +miden-agglayer = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } +miden-block-prover = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } +miden-protocol = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } +miden-protocol-macros = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } +miden-standards = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } +miden-testing = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } +miden-tx = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } +miden-tx-batch-prover = { branch = "sergerad-clone", git = "https://github.com/0xmiden/protocol" } diff --git 
a/bin/stress-test/src/seeding/mod.rs b/bin/stress-test/src/seeding/mod.rs index 79b461848..2a0a5a6ea 100644 --- a/bin/stress-test/src/seeding/mod.rs +++ b/bin/stress-test/src/seeding/mod.rs @@ -370,7 +370,7 @@ fn create_batch(txs: &[ProvenTransaction], block_ref: &BlockHeader) -> ProvenBat .collect(); let input_notes = txs.iter().flat_map(|tx| tx.input_notes().iter().cloned()).collect(); let output_notes = txs.iter().flat_map(|tx| tx.output_notes().iter().cloned()).collect(); - ProvenBatch::new( + ProvenBatch::new_unchecked( BatchId::from_transactions(txs.iter()), block_ref.commitment(), block_ref.block_num(), diff --git a/crates/block-producer/src/test_utils/batch.rs b/crates/block-producer/src/test_utils/batch.rs index ca705e241..df97d3ea4 100644 --- a/crates/block-producer/src/test_utils/batch.rs +++ b/crates/block-producer/src/test_utils/batch.rs @@ -59,7 +59,7 @@ impl TransactionBatchConstructor for ProvenBatch { output_notes.extend(tx.output_notes().iter().cloned()); } - ProvenBatch::new( + ProvenBatch::new_unchecked( BatchId::from_transactions(txs.iter().copied()), Word::empty(), BlockNumber::GENESIS, diff --git a/crates/large-smt-backend-rocksdb/src/helpers.rs b/crates/large-smt-backend-rocksdb/src/helpers.rs index 23f3c8d88..d7074639f 100644 --- a/crates/large-smt-backend-rocksdb/src/helpers.rs +++ b/crates/large-smt-backend-rocksdb/src/helpers.rs @@ -1,4 +1,9 @@ -use miden_crypto::merkle::smt::{MAX_LEAF_ENTRIES, SmtLeaf, SmtLeafError}; +use alloc::boxed::Box; +use alloc::vec::Vec; + +use miden_crypto::merkle::NodeIndex; +use miden_crypto::merkle::smt::{MAX_LEAF_ENTRIES, SmtLeaf, SmtLeafError, Subtree}; +use miden_crypto::utils::Deserializable; use miden_crypto::word::LexicographicWord; use rocksdb::Error as RocksDbError; @@ -81,3 +86,121 @@ pub(crate) fn remove_from_leaf(leaf: &mut SmtLeaf, key: Word) -> (Option, }, } } + +/// Deserializes a big-endian `usize` count from exactly 8 bytes. `what` is used in the error +/// message if the length is wrong. 
+pub(crate) fn read_count(what: &'static str, bytes: &[u8]) -> Result<usize, StorageError> {
+    let arr: [u8; 8] = bytes.try_into().map_err(|_| StorageError::BadValueLen {
+        what,
+        expected: 8,
+        found: bytes.len(),
+    })?;
+    Ok(usize::from_be_bytes(arr))
+}
+
+/// Deserializes a single SMT leaf from raw bytes.
+#[expect(clippy::needless_pass_by_value, reason = "simplifies chaining")]
+pub(crate) fn read_leaf(leaf_bytes: Vec<u8>) -> Result<Option<SmtLeaf>, StorageError> {
+    let leaf = SmtLeaf::read_from_bytes_with_budget(&leaf_bytes, leaf_bytes.len())?;
+    Ok(Some(leaf))
+}
+
+/// Deserializes a batch of optional raw byte vectors into optional SMT leaves.
+pub(crate) fn read_leaves(
+    leaves: Vec<Option<Vec<u8>>>,
+) -> Result<Vec<Option<SmtLeaf>>, StorageError> {
+    leaves
+        .into_iter()
+        .map(|leaf| match leaf {
+            Some(bytes) => Ok(Some(SmtLeaf::read_from_bytes_with_budget(&bytes, bytes.len())?)),
+            None => Ok(None),
+        })
+        .collect()
+}
+
+/// Deserializes a single subtree from an optional raw byte vector.
+pub(crate) fn read_subtree(
+    index: NodeIndex,
+    db_result: Option<Vec<u8>>,
+) -> Result<Option<Subtree>, StorageError> {
+    match db_result {
+        Some(bytes) => {
+            let subtree = Subtree::from_vec(index, &bytes)?;
+            Ok(Some(subtree))
+        },
+        None => Ok(None),
+    }
+}
+
+/// Deserializes a batch of raw `multi_get` results into subtrees, preserving the original
+/// indices for reassembly.
+pub(crate) fn read_subtree_batch(
+    bucket: Vec<(usize, NodeIndex)>,
+    db_results: Vec<Result<Option<Vec<u8>>, RocksDbError>>,
+) -> Result<Vec<(usize, Option<Subtree>)>, StorageError> {
+    bucket
+        .into_iter()
+        .zip(db_results)
+        .map(|((original_index, node_index), db_result)| {
+            let subtree = match db_result {
+                Ok(Some(bytes)) => Some(Subtree::from_vec(node_index, &bytes)?),
+                Ok(None) => None,
+                Err(e) => return Err(map_rocksdb_err(e)),
+            };
+            Ok((original_index, subtree))
+        })
+        .collect()
+}
+
+/// Buckets subtree node indices by depth for batched column family lookups.
+///
+/// Returns an array of 5 buckets (for depths 56, 48, 40, 32, 24), where each bucket
+/// contains `(original_index, NodeIndex)` pairs.
+pub(crate) fn bucket_by_depth(
+    indices: &[NodeIndex],
+) -> Result<[Vec<(usize, NodeIndex)>; 5], StorageError> {
+    let mut depth_buckets: [Vec<(usize, NodeIndex)>; 5] = Default::default();
+
+    for (original_index, &node_index) in indices.iter().enumerate() {
+        let depth = node_index.depth();
+        let bucket_index = match depth {
+            56 => 0,
+            48 => 1,
+            40 => 2,
+            32 => 3,
+            24 => 4,
+            _ => {
+                return Err(StorageError::Unsupported(format!(
+                    "unsupported subtree depth {depth}"
+                )));
+            },
+        };
+        depth_buckets[bucket_index].push((original_index, node_index));
+    }
+
+    Ok(depth_buckets)
+}
+
+/// Deserializes depth-24 hash entries from a database iterator into `(index, Word)` pairs.
+pub(crate) fn read_depth24_entries(
+    iter: impl Iterator<Item = Result<(Box<[u8]>, Box<[u8]>), RocksDbError>>,
+) -> Result<Vec<(u64, Word)>, StorageError> {
+    let mut hashes = Vec::new();
+    for item in iter {
+        let (key_bytes, value_bytes) = item.map_err(map_rocksdb_err)?;
+        let index = index_from_key_bytes(&key_bytes)?;
+        let hash = Word::read_from_bytes(&value_bytes)?;
+        hashes.push((index, hash));
+    }
+    Ok(hashes)
+}
+
+/// Deserializes a `u64` index from an 8-byte big-endian key.
+pub(crate) fn index_from_key_bytes(key_bytes: &[u8]) -> Result<u64, StorageError> {
+    if key_bytes.len() != 8 {
+        return Err(StorageError::BadKeyLen { expected: 8, found: key_bytes.len() });
+    }
+    let mut arr = [0u8; 8];
+    arr.copy_from_slice(key_bytes);
+    Ok(u64::from_be_bytes(arr))
+}
diff --git a/crates/large-smt-backend-rocksdb/src/lib.rs b/crates/large-smt-backend-rocksdb/src/lib.rs
index 563439c9f..7a97183b7 100644
--- a/crates/large-smt-backend-rocksdb/src/lib.rs
+++ b/crates/large-smt-backend-rocksdb/src/lib.rs
@@ -26,7 +26,11 @@ extern crate alloc;
 mod helpers;
 #[expect(clippy::doc_markdown, clippy::inline_always)]
 mod rocksdb;
+#[expect(clippy::doc_markdown, clippy::inline_always)]
+mod rocksdb_snapshot;
 
 // Re-export from miden-protocol.
+/// Re-export of `rocksdb::DB` for consumers that need the raw database handle type.
+pub use ::rocksdb::DB; pub use miden_protocol::crypto::merkle::smt::{ InnerNode, LargeSmt, @@ -39,6 +43,7 @@ pub use miden_protocol::crypto::merkle::smt::{ SmtLeafError, SmtProof, SmtStorage, + SmtStorageReader, StorageError, StorageUpdateParts, StorageUpdates, @@ -57,3 +62,4 @@ pub use miden_protocol::{ }, }; pub use rocksdb::{RocksDbConfig, RocksDbStorage}; +pub use rocksdb_snapshot::RocksDbSnapshotStorage; diff --git a/crates/large-smt-backend-rocksdb/src/rocksdb.rs b/crates/large-smt-backend-rocksdb/src/rocksdb.rs index e269029ad..7f8935236 100644 --- a/crates/large-smt-backend-rocksdb/src/rocksdb.rs +++ b/crates/large-smt-backend-rocksdb/src/rocksdb.rs @@ -22,32 +22,51 @@ use rocksdb::{ WriteBatch, }; -use super::{SmtStorage, StorageError, StorageUpdateParts, StorageUpdates, SubtreeUpdate}; -use crate::helpers::{insert_into_leaf, map_rocksdb_err, remove_from_leaf}; -use crate::{EMPTY_WORD, Word}; +use super::{ + SmtStorage, + SmtStorageReader, + StorageError, + StorageUpdateParts, + StorageUpdates, + SubtreeUpdate, +}; +use crate::helpers::{ + bucket_by_depth, + index_from_key_bytes, + insert_into_leaf, + map_rocksdb_err, + read_count, + read_depth24_entries, + read_leaf, + read_leaves, + read_subtree, + read_subtree_batch, + remove_from_leaf, +}; +use crate::{EMPTY_WORD, RocksDbSnapshotStorage, Word}; -const IN_MEMORY_DEPTH: u8 = 24; +pub(crate) const IN_MEMORY_DEPTH: u8 = 24; /// The name of the `RocksDB` column family used for storing SMT leaves. -const LEAVES_CF: &str = "leaves"; +pub(crate) const LEAVES_CF: &str = "leaves"; /// The names of the `RocksDB` column families used for storing SMT subtrees (deep nodes). 
-const SUBTREE_24_CF: &str = "st24"; -const SUBTREE_32_CF: &str = "st32"; -const SUBTREE_40_CF: &str = "st40"; -const SUBTREE_48_CF: &str = "st48"; -const SUBTREE_56_CF: &str = "st56"; -const SUBTREE_DEPTHS: [u8; 5] = [56, 48, 40, 32, 24]; +pub(crate) const SUBTREE_24_CF: &str = "st24"; +pub(crate) const SUBTREE_32_CF: &str = "st32"; +pub(crate) const SUBTREE_40_CF: &str = "st40"; +pub(crate) const SUBTREE_48_CF: &str = "st48"; +pub(crate) const SUBTREE_56_CF: &str = "st56"; +pub(crate) const SUBTREE_DEPTHS: [u8; 5] = [56, 48, 40, 32, 24]; /// The name of the `RocksDB` column family used for storing metadata (e.g., root, counts). -const METADATA_CF: &str = "metadata"; +pub(crate) const METADATA_CF: &str = "metadata"; /// The name of the `RocksDB` column family used for storing level 24 hashes for fast tree /// rebuilding. -const DEPTH_24_CF: &str = "depth24"; +pub(crate) const DEPTH_24_CF: &str = "depth24"; /// The key used in the `METADATA_CF` column family to store the total count of non-empty leaves. -const LEAF_COUNT_KEY: &[u8] = b"leaf_count"; +pub(crate) const LEAF_COUNT_KEY: &[u8] = b"leaf_count"; /// The key used in the `METADATA_CF` column family to store the total count of key-value entries. -const ENTRY_COUNT_KEY: &[u8] = b"entry_count"; +pub(crate) const ENTRY_COUNT_KEY: &[u8] = b"entry_count"; /// A `RocksDB`-backed persistent storage implementation for a Sparse Merkle Tree (SMT). /// @@ -67,7 +86,7 @@ const ENTRY_COUNT_KEY: &[u8] = b"entry_count"; /// `NodeIndex`. /// - `METADATA_CF` ("metadata"): Stores overall SMT metadata such as the current root hash, total /// leaf count, and total entry count. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct RocksDbStorage { db: Arc, } @@ -223,14 +242,14 @@ impl RocksDbStorage { /// Converts an index (u64) into a fixed-size byte array for use as a `RocksDB` key. 
#[inline(always)] - fn index_db_key(index: u64) -> [u8; 8] { + pub(crate) fn index_db_key(index: u64) -> [u8; 8] { index.to_be_bytes() } /// Converts a `NodeIndex` (for a subtree root) into a `KeyBytes` for use as a `RocksDB` key. /// The `KeyBytes` is a wrapper around a 8-byte value with a variable-length prefix. #[inline(always)] - fn subtree_db_key(index: NodeIndex) -> KeyBytes { + pub(crate) fn subtree_db_key(index: NodeIndex) -> KeyBytes { let keep = match index.depth() { 24 => 3, 32 => 4, @@ -259,9 +278,20 @@ impl RocksDbStorage { let name = cf_for_depth(index.depth()); self.cf_handle(name).expect("CF handle missing") } + + /// Returns a reference to the inner `Arc`. + pub fn db(&self) -> &Arc { + &self.db + } + + /// Creates a new [`crate::rocksdb_snapshot::RocksDbSnapshotStorage`] from this storage's + /// database. + pub fn snapshot_storage(&self) -> crate::rocksdb_snapshot::RocksDbSnapshotStorage { + crate::rocksdb_snapshot::RocksDbSnapshotStorage::new(Arc::clone(&self.db)) + } } -impl SmtStorage for RocksDbStorage { +impl SmtStorageReader for RocksDbStorage { /// Retrieves the total count of non-empty leaves from the `METADATA_CF` column family. /// Returns 0 if the count is not found. /// @@ -274,15 +304,7 @@ impl SmtStorage for RocksDbStorage { self.db .get_cf(cf, LEAF_COUNT_KEY) .map_err(map_rocksdb_err)? - .map_or(Ok(0), |bytes| { - let arr: [u8; 8] = - bytes.as_slice().try_into().map_err(|_| StorageError::BadValueLen { - what: "leaf count", - expected: 8, - found: bytes.len(), - })?; - Ok(usize::from_be_bytes(arr)) - }) + .map_or(Ok(0), |b| read_count("leaf count", &b)) } /// Retrieves the total count of key-value entries from the `METADATA_CF` column family. @@ -297,17 +319,183 @@ impl SmtStorage for RocksDbStorage { self.db .get_cf(cf, ENTRY_COUNT_KEY) .map_err(map_rocksdb_err)? 
-            .map_or(Ok(0), |bytes| {
-                let arr: [u8; 8] =
-                    bytes.as_slice().try_into().map_err(|_| StorageError::BadValueLen {
-                        what: "entry count",
-                        expected: 8,
-                        found: bytes.len(),
-                    })?;
-                Ok(usize::from_be_bytes(arr))
-            })
+            .map_or(Ok(0), |b| read_count("entry count", &b))
+    }
+
+    /// Retrieves a single SMT leaf node by its logical `index` from the `LEAVES_CF` column family.
+    ///
+    /// # Errors
+    /// - `StorageError::Backend`: If the leaves column family is missing or a RocksDB error occurs.
+    /// - `StorageError::DeserializationError`: If the retrieved leaf data is corrupt.
+    fn get_leaf(&self, index: u64) -> Result<Option<SmtLeaf>, StorageError> {
+        let cf = self.cf_handle(LEAVES_CF)?;
+        let key = Self::index_db_key(index);
+        self.db.get_cf(cf, key).map_err(map_rocksdb_err)?.map_or(Ok(None), read_leaf)
+    }
+
+    /// Retrieves multiple SMT leaf nodes by their logical `indices` using RocksDB's `multi_get_cf`.
+    ///
+    /// # Errors
+    /// - `StorageError::Backend`: If the leaves column family is missing or a RocksDB error occurs.
+    /// - `StorageError::DeserializationError`: If any retrieved leaf data is corrupt.
+    fn get_leaves(&self, indices: &[u64]) -> Result<Vec<Option<SmtLeaf>>, StorageError> {
+        let cf = self.cf_handle(LEAVES_CF)?;
+        let db_keys: Vec<[u8; 8]> = indices.iter().map(|&idx| Self::index_db_key(idx)).collect();
+        let results = self.db.multi_get_cf(db_keys.iter().map(|k| (cf, k.as_ref())));
+
+        let leaves = results
+            .into_iter()
+            .collect::<Result<Vec<Option<Vec<u8>>>, rocksdb::Error>>()
+            .map_err(map_rocksdb_err)?;
+        read_leaves(leaves)
     }
 
+    /// Returns true if the storage has any leaves.
+    ///
+    /// # Errors
+    /// Returns `StorageError` if the storage read operation fails.
+    fn has_leaves(&self) -> Result<bool, StorageError> {
+        Ok(self.leaf_count()? > 0)
+    }
+
+    /// Batch-retrieves multiple subtrees from RocksDB by their node indices.
+    ///
+    /// This method groups requests by subtree depth into column family buckets,
+    /// then performs parallel `multi_get` operations to efficiently retrieve
+    /// all subtrees. Results are deserialized and placed in the same order as
+    /// the input indices.
+    ///
+    /// Note: Retrieval is performed in parallel. If multiple errors occur (e.g.,
+    /// deserialization or backend errors), only the first one encountered is returned.
+    /// Other errors will be discarded.
+    ///
+    /// # Parameters
+    /// - `indices`: A slice of subtree root indices to retrieve.
+    ///
+    /// # Returns
+    /// - A `Vec<Option<Subtree>>` where each index corresponds to the original input.
+    /// - `Ok(...)` if all fetches succeed.
+    /// - `Err(StorageError)` if any RocksDB access or deserialization fails.
+    fn get_subtree(&self, index: NodeIndex) -> Result<Option<Subtree>, StorageError> {
+        let cf = self.subtree_cf(index);
+        let key = Self::subtree_db_key(index);
+        read_subtree(index, self.db.get_cf(cf, key).map_err(map_rocksdb_err)?)
+    }
+
+    fn get_subtrees(&self, indices: &[NodeIndex]) -> Result<Vec<Option<Subtree>>, StorageError> {
+        use rayon::prelude::*;
+
+        let depth_buckets = bucket_by_depth(indices)?;
+        let mut results = vec![None; indices.len()];
+
+        let bucket_results: Result<Vec<_>, StorageError> = depth_buckets
+            .into_par_iter()
+            .enumerate()
+            .filter(|(_, bucket)| !bucket.is_empty())
+            .map(
+                |(bucket_index, bucket)| -> Result<Vec<(usize, Option<Subtree>)>, StorageError> {
+                    let depth = SUBTREE_DEPTHS[bucket_index];
+                    let cf = self.cf_handle(cf_for_depth(depth))?;
+                    let keys: Vec<_> =
+                        bucket.iter().map(|(_, idx)| Self::subtree_db_key(*idx)).collect();
+
+                    let db_results: Vec<_> = self
+                        .db
+                        .multi_get_cf(keys.iter().map(|k| (cf, k.as_ref())))
+                        .into_iter()
+                        .collect();
+
+                    read_subtree_batch(bucket, db_results)
+                },
+            )
+            .collect();
+
+        for bucket_result in bucket_results? {
+            for (original_index, subtree) in bucket_result {
+                results[original_index] = subtree;
+            }
+        }
+
+        Ok(results)
+    }
+
+    /// Retrieves a single inner node (non-leaf node) from within a Subtree.
+    ///
+    /// This method is intended for accessing nodes at depths greater than or equal to
+    /// `IN_MEMORY_DEPTH`. It first finds the appropriate Subtree containing the `index`, then
+    /// delegates to `Subtree::get_inner_node()`.
+    ///
+    /// # Errors
+    /// - `StorageError::Backend`: If `index.depth() < IN_MEMORY_DEPTH`, or if RocksDB errors occur.
+    /// - `StorageError::Value`: If the containing Subtree data is corrupt.
+    fn get_inner_node(&self, index: NodeIndex) -> Result<Option<InnerNode>, StorageError> {
+        if index.depth() < IN_MEMORY_DEPTH {
+            return Err(StorageError::Unsupported(
+                "Cannot get inner node from upper part of the tree".into(),
+            ));
+        }
+        let subtree_root_index = Subtree::find_subtree_root(index);
+        Ok(self
+            .get_subtree(subtree_root_index)?
+            .and_then(|subtree| subtree.get_inner_node(index)))
+    }
+
+    /// Returns an iterator over all (logical u64 index, `SmtLeaf`) pairs in the `LEAVES_CF`.
+    ///
+    /// The iterator uses a RocksDB snapshot for consistency and iterates in lexicographical
+    /// order of the keys (leaf indices). Errors during iteration (e.g., deserialization issues)
+    /// cause the iterator to skip the problematic item and attempt to continue.
+    ///
+    /// # Errors
+    /// - `StorageError::Backend`: If the leaves column family is missing or a RocksDB error occurs
+    ///   during iterator creation.
+    fn iter_leaves(&self) -> Result<Box<dyn Iterator<Item = (u64, SmtLeaf)> + '_>, StorageError> {
+        let cf = self.cf_handle(LEAVES_CF)?;
+        let mut read_opts = ReadOptions::default();
+        read_opts.set_total_order_seek(true);
+        let db_iter = self.db.iterator_cf_opt(cf, read_opts, IteratorMode::Start);
+
+        Ok(Box::new(RocksDbDirectLeafIterator { iter: db_iter }))
+    }
+
+    /// Returns an iterator over all `Subtree` instances across all subtree column families.
+    ///
+    /// The iterator uses a RocksDB snapshot and iterates in lexicographical order of keys
+    /// (subtree root NodeIndex) across all depth column families (24, 32, 40, 48, 56).
+    /// Errors during iteration (e.g., deserialization issues) cause the iterator to skip
+    /// the problematic item and attempt to continue.
+    ///
+    /// # Errors
+    /// - `StorageError::Backend`: If any subtree column family is missing or a RocksDB error occurs
+    ///   during iterator creation.
+    fn iter_subtrees(&self) -> Result<Box<dyn Iterator<Item = Subtree> + '_>, StorageError> {
+        // All subtree column family names in order
+        const SUBTREE_CFS: [&str; 5] =
+            [SUBTREE_24_CF, SUBTREE_32_CF, SUBTREE_40_CF, SUBTREE_48_CF, SUBTREE_56_CF];
+
+        let mut cf_handles = Vec::new();
+        for cf_name in SUBTREE_CFS {
+            cf_handles.push(self.cf_handle(cf_name)?);
+        }
+
+        Ok(Box::new(RocksDbSubtreeIterator::new(&self.db, cf_handles)))
+    }
+
+    /// Retrieves all depth 24 hashes for fast tree rebuilding.
+    ///
+    /// # Errors
+    /// - `StorageError::Backend`: If the depth24 column family is missing or a RocksDB error
+    ///   occurs.
+    /// - `StorageError::Value`: If any hash bytes are corrupt.
+    fn get_depth24(&self) -> Result<Vec<(u64, Word)>, StorageError> {
+        let cf = self.cf_handle(DEPTH_24_CF)?;
+        read_depth24_entries(self.db.iterator_cf(cf, IteratorMode::Start))
+    }
+}
+
+impl SmtStorage for RocksDbStorage {
+    type Reader = RocksDbSnapshotStorage;
+
     /// Inserts a key-value pair into the SMT leaf at the specified logical `index`.
     ///
     /// This operation involves:
@@ -427,34 +615,12 @@
         Ok(current_value)
     }
 
-    /// Retrieves a single SMT leaf node by its logical `index` from the `LEAVES_CF` column family.
-    ///
-    /// # Errors
-    /// - `StorageError::Backend`: If the leaves column family is missing or a RocksDB error occurs.
-    /// - `StorageError::DeserializationError`: If the retrieved leaf data is corrupt.
-    fn get_leaf(&self, index: u64) -> Result<Option<SmtLeaf>, StorageError> {
-        let cf = self.cf_handle(LEAVES_CF)?;
-        let key = Self::index_db_key(index);
-        match self.db.get_cf(cf, key).map_err(map_rocksdb_err)? {
-            Some(bytes) => {
-                let leaf = SmtLeaf::read_from_bytes_with_budget(&bytes, bytes.len())?;
-                Ok(Some(leaf))
-            },
-            None => Ok(None),
-        }
-    }
-
     /// Sets or updates multiple SMT leaf nodes in the `LEAVES_CF` column family.
/// /// This method performs a batch write to RocksDB. It also updates the global /// leaf and entry counts in the `METADATA_CF` based on the provided `leaves` map, /// overwriting any previous counts. /// - /// Note: This method assumes the provided `leaves` map represents the entirety - /// of leaves to be stored or that counts are being explicitly reset. - /// Note: This only updates the leaves. Callers are responsible for recomputing and - /// persisting the corresponding inner nodes. - /// /// # Errors /// - `StorageError::Backend`: If column families are missing or a RocksDB error occurs. fn set_leaves(&mut self, leaves: Map) -> Result<(), StorageError> { @@ -481,9 +647,6 @@ impl SmtStorage for RocksDbStorage { /// if using this method directly, or preferably use `apply` or `remove_value` which handle /// counts. /// - /// Note: This only removes the leaf. Callers are responsible for recomputing and - /// persisting the corresponding inner nodes. - /// /// # Errors /// - `StorageError::Backend`: If the leaves column family is missing or a RocksDB error occurs. /// - `StorageError::DeserializationError`: If the retrieved (to be returned) leaf data is @@ -499,144 +662,6 @@ impl SmtStorage for RocksDbStorage { })) } - /// Retrieves multiple SMT leaf nodes by their logical `indices` using RocksDB's `multi_get_cf`. - /// - /// # Errors - /// - `StorageError::Backend`: If the leaves column family is missing or a RocksDB error occurs. - /// - `StorageError::DeserializationError`: If any retrieved leaf data is corrupt. 
- fn get_leaves(&self, indices: &[u64]) -> Result>, StorageError> { - let cf = self.cf_handle(LEAVES_CF)?; - let db_keys: Vec<[u8; 8]> = indices.iter().map(|&idx| Self::index_db_key(idx)).collect(); - let results = self.db.multi_get_cf(db_keys.iter().map(|k| (cf, k.as_ref()))); - - results - .into_iter() - .map(|result| match result { - Ok(Some(bytes)) => { - Ok(Some(SmtLeaf::read_from_bytes_with_budget(&bytes, bytes.len())?)) - }, - Ok(None) => Ok(None), - Err(e) => Err(map_rocksdb_err(e)), - }) - .collect() - } - - /// Returns true if the storage has any leaves. - /// - /// # Errors - /// Returns `StorageError` if the storage read operation fails. - fn has_leaves(&self) -> Result { - Ok(self.leaf_count()? > 0) - } - - /// Batch-retrieves multiple subtrees from RocksDB by their node indices. - /// - /// This method groups requests by subtree depth into column family buckets, - /// then performs parallel `multi_get` operations to efficiently retrieve - /// all subtrees. Results are deserialized and placed in the same order as - /// the input indices. - /// - /// Note: Retrieval is performed in parallel. If multiple errors occur (e.g., - /// deserialization or backend errors), only the first one encountered is returned. - /// Other errors will be discarded. - /// - /// # Parameters - /// - `indices`: A slice of subtree root indices to retrieve. - /// - /// # Returns - /// - A `Vec>` where each index corresponds to the original input. - /// - `Ok(...)` if all fetches succeed. - /// - `Err(StorageError)` if any RocksDB access or deserialization fails. - fn get_subtree(&self, index: NodeIndex) -> Result, StorageError> { - let cf = self.subtree_cf(index); - let key = Self::subtree_db_key(index); - match self.db.get_cf(cf, key).map_err(map_rocksdb_err)? { - Some(bytes) => { - let subtree = Subtree::from_vec(index, &bytes)?; - Ok(Some(subtree)) - }, - None => Ok(None), - } - } - - /// Batch-retrieves multiple subtrees from RocksDB by their node indices. 
- /// - /// This method groups requests by subtree depth into column family buckets, - /// then performs parallel `multi_get` operations to efficiently retrieve - /// all subtrees. Results are deserialized and placed in the same order as - /// the input indices. - /// - /// # Parameters - /// - `indices`: A slice of subtree root indices to retrieve. - /// - /// # Returns - /// - A `Vec>` where each index corresponds to the original input. - /// - `Ok(...)` if all fetches succeed. - /// - `Err(StorageError)` if any RocksDB access or deserialization fails. - fn get_subtrees(&self, indices: &[NodeIndex]) -> Result>, StorageError> { - use rayon::prelude::*; - - let mut depth_buckets: [Vec<(usize, NodeIndex)>; 5] = Default::default(); - - for (original_index, &node_index) in indices.iter().enumerate() { - let depth = node_index.depth(); - let bucket_index = match depth { - 56 => 0, - 48 => 1, - 40 => 2, - 32 => 3, - 24 => 4, - _ => { - return Err(StorageError::Unsupported(format!( - "unsupported subtree depth {depth}" - ))); - }, - }; - depth_buckets[bucket_index].push((original_index, node_index)); - } - let mut results = vec![None; indices.len()]; - - // Process depth buckets in parallel - let bucket_results: Result, StorageError> = depth_buckets - .into_par_iter() - .enumerate() - .filter(|(_, bucket)| !bucket.is_empty()) - .map( - |(bucket_index, bucket)| -> Result)>, StorageError> { - let depth = SUBTREE_DEPTHS[bucket_index]; - let cf = self.cf_handle(cf_for_depth(depth))?; - let keys: Vec<_> = - bucket.iter().map(|(_, idx)| Self::subtree_db_key(*idx)).collect(); - - let db_results = self.db.multi_get_cf(keys.iter().map(|k| (cf, k.as_ref()))); - - // Process results for this bucket - bucket - .into_iter() - .zip(db_results) - .map(|((original_index, node_index), db_result)| { - let subtree = match db_result { - Ok(Some(bytes)) => Some(Subtree::from_vec(node_index, &bytes)?), - Ok(None) => None, - Err(e) => return Err(map_rocksdb_err(e)), - }; - Ok((original_index, 
subtree)) - }) - .collect() - }, - ) - .collect(); - - // Flatten results and place them in correct positions - for bucket_result in bucket_results? { - for (original_index, subtree) in bucket_result { - results[original_index] = subtree; - } - } - - Ok(results) - } - /// Stores a single subtree in RocksDB and optionally updates the depth-24 root cache. /// /// The subtree is serialized and written to its corresponding column family. @@ -732,27 +757,6 @@ impl SmtStorage for RocksDbStorage { Ok(()) } - /// Retrieves a single inner node (non-leaf node) from within a Subtree. - /// - /// This method is intended for accessing nodes at depths greater than or equal to - /// `IN_MEMORY_DEPTH`. It first finds the appropriate Subtree containing the `index`, then - /// delegates to `Subtree::get_inner_node()`. - /// - /// # Errors - /// - `StorageError::Backend`: If `index.depth() < IN_MEMORY_DEPTH`, or if RocksDB errors occur. - /// - `StorageError::Value`: If the containing Subtree data is corrupt. - fn get_inner_node(&self, index: NodeIndex) -> Result, StorageError> { - if index.depth() < IN_MEMORY_DEPTH { - return Err(StorageError::Unsupported( - "Cannot get inner node from upper part of the tree".into(), - )); - } - let subtree_root_index = Subtree::find_subtree_root(index); - Ok(self - .get_subtree(subtree_root_index)? - .and_then(|subtree| subtree.get_inner_node(index))) - } - /// Sets or updates a single inner node (non-leaf node) within a Subtree. /// /// This method is intended for `index.depth() >= IN_MEMORY_DEPTH`. @@ -923,68 +927,9 @@ impl SmtStorage for RocksDbStorage { Ok(()) } - /// Returns an iterator over all (logical u64 index, `SmtLeaf`) pairs in the `LEAVES_CF`. - /// - /// The iterator uses a RocksDB snapshot for consistency and iterates in lexicographical - /// order of the keys (leaf indices). Errors during iteration (e.g., deserialization issues) - /// cause the iterator to skip the problematic item and attempt to continue. 
- /// - /// # Errors - /// - `StorageError::Backend`: If the leaves column family is missing or a RocksDB error occurs - /// during iterator creation. - fn iter_leaves(&self) -> Result + '_>, StorageError> { - let cf = self.cf_handle(LEAVES_CF)?; - let mut read_opts = ReadOptions::default(); - read_opts.set_total_order_seek(true); - let db_iter = self.db.iterator_cf_opt(cf, read_opts, IteratorMode::Start); - - Ok(Box::new(RocksDbDirectLeafIterator { iter: db_iter })) - } - - /// Returns an iterator over all `Subtree` instances across all subtree column families. - /// - /// The iterator uses a RocksDB snapshot and iterates in lexicographical order of keys - /// (subtree root NodeIndex) across all depth column families (24, 32, 40, 48, 56). - /// Errors during iteration (e.g., deserialization issues) cause the iterator to skip - /// the problematic item and attempt to continue. - /// - /// # Errors - /// - `StorageError::Backend`: If any subtree column family is missing or a RocksDB error occurs - /// during iterator creation. - fn iter_subtrees(&self) -> Result + '_>, StorageError> { - // All subtree column family names in order - const SUBTREE_CFS: [&str; 5] = - [SUBTREE_24_CF, SUBTREE_32_CF, SUBTREE_40_CF, SUBTREE_48_CF, SUBTREE_56_CF]; - - let mut cf_handles = Vec::new(); - for cf_name in SUBTREE_CFS { - cf_handles.push(self.cf_handle(cf_name)?); - } - - Ok(Box::new(RocksDbSubtreeIterator::new(&self.db, cf_handles))) - } - - /// Retrieves all depth 24 hashes for fast tree rebuilding. - /// - /// # Errors - /// - `StorageError::Backend`: If the depth24 column family is missing or a RocksDB error - /// occurs. - /// - `StorageError::Value`: If any hash bytes are corrupt. 
- fn get_depth24(&self) -> Result, StorageError> { - let cf = self.cf_handle(DEPTH_24_CF)?; - let iter = self.db.iterator_cf(cf, IteratorMode::Start); - let mut hashes = Vec::new(); - - for item in iter { - let (key_bytes, value_bytes) = item.map_err(map_rocksdb_err)?; - - let index = index_from_key_bytes(&key_bytes)?; - let hash = Word::read_from_bytes_with_budget(&value_bytes, value_bytes.len())?; - - hashes.push((index, hash)); - } - - Ok(hashes) + /// Returns the read-only snapshot storage. + fn reader(&self) -> Self::Reader { + self.snapshot_storage() } } @@ -1010,8 +955,8 @@ impl Drop for RocksDbStorage { /// Wraps a `DBIteratorWithThreadMode` and handles deserialization of keys to `u64` (leaf index) /// and values to `SmtLeaf`. Skips items that fail to deserialize or if a RocksDB error occurs /// for an item, attempting to continue iteration. -struct RocksDbDirectLeafIterator<'a> { - iter: DBIteratorWithThreadMode<'a, DB>, +pub(crate) struct RocksDbDirectLeafIterator<'a> { + pub(crate) iter: DBIteratorWithThreadMode<'a, DB>, } impl Iterator for RocksDbDirectLeafIterator<'_> { @@ -1032,7 +977,7 @@ impl Iterator for RocksDbDirectLeafIterator<'_> { /// /// Iterates through all subtree column families (24, 32, 40, 48, 56) sequentially. /// When one column family is exhausted, it moves to the next one. 
-struct RocksDbSubtreeIterator<'a> { +pub(crate) struct RocksDbSubtreeIterator<'a> { db: &'a DB, cf_handles: Vec<&'a rocksdb::ColumnFamily>, current_cf_index: usize, @@ -1040,7 +985,7 @@ struct RocksDbSubtreeIterator<'a> { } impl<'a> RocksDbSubtreeIterator<'a> { - fn new(db: &'a DB, cf_handles: Vec<&'a rocksdb::ColumnFamily>) -> Self { + pub(crate) fn new(db: &'a DB, cf_handles: Vec<&'a rocksdb::ColumnFamily>) -> Self { let mut iterator = Self { db, cf_handles, @@ -1239,20 +1184,6 @@ impl AsRef<[u8]> for KeyBytes { // HELPERS // -------------------------------------------------------------------------------------------- -/// Deserializes an index (u64) from a RocksDB key byte slice. -/// Expects `key_bytes` to be exactly 8 bytes long. -/// -/// # Errors -/// - `StorageError::BadKeyLen`: If `key_bytes` is not 8 bytes long or conversion fails. -fn index_from_key_bytes(key_bytes: &[u8]) -> Result { - if key_bytes.len() != 8 { - return Err(StorageError::BadKeyLen { expected: 8, found: key_bytes.len() }); - } - let mut arr = [0u8; 8]; - arr.copy_from_slice(key_bytes); - Ok(u64::from_be_bytes(arr)) -} - /// Reconstructs a `NodeIndex` from the variable-length subtree key stored in `RocksDB`. 
/// /// * `key_bytes` is the big-endian tail of the 64-bit value: @@ -1288,7 +1219,7 @@ fn subtree_root_from_key_bytes(key_bytes: &[u8], depth: u8) -> Result &'static str { +pub(crate) fn cf_for_depth(depth: u8) -> &'static str { match depth { 24 => SUBTREE_24_CF, 32 => SUBTREE_32_CF, diff --git a/crates/large-smt-backend-rocksdb/src/rocksdb_snapshot.rs b/crates/large-smt-backend-rocksdb/src/rocksdb_snapshot.rs new file mode 100644 index 000000000..fc4e0e1b6 --- /dev/null +++ b/crates/large-smt-backend-rocksdb/src/rocksdb_snapshot.rs @@ -0,0 +1,243 @@ +use alloc::boxed::Box; +use alloc::vec::Vec; +use std::mem::ManuallyDrop; +use std::sync::Arc; + +use miden_crypto::merkle::NodeIndex; +use miden_crypto::merkle::smt::{InnerNode, SmtLeaf, Subtree}; +use rocksdb::{DB, IteratorMode, ReadOptions}; + +use super::{SmtStorageReader, StorageError}; +use crate::Word; +use crate::helpers::{ + bucket_by_depth, + map_rocksdb_err, + read_count, + read_depth24_entries, + read_leaf, + read_leaves, + read_subtree, + read_subtree_batch, +}; +use crate::rocksdb::{ + DEPTH_24_CF, + ENTRY_COUNT_KEY, + IN_MEMORY_DEPTH, + LEAF_COUNT_KEY, + LEAVES_CF, + METADATA_CF, + RocksDbDirectLeafIterator, + RocksDbStorage, + RocksDbSubtreeIterator, + SUBTREE_24_CF, + SUBTREE_32_CF, + SUBTREE_40_CF, + SUBTREE_48_CF, + SUBTREE_56_CF, + SUBTREE_DEPTHS, + cf_for_depth, +}; + +// SNAPSHOT STORAGE +// -------------------------------------------------------------------------------------------- + +/// Inner state shared by all clones of a snapshot storage. +/// +/// This struct pairs a RocksDB snapshot with an `Arc` to ensure the database +/// lives for as long as the snapshot that references it. +/// +/// # Safety +/// +/// `snapshot` borrows from `db`, so `snapshot` must be dropped before `db`'s refcount +/// is decremented. This is enforced by the `Drop` impl, which manually drops the +/// snapshot before the compiler auto-drops the `Arc`. 
+struct SnapshotInner { + snapshot: ManuallyDrop>, + db: Arc, +} + +impl Drop for SnapshotInner { + fn drop(&mut self) { + // Ensure that the snapshot is dropped before the database reference count is decremented. + unsafe { + ManuallyDrop::drop(&mut self.snapshot); + } + } +} + +/// A read-only, `Clone`-able RocksDB storage that reads from a point-in-time snapshot. +/// +/// All clones share the same snapshot via `Arc`, providing a consistent view of +/// the database at the time the snapshot was created. +/// +/// Implements [`SmtStorageReader`] only (read-only snapshot). +#[derive(Clone)] +pub struct RocksDbSnapshotStorage { + inner: Arc, +} + +impl std::fmt::Debug for RocksDbSnapshotStorage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RocksDbSnapshotStorage").finish_non_exhaustive() + } +} + +impl RocksDbSnapshotStorage { + /// Creates a new snapshot storage from the given database. + pub fn new(db: Arc) -> Self { + // SAFETY: We can transmute the snapshot to a static lifetime because we know that + // the database will outlive the snapshot. + let snapshot = db.snapshot(); + let snapshot: rocksdb::Snapshot<'static> = unsafe { std::mem::transmute(snapshot) }; + Self { + inner: Arc::new(SnapshotInner { + snapshot: ManuallyDrop::new(snapshot), + db, + }), + } + } + + fn cf_handle(&self, name: &str) -> Result<&rocksdb::ColumnFamily, StorageError> { + self.inner + .db + .cf_handle(name) + .ok_or_else(|| StorageError::Unsupported(format!("unknown column family `{name}`"))) + } + + #[inline(always)] + fn subtree_cf(&self, index: NodeIndex) -> &rocksdb::ColumnFamily { + let name = cf_for_depth(index.depth()); + self.cf_handle(name).expect("CF handle missing") + } +} + +impl SmtStorageReader for RocksDbSnapshotStorage { + fn leaf_count(&self) -> Result { + let cf = self.cf_handle(METADATA_CF)?; + self.inner + .snapshot + .get_cf(cf, LEAF_COUNT_KEY) + .map_err(map_rocksdb_err)? 
+ .map_or(Ok(0), |b| read_count("leaf count", &b)) + } + + fn entry_count(&self) -> Result { + let cf = self.cf_handle(METADATA_CF)?; + self.inner + .snapshot + .get_cf(cf, ENTRY_COUNT_KEY) + .map_err(map_rocksdb_err)? + .map_or(Ok(0), |b| read_count("entry count", &b)) + } + + fn get_leaf(&self, index: u64) -> Result, StorageError> { + let cf = self.cf_handle(LEAVES_CF)?; + let key = RocksDbStorage::index_db_key(index); + self.inner + .snapshot + .get_cf(cf, key) + .map_err(map_rocksdb_err)? + .map_or(Ok(None), read_leaf) + } + + fn get_leaves(&self, indices: &[u64]) -> Result>, StorageError> { + let cf = self.cf_handle(LEAVES_CF)?; + let db_keys: Vec<[u8; 8]> = + indices.iter().map(|&idx| RocksDbStorage::index_db_key(idx)).collect(); + let results = self.inner.snapshot.multi_get_cf(db_keys.iter().map(|k| (cf, k.as_ref()))); + + let leaves = results + .into_iter() + .collect::>>, rocksdb::Error>>() + .map_err(map_rocksdb_err)?; + read_leaves(leaves) + } + + fn has_leaves(&self) -> Result { + Ok(self.leaf_count()? > 0) + } + + fn get_subtree(&self, index: NodeIndex) -> Result, StorageError> { + let cf = self.subtree_cf(index); + let key = RocksDbStorage::subtree_db_key(index); + read_subtree(index, self.inner.snapshot.get_cf(cf, key).map_err(map_rocksdb_err)?) 
+ } + + fn get_subtrees(&self, indices: &[NodeIndex]) -> Result>, StorageError> { + use rayon::prelude::*; + + let depth_buckets = bucket_by_depth(indices)?; + let mut results = vec![None; indices.len()]; + + let bucket_results: Result, StorageError> = depth_buckets + .into_par_iter() + .enumerate() + .filter(|(_, bucket)| !bucket.is_empty()) + .map( + |(bucket_index, bucket)| -> Result)>, StorageError> { + let depth = SUBTREE_DEPTHS[bucket_index]; + let cf = self.cf_handle(cf_for_depth(depth))?; + let keys: Vec<_> = bucket + .iter() + .map(|(_, idx)| RocksDbStorage::subtree_db_key(*idx)) + .collect(); + + let db_results: Vec<_> = self + .inner + .snapshot + .multi_get_cf(keys.iter().map(|k| (cf, k.as_ref()))) + .into_iter() + .collect(); + + read_subtree_batch(bucket, db_results) + }, + ) + .collect(); + + for bucket_result in bucket_results? { + for (original_index, subtree) in bucket_result { + results[original_index] = subtree; + } + } + + Ok(results) + } + + fn get_inner_node(&self, index: NodeIndex) -> Result, StorageError> { + if index.depth() < IN_MEMORY_DEPTH { + return Err(StorageError::Unsupported( + "Cannot get inner node from upper part of the tree".into(), + )); + } + let subtree_root_index = Subtree::find_subtree_root(index); + Ok(self + .get_subtree(subtree_root_index)? 
+ .and_then(|subtree| subtree.get_inner_node(index))) + } + + fn iter_leaves(&self) -> Result + '_>, StorageError> { + let cf = self.cf_handle(LEAVES_CF)?; + let mut read_opts = ReadOptions::default(); + read_opts.set_total_order_seek(true); + let db_iter = self.inner.snapshot.iterator_cf_opt(cf, read_opts, IteratorMode::Start); + + Ok(Box::new(RocksDbDirectLeafIterator { iter: db_iter })) + } + + fn iter_subtrees(&self) -> Result + '_>, StorageError> { + const SUBTREE_CFS: [&str; 5] = + [SUBTREE_24_CF, SUBTREE_32_CF, SUBTREE_40_CF, SUBTREE_48_CF, SUBTREE_56_CF]; + + let mut cf_handles = Vec::new(); + for cf_name in SUBTREE_CFS { + cf_handles.push(self.cf_handle(cf_name)?); + } + + Ok(Box::new(RocksDbSubtreeIterator::new(&self.inner.db, cf_handles))) + } + + fn get_depth24(&self) -> Result, StorageError> { + let cf = self.cf_handle(DEPTH_24_CF)?; + read_depth24_entries(self.inner.snapshot.iterator_cf(cf, IteratorMode::Start)) + } +} diff --git a/crates/ntx-builder/src/builder.rs b/crates/ntx-builder/src/builder.rs index 86246188e..869aa4118 100644 --- a/crates/ntx-builder/src/builder.rs +++ b/crates/ntx-builder/src/builder.rs @@ -292,6 +292,14 @@ impl NetworkTransactionBuilder { async fn update_chain_tip(&mut self, tip: BlockHeader) { let mut chain_state = self.chain_state.write().await; + // Skip if this block is already accounted for. This can happen during initialization: + // the mempool subscription is created before the chain state is fetched from the store, + // so BlockCommitted events for blocks that occurred in between are already reflected in + // the initial chain state. + if tip.block_num() <= chain_state.chain_tip_header.block_num() { + return; + } + // Update MMR which lags by one block. 
let mmr_tip = chain_state.chain_tip_header.clone(); Arc::make_mut(&mut chain_state.chain_mmr).add_block(&mmr_tip, true); diff --git a/crates/store/Cargo.toml b/crates/store/Cargo.toml index aa8cfc3a2..e6230a082 100644 --- a/crates/store/Cargo.toml +++ b/crates/store/Cargo.toml @@ -16,6 +16,7 @@ workspace = true [dependencies] anyhow = { workspace = true } +arc-swap = "1" deadpool = { default-features = false, features = ["managed", "rt_tokio_1"], version = "0.12" } deadpool-diesel = { features = ["sqlite"], version = "0.6" } diesel = { features = ["numeric", "sqlite"], version = "2.3" } diff --git a/crates/store/src/account_state_forest/mod.rs b/crates/store/src/account_state_forest/mod.rs index d20ecdf2d..a4dabb89d 100644 --- a/crates/store/src/account_state_forest/mod.rs +++ b/crates/store/src/account_state_forest/mod.rs @@ -58,12 +58,15 @@ pub enum WitnessError { StorageMapError(#[from] StorageMapError), #[error("failed to construct asset")] AssetError(#[from] AssetError), + #[error("block {0} is unknown")] + UnknownBlock(BlockNumber), } // ACCOUNT STATE FOREST // ================================================================================================ /// Container for forest-related state that needs to be updated atomically. +#[derive(Clone)] pub(crate) struct AccountStateForest { /// `LargeSmtForest` for efficient account storage reconstruction. /// Populated during block import with storage and vault SMTs. diff --git a/crates/store/src/accounts/mod.rs b/crates/store/src/accounts/mod.rs index 31c7ed9c6..91c1f9d00 100644 --- a/crates/store/src/accounts/mod.rs +++ b/crates/store/src/accounts/mod.rs @@ -15,6 +15,7 @@ use miden_protocol::crypto::merkle::smt::{ SMT_DEPTH, SmtLeaf, SmtStorage, + SmtStorageReader, }; use miden_protocol::crypto::merkle::{ EmptySubtreeRoots, @@ -71,7 +72,7 @@ enum HistoricalSelector { /// Captures reversion state for historical queries at a specific block. 
#[derive(Debug, Clone)] -struct HistoricalOverlay { +pub(crate) struct HistoricalOverlay { block_number: BlockNumber, root: Word, node_mutations: HashMap, @@ -118,8 +119,8 @@ impl HistoricalOverlay { /// This structure maintains a sliding window of historical account states by storing /// reversion data (mutations that undo changes). Historical witnesses are reconstructed /// by starting from the latest state and applying reversion overlays backwards in time. -#[derive(Debug)] -pub struct AccountTreeWithHistory { +#[derive(Debug, Clone)] +pub struct AccountTreeWithHistory { /// The current block number (latest state). block_number: BlockNumber, /// The latest account tree state. @@ -128,7 +129,7 @@ pub struct AccountTreeWithHistory { overlays: BTreeMap, } -impl AccountTreeWithHistory { +impl AccountTreeWithHistory { /// Maximum number of historical blocks to maintain. pub const MAX_HISTORY: usize = 50; @@ -151,6 +152,17 @@ impl AccountTreeWithHistory { } } + /// Creates an `AccountTreeWithHistory` from its constituent parts. + /// + /// This is used by the writer to construct a snapshot-backed read-only copy. + pub(crate) fn from_parts( + latest: AccountTree>, + block_number: BlockNumber, + overlays: BTreeMap, + ) -> Self { + Self { block_number, latest, overlays } + } + // PUBLIC ACCESSORS // -------------------------------------------------------------------------------------------- @@ -230,6 +242,11 @@ impl AccountTreeWithHistory { self.latest.contains_account_id_prefix(prefix) } + /// Returns a reference to the historical overlays. 
+ pub(crate) fn overlays(&self) -> &BTreeMap { + &self.overlays + } + // PRIVATE HELPERS - HISTORICAL RECONSTRUCTION // -------------------------------------------------------------------------------------------- @@ -345,10 +362,20 @@ impl AccountTreeWithHistory { let path = SparseMerklePath::try_from(path).ok()?; Some((path, leaf)) } +} - // PUBLIC MUTATORS +impl AccountTreeWithHistory { + // MUTATORS // -------------------------------------------------------------------------------------------- + /// Computes mutations relative to the latest state. + pub fn compute_mutations( + &self, + account_commitments: impl IntoIterator, + ) -> Result { + Ok(self.latest.compute_mutations(account_commitments)?) + } + /// Computes and applies mutations in one operation. /// /// This is a convenience method primarily for testing. @@ -360,14 +387,6 @@ impl AccountTreeWithHistory { self.apply_mutations(mutations) } - /// Computes mutations relative to the latest state. - pub fn compute_mutations( - &self, - account_commitments: impl IntoIterator, - ) -> Result { - Ok(self.latest.compute_mutations(account_commitments)?) - } - /// Applies mutations and advances to the next block. /// /// This method: @@ -397,3 +416,22 @@ impl AccountTreeWithHistory { Ok(()) } } + +impl AccountTreeWithHistory +where + S: SmtStorage, +{ + /// Returns a read-only `AccountTreeWithHistory` backed by a reader view of this tree's + /// storage. + /// + /// The returned tree shares the same block number and historical overlays as `self`, and its + /// latest `AccountTree` is produced by [`AccountTree::reader`]. The returned tree's storage + /// type is `S::Reader: SmtStorageReader`, so it cannot be used for mutations. 
+ pub fn reader(&self) -> AccountTreeWithHistory { + AccountTreeWithHistory { + block_number: self.block_number, + latest: self.latest.reader(), + overlays: self.overlays.clone(), + } + } +} diff --git a/crates/store/src/blocks.rs b/crates/store/src/blocks.rs index 749ef0289..4ed251bd5 100644 --- a/crates/store/src/blocks.rs +++ b/crates/store/src/blocks.rs @@ -83,9 +83,9 @@ impl BlockStore { #[instrument( target = COMPONENT, name = "store.block_store.save_block", - skip(self, data), + skip_all, err, - fields(block_size = data.len()) + fields(block.number = block_num.as_u32(), block.size = data.len()) )] pub async fn save_block(&self, block_num: BlockNumber, data: &[u8]) -> std::io::Result<()> { let (epoch_path, block_path) = self.epoch_block_path(block_num)?; diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index b4c6c8ce7..a18de5239 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -25,7 +25,6 @@ use miden_protocol::note::{ }; use miden_protocol::transaction::TransactionHeader; use miden_protocol::utils::serde::{Deserializable, Serializable}; -use tokio::sync::oneshot; use tracing::{info, instrument}; use crate::COMPONENT; @@ -517,26 +516,30 @@ impl Db { .await } - /// Returns all note commitments from the DB that match the provided ones. + /// Returns all note commitments from the DB that match the provided ones, scoped to notes + /// committed at or before the given block number. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn select_existing_note_commitments( &self, note_commitments: Vec, + block_num: BlockNumber, ) -> Result> { self.transact("note by commitment", move |conn| { - queries::select_existing_note_commitments(conn, note_commitments.as_slice()) + queries::select_existing_note_commitments(conn, note_commitments.as_slice(), block_num) }) .await } - /// Loads inclusion proofs for notes matching the given note commitments. 
+ /// Loads inclusion proofs for notes matching the given note commitments, scoped to notes + /// committed at or before the given block number. #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn select_note_inclusion_proofs( &self, note_commitments: BTreeSet, + block_num: BlockNumber, ) -> Result> { self.transact("block note inclusion proofs by commitment", move |conn| { - models::queries::select_note_inclusion_proofs(conn, ¬e_commitments) + models::queries::select_note_inclusion_proofs(conn, ¬e_commitments, block_num) }) .await } @@ -549,25 +552,13 @@ impl Db { #[instrument(target = COMPONENT, skip_all, err)] pub async fn apply_block( &self, - allow_acquire: oneshot::Sender<()>, - acquire_done: oneshot::Receiver<()>, signed_block: SignedBlock, notes: Vec<(NoteRecord, Option)>, proving_inputs: Option, ) -> Result<()> { self.transact("apply block", move |conn| -> Result<()> { models::queries::apply_block(conn, &signed_block, ¬es, proving_inputs)?; - - // XXX FIXME TODO free floating mutex MUST NOT exist - // it doesn't bind it properly to the data locked! - if allow_acquire.send(()).is_err() { - tracing::warn!(target: COMPONENT, "failed to send notification for successful block application, potential deadlock"); - } - models::queries::prune_history(conn, signed_block.header().block_num())?; - - acquire_done.blocking_recv()?; - Ok(()) }) .await diff --git a/crates/store/src/db/models/queries/notes.rs b/crates/store/src/db/models/queries/notes.rs index 266ef7f1a..db966378e 100644 --- a/crates/store/src/db/models/queries/notes.rs +++ b/crates/store/src/db/models/queries/notes.rs @@ -233,7 +233,8 @@ pub(crate) fn select_notes_by_id( Ok(records) } -/// Select the subset of note commitments that already exist in the notes table +/// Select the subset of note commitments that already exist in the notes table, scoped to notes +/// committed at or before the given block number. 
/// /// # Raw SQL /// @@ -242,10 +243,12 @@ pub(crate) fn select_notes_by_id( /// notes.note_commitment /// FROM notes /// WHERE note_commitment IN (?1) +/// AND committed_at <= ?2 /// ``` pub(crate) fn select_existing_note_commitments( conn: &mut SqliteConnection, note_commitments: &[Word], + block_num: BlockNumber, ) -> Result, DatabaseError> { QueryParamNoteCommitmentLimit::check(note_commitments.len())?; @@ -253,6 +256,7 @@ pub(crate) fn select_existing_note_commitments( let raw_commitments = SelectDsl::select(schema::notes::table, schema::notes::note_commitment) .filter(schema::notes::note_commitment.eq_any(¬e_commitments)) + .filter(schema::notes::committed_at.le(block_num.to_raw_sql())) .load::>(conn)?; let commitments = raw_commitments @@ -311,36 +315,12 @@ pub(crate) fn select_all_notes( Ok(records) } -/// Select note inclusion proofs matching the note commitments. -/// -/// # Parameters -/// * `note_ids`: Set of note IDs to query -/// - Limit: 0 <= count <= 1000 -/// -/// # Returns -/// -/// - Empty map if no matching `note`. -/// - Otherwise, note inclusion proofs, which `note_id` matches the `NoteId` as bytes. -/// -/// # Raw SQL -/// -/// ```sql -/// SELECT -/// committed_at, -/// note_id, -/// batch_index, -/// note_index, -/// inclusion_path -/// FROM -/// notes -/// WHERE -/// note_id IN (?1) -/// ORDER BY -/// committed_at ASC -/// ``` +/// Select note inclusion proofs for notes matching the given commitments, scoped to notes +/// committed at or before the given block number. 
pub(crate) fn select_note_inclusion_proofs( conn: &mut SqliteConnection, note_commitments: &BTreeSet, + block_num: BlockNumber, ) -> Result, DatabaseError> { QueryParamNoteCommitmentLimit::check(note_commitments.len())?; @@ -357,6 +337,7 @@ pub(crate) fn select_note_inclusion_proofs( ), ) .filter(schema::notes::note_commitment.eq_any(note_commitments)) + .filter(schema::notes::committed_at.le(block_num.to_raw_sql())) .order_by(schema::notes::committed_at.asc()) .load::<(i64, Vec, i32, i32, Vec)>(conn)?; @@ -804,7 +785,7 @@ impl TryInto for NoteMetadataRawRow { type Error = DatabaseError; fn try_into(self) -> Result { let sender = AccountId::read_from_bytes(&self.sender[..])?; - let note_type = NoteType::try_from(self.note_type as u32) + let note_type = NoteType::try_from(self.note_type as u8) .map_err(miden_node_db::DatabaseError::conversiont_from_sql::)?; let tag = NoteTag::new(self.tag as u32); let attachment = NoteAttachment::read_from_bytes(&self.attachment)?; diff --git a/crates/store/src/errors.rs b/crates/store/src/errors.rs index 1e008b593..e0c670a28 100644 --- a/crates/store/src/errors.rs +++ b/crates/store/src/errors.rs @@ -24,11 +24,13 @@ use miden_protocol::errors::{ use miden_protocol::note::{NoteId, Nullifier}; use miden_protocol::transaction::OutputNote; use thiserror::Error; +use tokio::sync::mpsc::error::SendError; use tokio::sync::oneshot::error::RecvError; use tonic::Status; use crate::account_state_forest::{AccountStateForestError, WitnessError}; use crate::db::models::conv::DatabaseTypeConversionError; +use crate::state::writer::WriteRequest; // PROOF SCHEDULER ERRORS // ================================================================================================= @@ -85,6 +87,8 @@ pub enum DatabaseError { AccountsNotFoundInDb(Vec), #[error("account {0} is not on the chain")] AccountNotPublic(AccountId), + #[error("block {0} is unknown")] + UnknownBlock(BlockNumber), #[error("invalid block parameters: block_from ({from}) > block_to ({to})")] 
InvalidBlockRange { from: BlockNumber, to: BlockNumber }, #[error("data corrupted: {0}")] @@ -198,13 +202,13 @@ pub enum ApplyBlockError { InvalidBlockError(#[from] InvalidBlockError), #[error("account state forest error")] AccountStateForestError(#[from] AccountStateForestError), + #[error("failed to send block to writer task (channel closed)")] + WriterTaskSendFailed(#[from] Box>), + #[error("writer task dropped the result channel")] + WriterTaskRecvFailed(#[from] RecvError), // OTHER ERRORS // --------------------------------------------------------------------------------------------- - #[error("block applying was cancelled because of closed channel on database side")] - ClosedChannel(#[from] RecvError), - #[error("concurrent write detected")] - ConcurrentWrite, #[error("database doesn't have any block header data")] DbBlockHeaderEmpty, #[error("database update failed: {0}")] @@ -229,6 +233,9 @@ pub enum GetBlockHeaderError { #[error("error retrieving the merkle proof for the block")] #[grpc(internal)] MmrError(#[from] MmrError), + #[error("block {0} is unknown")] + #[grpc(invalid_argument)] + UnknownBlock(BlockNumber), } #[derive(Error, Debug)] @@ -254,17 +261,16 @@ pub enum StateSyncError { EmptyBlockHeadersTable, #[error("failed to build MMR delta")] FailedToBuildMmrDelta(#[from] MmrError), + #[error("block {0} is unknown")] + UnknownBlock(BlockNumber), } #[derive(Error, Debug, GrpcError)] pub enum SyncChainMmrError { #[error("invalid block range")] InvalidBlockRange(#[source] InvalidBlockRange), - #[error("start block is not known")] - FutureBlock { - chain_tip: BlockNumber, - block_from: BlockNumber, - }, + #[error("block {0} is unknown")] + UnknownBlock(BlockNumber), #[error("malformed block number")] DeserializationFailed(#[source] ConversionError), #[error("database error")] @@ -348,6 +354,8 @@ pub enum SyncNullifiersError { InvalidPrefixLength(u32), #[error("malformed nullifier prefix")] DeserializationFailed(#[from] ConversionError), + #[error("block 
{0} is unknown")] + UnknownBlock(BlockNumber), } // SYNC ACCOUNT VAULT ERRORS diff --git a/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac b/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac index 4eaf26f63..f1e539b01 100644 Binary files a/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac and b/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac differ diff --git a/crates/store/src/server/api.rs b/crates/store/src/server/api.rs index d73cad9c1..4a0a7a623 100644 --- a/crates/store/src/server/api.rs +++ b/crates/store/src/server/api.rs @@ -17,6 +17,7 @@ use tracing::{info, instrument}; use crate::COMPONENT; use crate::errors::GetBlockInputsError; use crate::state::State; +use crate::state::writer::WriteHandle; // STORE API // ================================================================================================ @@ -24,6 +25,8 @@ use crate::state::State; #[derive(Clone)] pub struct StoreApi { pub(super) state: Arc, + /// Handle for submitting blocks to the writer loop. + pub(super) write_handle: WriteHandle, /// Sender used to notify the proof scheduler of the latest committed block number. 
pub(super) chain_tip_sender: watch::Sender, } diff --git a/crates/store/src/server/block_producer.rs b/crates/store/src/server/block_producer.rs index 0102a1928..e7ea9eac1 100644 --- a/crates/store/src/server/block_producer.rs +++ b/crates/store/src/server/block_producer.rs @@ -14,7 +14,7 @@ use miden_protocol::batch::OrderedBatches; use miden_protocol::block::{BlockBody, BlockHeader, BlockNumber, SignedBlock}; use miden_protocol::utils::serde::Deserializable; use tonic::{Request, Response, Status}; -use tracing::{Instrument, error}; +use tracing::error; use crate::errors::ApplyBlockError; use crate::server::api::{ @@ -26,7 +26,6 @@ use crate::server::api::{ validate_note_commitments, validate_nullifiers, }; -use crate::state::Finality; // BLOCK PRODUCER ENDPOINTS // ================================================================================================ @@ -89,44 +88,28 @@ impl block_producer_server::BlockProducer for StoreApi { block_inputs, }; - // We perform the apply block work in a separate task. This prevents the caller - // cancelling the request and thereby cancelling the task at an arbitrary point of - // execution. - // - // Normally this shouldn't be a problem, however our apply_block isn't quite ACID compliant - // so things get a bit messy. This is more a temporary hack-around to minimize this risk. - let this = self.clone(); - tokio::spawn( - async move { - let block_num = header.block_num(); - let signed_block = SignedBlock::new(header, body, signature) - .map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))?; - // Note: This is an internal endpoint, so its safe to expose the full error - // report. 
- this.state - .apply_block(signed_block, Some(proving_inputs)) - .await - .inspect(|_| { - if let Err(err) = this.chain_tip_sender.send(block_num) { - error!("Failed to send chain tip: {:?}", err); - } - }) - .map_err(|err| { - span.set_error(&err); - let code = match err { - ApplyBlockError::InvalidBlockError(_) => tonic::Code::InvalidArgument, - _ => tonic::Code::Internal, - }; - Status::new(code, err.as_report()) - }) - } - .in_current_span(), - ) - .await - .map_err(|err| { - tonic::Status::internal(err.as_report_context("joining apply_block task failed")) - }) - .flatten()?; + let block_num = header.block_num(); + let signed_block = SignedBlock::new(header, body, signature) + .map_err(|err| Status::new(tonic::Code::Internal, err.as_report()))?; + + // Apply the block. + self.write_handle + .apply_block(signed_block, Some(proving_inputs)) + .await + .inspect(|_| { + if let Err(err) = self.chain_tip_sender.send(block_num) { + error!("Failed to send chain tip: {:?}", err); + } + }) + .map_err(|err| { + span.set_error(&err); + let code = match err { + ApplyBlockError::InvalidBlockError(_) => tonic::Code::InvalidArgument, + _ => tonic::Code::Internal, + }; + Status::new(code, err.as_report()) + })?; + Ok(Response::new(())) } @@ -201,14 +184,14 @@ impl block_producer_server::BlockProducer for StoreApi { let unauthenticated_note_commitments = validate_note_commitments(&request.unauthenticated_notes)?; - let tx_inputs = self + let result = self .state .get_transaction_inputs(account_id, &nullifiers, unauthenticated_note_commitments) .await .inspect_err(|err| tracing::Span::current().set_error(err)) .map_err(|err| tonic::Status::internal(err.as_report()))?; - - let block_height = self.state.chain_tip(Finality::Committed).await.as_u32(); + let block_height = result.chain_tip(); + let tx_inputs = result.into_inner(); Ok(Response::new(proto::store::TransactionInputs { account_state: Some(proto::store::transaction_inputs::AccountTransactionInputRecord { @@ -231,7 +214,7 
@@ impl block_producer_server::BlockProducer for StoreApi { .map(Into::into) .collect(), new_account_id_prefix_is_unique: tx_inputs.new_account_id_prefix_is_unique, - block_height, + block_height: block_height.as_u32(), })) } } diff --git a/crates/store/src/server/mod.rs b/crates/store/src/server/mod.rs index bcd8689be..884fc2ac4 100644 --- a/crates/store/src/server/mod.rs +++ b/crates/store/src/server/mod.rs @@ -93,7 +93,7 @@ impl Store { // Load initial state. let (termination_ask, mut termination_signal) = tokio::sync::mpsc::channel::(1); - let (state, tx_proven_tip) = + let (state, write_handle, tx_proven_tip) = State::load(&self.data_directory, self.storage_options, termination_ask) .await .context("failed to load state")?; @@ -104,12 +104,12 @@ impl Store { self.block_prover_url, self.max_concurrent_proofs, tx_proven_tip, - ) - .await; + ); // Spawn gRPC Servers. let mut join_set = Self::spawn_grpc_servers( state, + write_handle, chain_tip_sender, self.grpc_options, self.rpc_listener, @@ -140,7 +140,7 @@ impl Store { /// /// Returns the scheduler task handle and the chain tip sender (needed by gRPC services to /// notify the scheduler of new blocks). - async fn spawn_proof_scheduler( + fn spawn_proof_scheduler( state: &State, block_prover_url: Option, max_concurrent_proofs: NonZeroUsize, @@ -155,7 +155,7 @@ impl Store { Arc::new(BlockProver::local()) }; - let chain_tip = state.chain_tip(crate::state::Finality::Committed).await; + let chain_tip = state.chain_tip(crate::state::Finality::Committed); let (chain_tip_tx, chain_tip_rx) = watch::channel(chain_tip); let handle = proof_scheduler::spawn( @@ -172,25 +172,28 @@ impl Store { /// Spawns the gRPC servers and the DB maintenance background task. 
fn spawn_grpc_servers( - state: State, + state: Arc, + write_handle: crate::state::writer::WriteHandle, chain_tip_sender: watch::Sender, grpc_options: GrpcOptionsInternal, rpc_listener: TcpListener, ntx_builder_listener: TcpListener, block_producer_listener: TcpListener, ) -> anyhow::Result>> { - let state = Arc::new(state); let rpc_service = store::rpc_server::RpcServer::new(api::StoreApi { state: Arc::clone(&state), + write_handle: write_handle.clone(), chain_tip_sender: chain_tip_sender.clone(), }); let ntx_builder_service = store::ntx_builder_server::NtxBuilderServer::new(api::StoreApi { state: Arc::clone(&state), + write_handle: write_handle.clone(), chain_tip_sender: chain_tip_sender.clone(), }); let block_producer_service = store::block_producer_server::BlockProducerServer::new(api::StoreApi { state: Arc::clone(&state), + write_handle, chain_tip_sender, }); let reflection_service = tonic_reflection::server::Builder::configure() diff --git a/crates/store/src/server/ntx_builder.rs b/crates/store/src/server/ntx_builder.rs index ff1b6eb33..ffc888088 100644 --- a/crates/store/src/server/ntx_builder.rs +++ b/crates/store/src/server/ntx_builder.rs @@ -103,8 +103,6 @@ impl ntx_builder_server::NtxBuilder for StoreApi { request.account_id, )?; - let state = self.state.clone(); - let size = NonZero::try_from(request.page_size as usize).map_err(|err: TryFromIntError| { invalid_argument(err.as_report_context("invalid page_size")) @@ -112,7 +110,8 @@ impl ntx_builder_server::NtxBuilder for StoreApi { let page = Page { token: request.page_token, size }; // TODO: no need to get the whole NoteRecord here, a NetworkNote wrapper should be created // instead - let (notes, next_page) = state + let (notes, next_page) = self + .state .get_unconsumed_network_notes_for_account(account_id, block_num, page) .await .map_err(internal_error)?; @@ -146,22 +145,18 @@ impl ntx_builder_server::NtxBuilder for StoreApi { ) -> Result, Status> { let request = request.into_inner(); - let mut 
chain_tip = self.state.chain_tip(Finality::Committed).await; + let chain_tip = self.state.chain_tip(Finality::Committed); let block_range = read_block_range::(Some(request), "GetNetworkAccountIds")? .into_inclusive_range::(&chain_tip)?; - let (account_ids, mut last_block_included) = + let result = self.state.get_all_network_accounts(block_range).await.map_err(internal_error)?; + let chain_tip = result.chain_tip(); + let (account_ids, last_block_included) = result.into_inner(); let account_ids = Vec::from_iter(account_ids.into_iter().map(Into::into)); - if last_block_included > chain_tip { - last_block_included = chain_tip; - } - - chain_tip = self.state.chain_tip(Finality::Committed).await; - Ok(Response::new(proto::store::NetworkAccountIdList { account_ids, pagination_info: Some(proto::rpc::PaginationInfo { @@ -252,14 +247,13 @@ impl ntx_builder_server::NtxBuilder for StoreApi { let block_num = if let Some(num) = request.block_num { num.into() } else { - self.state.chain_tip(Finality::Committed).await + self.state.chain_tip(Finality::Committed) }; // Retrieve the asset witnesses. let asset_witnesses = self .state .get_vault_asset_witnesses(account_id, block_num, vault_keys) - .await .map_err(internal_error)?; // Convert AssetWitness to protobuf format by extracting witness data. @@ -306,14 +300,13 @@ impl ntx_builder_server::NtxBuilder for StoreApi { let block_num = if let Some(num) = request.block_num { num.into() } else { - self.state.chain_tip(Finality::Committed).await + self.state.chain_tip(Finality::Committed) }; // Retrieve the storage map witness. let storage_witness = self .state .get_storage_map_witness(account_id, &slot_name, block_num, map_key) - .await .map_err(internal_error)?; // Convert StorageMapWitness to protobuf format by extracting witness data. 
@@ -323,7 +316,7 @@ impl ntx_builder_server::NtxBuilder for StoreApi { key: Some(map_key.into()), proof: Some(proof.into()), }), - block_num: self.state.chain_tip(Finality::Committed).await.as_u32(), + block_num: block_num.as_u32(), })) } } diff --git a/crates/store/src/server/rpc_api.rs b/crates/store/src/server/rpc_api.rs index 87055936f..a4d371636 100644 --- a/crates/store/src/server/rpc_api.rs +++ b/crates/store/src/server/rpc_api.rs @@ -1,5 +1,5 @@ use miden_node_proto::convert; -use miden_node_proto::domain::block::SyncTarget; +use miden_node_proto::domain::block::{InvalidBlockRange, SyncTarget}; use miden_node_proto::generated::store::rpc_server; use miden_node_proto::generated::{self as proto}; use miden_node_utils::limiter::{ @@ -94,16 +94,18 @@ impl rpc_server::Rpc for StoreApi { return Err(SyncNullifiersError::InvalidPrefixLength(request.prefix_len).into()); } - let chain_tip = self.state.chain_tip(Finality::Committed).await; + let chain_tip = self.state.chain_tip(Finality::Committed); let block_range = read_block_range::(request.block_range, "SyncNullifiersRequest")? .into_inclusive_range::(&chain_tip)?; - let (nullifiers, block_num) = self + let result = self .state .sync_nullifiers(request.prefix_len, request.nullifiers, block_range) .await .map_err(SyncNullifiersError::from)?; + let chain_tip = result.chain_tip(); + let (nullifiers, block_num) = result.into_inner(); let nullifiers = nullifiers .into_iter() @@ -129,19 +131,17 @@ impl rpc_server::Rpc for StoreApi { ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.chain_tip(Finality::Committed).await; + let chain_tip = self.state.chain_tip(Finality::Committed); let block_range = read_block_range::(request.block_range, "SyncNotesRequest")? 
.into_inclusive_range::(&chain_tip)?; - if *block_range.end() > chain_tip { - Err(NoteSyncError::FutureBlock { chain_tip, block_to: *block_range.end() })?; - } // Validate note tags count check::(request.note_tags.len())?; - let (results, last_block_checked) = - self.state.sync_notes(request.note_tags, block_range).await?; + let result = self.state.sync_notes(request.note_tags, block_range).await?; + let chain_tip = result.chain_tip(); + let (results, last_block_checked) = result.into_inner(); let blocks = results .into_iter() @@ -180,15 +180,20 @@ impl rpc_server::Rpc for StoreApi { let block_to = match sync_target { SyncTarget::BlockNumber(block_num) => { - block_num.min(self.state.chain_tip(Finality::Committed).await) + block_num.min(self.state.chain_tip(Finality::Committed)) }, - SyncTarget::CommittedChainTip => self.state.chain_tip(Finality::Committed).await, - SyncTarget::ProvenChainTip => self.state.chain_tip(Finality::Proven).await, + SyncTarget::CommittedChainTip => self.state.chain_tip(Finality::Committed), + SyncTarget::ProvenChainTip => self.state.chain_tip(Finality::Proven), }; + // Check range sanity. if block_from > block_to { - Err(SyncChainMmrError::FutureBlock { chain_tip: block_to, block_from })?; + Err(SyncChainMmrError::InvalidBlockRange(InvalidBlockRange::StartGreaterThanEnd { + start: block_from, + end: block_to, + }))?; } + let block_range = block_from..=block_to; let (mmr_delta, block_header) = self.state.sync_chain_mmr(block_range.clone()).await.map_err(internal_error)?; @@ -274,7 +279,7 @@ impl rpc_server::Rpc for StoreApi { request: Request, ) -> Result, Status> { let request = request.into_inner(); - let chain_tip = self.state.chain_tip(Finality::Committed).await; + let chain_tip = self.state.chain_tip(Finality::Committed); let account_id: AccountId = read_account_id::< proto::rpc::SyncAccountVaultRequest, @@ -291,11 +296,13 @@ impl rpc_server::Rpc for StoreApi { )? 
.into_inclusive_range::(&chain_tip)?; - let (last_included_block, updates) = self + let result = self .state .sync_account_vault(account_id, block_range) .await .map_err(SyncAccountVaultError::from)?; + let chain_tip = result.chain_tip(); + let (last_included_block, updates) = result.into_inner(); let updates = updates .into_iter() @@ -336,18 +343,20 @@ impl rpc_server::Rpc for StoreApi { Err(SyncAccountStorageMapsError::AccountNotPublic(account_id))?; } - let chain_tip = self.state.chain_tip(Finality::Committed).await; + let chain_tip = self.state.chain_tip(Finality::Committed); let block_range = read_block_range::( request.block_range, "SyncAccountStorageMapsRequest", )? .into_inclusive_range::(&chain_tip)?; - let storage_maps_page = self + let result = self .state .sync_account_storage_maps(account_id, block_range) .await .map_err(SyncAccountStorageMapsError::from)?; + let chain_tip = result.chain_tip(); + let storage_maps_page = result.into_inner(); let updates = storage_maps_page .values @@ -376,7 +385,7 @@ impl rpc_server::Rpc for StoreApi { Ok(Response::new(proto::rpc::StoreStatus { version: env!("CARGO_PKG_VERSION").to_string(), status: "connected".to_string(), - chain_tip: self.state.chain_tip(Finality::Committed).await.as_u32(), + chain_tip: self.state.chain_tip(Finality::Committed).as_u32(), })) } @@ -408,7 +417,7 @@ impl rpc_server::Rpc for StoreApi { let request = request.into_inner(); - let chain_tip = self.state.chain_tip(Finality::Committed).await; + let chain_tip = self.state.chain_tip(Finality::Committed); let block_range = read_block_range::( request.block_range, "SyncTransactionsRequest", @@ -421,11 +430,13 @@ impl rpc_server::Rpc for StoreApi { // Validate account IDs count check::(account_ids.len())?; - let (last_block_included, transaction_records_db) = self + let result = self .state .sync_transactions(account_ids, block_range.clone()) .await .map_err(SyncTransactionsError::from)?; + let chain_tip = result.chain_tip(); + let 
(last_block_included, transaction_records_db) = result.into_inner(); // Convert database TransactionRecords directly to proto TransactionRecords. // All data needed for the proto TransactionHeader is stored in the transactions table. diff --git a/crates/store/src/state/apply_block.rs b/crates/store/src/state/apply_block.rs deleted file mode 100644 index d8aab21e7..000000000 --- a/crates/store/src/state/apply_block.rs +++ /dev/null @@ -1,301 +0,0 @@ -use std::sync::Arc; - -use miden_node_proto::BlockProofRequest; -use miden_node_utils::ErrorReport; -use miden_protocol::account::delta::AccountUpdateDetails; -use miden_protocol::block::SignedBlock; -use miden_protocol::note::NoteDetails; -use miden_protocol::transaction::OutputNote; -use miden_protocol::utils::serde::Serializable; -use tokio::sync::oneshot; -use tracing::{Instrument, info, info_span, instrument}; - -use crate::db::NoteRecord; -use crate::errors::{ApplyBlockError, InvalidBlockError}; -use crate::state::State; -use crate::{COMPONENT, HistoricalError}; - -impl State { - /// Apply changes of a new block to the DB and in-memory data structures. - /// - /// ## Note on state consistency - /// - /// The server contains in-memory representations of the existing trees, the in-memory - /// representation must be kept consistent with the committed data, this is necessary so to - /// provide consistent results for all endpoints. In order to achieve consistency, the - /// following steps are used: - /// - /// - the request data is validated, prior to starting any modifications. - /// - block is being saved into the store in parallel with updating the DB, but before - /// committing. This block is considered as candidate and not yet available for reading - /// because the latest block pointer is not updated yet. - /// - a transaction is open in the DB and the writes are started. 
- /// - while the transaction is not committed, concurrent reads are allowed, both the DB and the - /// in-memory representations, which are consistent at this stage. - /// - prior to committing the changes to the DB, an exclusive lock to the in-memory data is - /// acquired, preventing concurrent reads to the in-memory data, since that will be - /// out-of-sync w.r.t. the DB. - /// - the DB transaction is committed, and requests that read only from the DB can proceed to - /// use the fresh data. - /// - the in-memory structures are updated, including the latest block pointer and the lock is - /// released. - /// - /// # Errors - /// - /// Returns an error if `proving_inputs` is `None` and the block is not the genesis block. - // TODO: This span is logged in a root span, we should connect it to the parent span. - #[expect(clippy::too_many_lines)] - #[instrument(target = COMPONENT, skip_all, err)] - pub async fn apply_block( - &self, - signed_block: SignedBlock, - proving_inputs: Option, - ) -> Result<(), ApplyBlockError> { - let _lock = self.writer.try_lock().map_err(|_| ApplyBlockError::ConcurrentWrite)?; - - let header = signed_block.header(); - let body = signed_block.body(); - - // Validate that header and body match. - let tx_commitment = body.transactions().commitment(); - if header.tx_commitment() != tx_commitment { - return Err(InvalidBlockError::InvalidBlockTxCommitment { - expected: tx_commitment, - actual: header.tx_commitment(), - } - .into()); - } - - let block_num = header.block_num(); - let block_commitment = header.commitment(); - - // Validate that the applied block is the next block in sequence. - let prev_block = self - .db - .select_block_header_by_block_num(None) - .await? 
- .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?; - let expected_block_num = prev_block.block_num().child(); - if block_num != expected_block_num { - return Err(InvalidBlockError::NewBlockInvalidBlockNum { - expected: expected_block_num, - submitted: block_num, - } - .into()); - } - if header.prev_block_commitment() != prev_block.commitment() { - return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into()); - } - - // Save the block to the block store. In a case of a rolled-back DB transaction, the - // in-memory state will be unchanged, but the block might still be written into the - // block store. Thus, such block should be considered as block candidates, but not - // finalized blocks. So we should check for the latest block when getting block from - // the store. - let signed_block_bytes = signed_block.to_bytes(); - let store = Arc::clone(&self.block_store); - let block_save_task = tokio::spawn( - async move { store.save_block(block_num, &signed_block_bytes).await }.in_current_span(), - ); - - // Scope to read in-memory data, compute mutations required for updating account - // and nullifier trees, and validate the request. 
- let ( - nullifier_tree_old_root, - nullifier_tree_update, - account_tree_old_root, - account_tree_update, - ) = { - let inner = self.inner.read().await; - - let _span = info_span!(target: COMPONENT, "update_in_memory_structs").entered(); - - // nullifiers can be produced only once - let duplicate_nullifiers: Vec<_> = body - .created_nullifiers() - .iter() - .filter(|&nullifier| inner.nullifier_tree.get_block_num(nullifier).is_some()) - .copied() - .collect(); - if !duplicate_nullifiers.is_empty() { - return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into()); - } - - // compute updates for the in-memory data structures - - // new_block.chain_root must be equal to the chain MMR root prior to the update - let peaks = inner.blockchain.peaks(); - if peaks.hash_peaks() != header.chain_commitment() { - return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into()); - } - - // compute update for nullifier tree - let nullifier_tree_update = inner - .nullifier_tree - .compute_mutations( - body.created_nullifiers().iter().map(|nullifier| (*nullifier, block_num)), - ) - .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?; - - if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() { - // We do our best here to notify the serve routine, if it doesn't care (dropped the - // receiver) we can't do much. 
- let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( - InvalidBlockError::NewBlockInvalidNullifierRoot, - )); - return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into()); - } - - // compute update for account tree - let account_tree_update = inner - .account_tree - .compute_mutations( - body.updated_accounts() - .iter() - .map(|update| (update.account_id(), update.final_state_commitment())), - ) - .map_err(|e| match e { - HistoricalError::AccountTreeError(err) => { - InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err) - }, - HistoricalError::MerkleError(_) => { - panic!("Unexpected MerkleError during account tree mutation computation") - }, - })?; - - if account_tree_update.as_mutation_set().root() != header.account_root() { - let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( - InvalidBlockError::NewBlockInvalidAccountRoot, - )); - return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into()); - } - - ( - inner.nullifier_tree.root(), - nullifier_tree_update, - inner.account_tree.root_latest(), - account_tree_update, - ) - }; - - // Build note tree. - let note_tree = body.compute_block_note_tree(); - if note_tree.root() != header.note_root() { - return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into()); - } - - let notes = body - .output_notes() - .map(|(note_index, note)| { - let (details, nullifier) = match note { - OutputNote::Public(note) => { - (Some(NoteDetails::from(note.as_note())), Some(note.as_note().nullifier())) - }, - OutputNote::Private(_) => (None, None), - }; - - let inclusion_path = note_tree.open(note_index); - - let note_record = NoteRecord { - block_num, - note_index, - note_id: note.id().as_word(), - note_commitment: note.to_commitment(), - metadata: note.metadata().clone(), - details, - inclusion_path, - }; - - Ok((note_record, nullifier)) - }) - .collect::, InvalidBlockError>>()?; - - // Signals the transaction is ready to be committed, and the write lock can be acquired. 
- let (allow_acquire, acquired_allowed) = oneshot::channel::<()>(); - // Signals the write lock has been acquired, and the transaction can be committed. - let (inform_acquire_done, acquire_done) = oneshot::channel::<()>(); - - // Extract public account updates with deltas before block is moved into async task. - // Private accounts are filtered out since they don't expose their state changes. - let account_deltas = - Vec::from_iter(body.updated_accounts().iter().filter_map( - |update| match update.details() { - AccountUpdateDetails::Delta(delta) => Some(delta.clone()), - AccountUpdateDetails::Private => None, - }, - )); - - // The DB and in-memory state updates need to be synchronized and are partially - // overlapping. Namely, the DB transaction only proceeds after this task acquires the - // in-memory write lock. This requires the DB update to run concurrently, so a new task is - // spawned. - let db = Arc::clone(&self.db); - let db_update_task = tokio::spawn( - async move { - db.apply_block(allow_acquire, acquire_done, signed_block, notes, proving_inputs) - .await - } - .in_current_span(), - ); - - // Wait for the message from the DB update task, that we ready to commit the DB transaction. - acquired_allowed.await.map_err(ApplyBlockError::ClosedChannel)?; - - // Awaiting the block saving task to complete without errors. - block_save_task.await??; - - // Scope to update the in-memory data. - async move { - // We need to hold the write lock here to prevent inconsistency between the in-memory - // state and the DB state. Thus, we need to wait for the DB update task to complete - // successfully. - let mut inner = self.inner.write().await; - - // We need to check that neither the nullifier tree nor the account tree have changed - // while we were waiting for the DB preparation task to complete. If either of them - // did change, we do not proceed with in-memory and database updates, since it may - // lead to an inconsistent state. 
- if inner.nullifier_tree.root() != nullifier_tree_old_root - || inner.account_tree.root_latest() != account_tree_old_root - { - return Err(ApplyBlockError::ConcurrentWrite); - } - - // Notify the DB update task that the write lock has been acquired, so it can commit - // the DB transaction. - inform_acquire_done - .send(()) - .map_err(|_| ApplyBlockError::DbUpdateTaskFailed("Receiver was dropped".into()))?; - - // TODO: shutdown #91 - // Await for successful commit of the DB transaction. If the commit fails, we mustn't - // change in-memory state, so we return a block applying error and don't proceed with - // in-memory updates. - db_update_task - .await? - .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?; - - // Update the in-memory data structures after successful commit of the DB transaction - inner - .nullifier_tree - .apply_mutations(nullifier_tree_update) - .expect("Unreachable: old nullifier tree root must be checked before this step"); - inner - .account_tree - .apply_mutations(account_tree_update) - .expect("Unreachable: old account tree root must be checked before this step"); - - inner.blockchain.push(block_commitment); - - Ok(()) - } - .in_current_span() - .await?; - - self.forest.write().await.apply_block_updates(block_num, account_deltas)?; - - info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful"); - - Ok(()) - } -} diff --git a/crates/store/src/state/loader.rs b/crates/store/src/state/loader.rs index 181578d6d..9439640f0 100644 --- a/crates/store/src/state/loader.rs +++ b/crates/store/src/state/loader.rs @@ -14,8 +14,7 @@ use std::path::Path; use miden_crypto::merkle::mmr::Mmr; #[cfg(feature = "rocksdb")] -use miden_large_smt_backend_rocksdb::RocksDbStorage; -#[cfg(feature = "rocksdb")] +use miden_large_smt_backend_rocksdb::{RocksDbStorage, SmtStorageReader}; use miden_node_utils::clap::RocksDbOptions; use miden_protocol::block::account_tree::{AccountIdKey, AccountTree}; use 
miden_protocol::block::nullifier_tree::NullifierTree; @@ -58,12 +57,21 @@ const PUBLIC_ACCOUNT_IDS_PAGE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000).unwr // STORAGE TYPE ALIAS // ================================================================================================ -/// The storage backend for trees. +/// The writable storage backend for trees. #[cfg(feature = "rocksdb")] pub type TreeStorage = RocksDbStorage; #[cfg(not(feature = "rocksdb"))] pub type TreeStorage = MemoryStorage; +/// The read-only storage backend used by `InMemoryState` for lock-free reads. +/// +/// With `rocksdb`, this is a snapshot-backed read-only storage (`RocksDbSnapshotStorage`). +/// Without `rocksdb`, this is the same as `TreeStorage` (in-memory, already `Clone`). +#[cfg(feature = "rocksdb")] +pub type SnapshotTreeStorage = miden_large_smt_backend_rocksdb::RocksDbSnapshotStorage; +#[cfg(not(feature = "rocksdb"))] +pub type SnapshotTreeStorage = MemoryStorage; + // ERROR CONVERSION // ================================================================================================ @@ -341,7 +349,7 @@ impl StorageLoader for RocksDbStorage { /// Loads an SMT from persistent storage. #[cfg(feature = "rocksdb")] -pub fn load_smt(storage: S) -> Result, StateInitializationError> { +pub fn load_smt(storage: S) -> Result, StateInitializationError> { LargeSmt::load(storage).map_err(account_tree_large_smt_error_to_init_error) } diff --git a/crates/store/src/state/mod.rs b/crates/store/src/state/mod.rs index 6c714cfea..e6e7aa999 100644 --- a/crates/store/src/state/mod.rs +++ b/crates/store/src/state/mod.rs @@ -1,13 +1,20 @@ -//! Abstraction to synchronize state modifications. +//! State management for the Miden store. //! -//! The [State] provides data access and modifications methods, its main purpose is to ensure that -//! data is atomically written, and that reads are consistent. +//! The [State] provides data access and modification methods. A single writer task, serialized by +//! 
a channel, applies block mutations. All reader-visible state (trees, blockchain MMR, forest) is +//! held in an [`Arc`] behind an [`ArcSwap`](arc_swap::ArcSwap), providing wait-free +//! reads with no lock contention. +//! +//! Readers obtain an `Arc` via [`State::snapshot()`] (wait-free, no locks). +//! The writer applies mutations to its own writable trees (owned directly, no locks), then builds +//! a new `InMemoryState` with snapshot-backed read-only copies and atomically swaps the pointer. use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::ops::RangeInclusive; use std::path::Path; use std::sync::Arc; +use arc_swap::ArcSwap; use miden_node_proto::domain::account::{ AccountDetailRequest, AccountDetails, @@ -31,11 +38,11 @@ use miden_protocol::block::account_tree::AccountWitness; use miden_protocol::block::nullifier_tree::{NullifierTree, NullifierWitness}; use miden_protocol::block::{BlockHeader, BlockInputs, BlockNumber, Blockchain}; use miden_protocol::crypto::merkle::mmr::{MmrPeaks, MmrProof, PartialMmr}; -use miden_protocol::crypto::merkle::smt::{LargeSmt, SmtProof, SmtStorage}; +use miden_protocol::crypto::merkle::smt::{LargeSmt, SmtProof}; use miden_protocol::note::{NoteId, NoteScript, Nullifier}; use miden_protocol::transaction::PartialBlockchain; -use tokio::sync::{Mutex, RwLock}; -use tracing::{Instrument, info, instrument}; +use tokio::sync::mpsc; +use tracing::{info, instrument}; use crate::account_state_forest::{AccountStateForest, WitnessError}; use crate::accounts::AccountTreeWithHistory; @@ -60,6 +67,7 @@ mod loader; use loader::{ ACCOUNT_TREE_STORAGE_DIR, NULLIFIER_TREE_STORAGE_DIR, + SnapshotTreeStorage, StorageLoader, TreeStorage, load_mmr, @@ -67,8 +75,8 @@ use loader::{ verify_tree_consistency, }; -mod apply_block; mod sync_state; +pub(crate) mod writer; // FINALITY // ================================================================================================ @@ -93,51 +101,117 @@ pub struct TransactionInputs { pub 
new_account_id_prefix_is_unique: Option, } -/// Container for state that needs to be updated atomically. -struct InnerState -where - S: SmtStorage, -{ - nullifier_tree: NullifierTree>, - blockchain: Blockchain, - account_tree: AccountTreeWithHistory, +// SCOPED RESULT +// ================================================================================================ + +/// A query result scoped to a specific chain tip. +/// +/// Wraps an inner value `T` with the [`BlockNumber`] of the snapshot that was used to produce it. +/// This ensures callers always know which block the data corresponds to. +#[derive(Debug)] +pub struct Scoped { + /// The chain tip at the time the query was executed. + chain_tip: BlockNumber, + /// The query result. + inner: T, } -impl InnerState { - /// Returns the latest block number. - fn latest_block_num(&self) -> BlockNumber { - self.blockchain - .chain_tip() - .expect("chain should always have at least the genesis block") +impl Scoped { + /// Creates a new scoped result. + pub fn new(chain_tip: BlockNumber, inner: T) -> Self { + Self { chain_tip, inner } + } + + /// Returns the chain tip at the time the query was executed. + pub fn chain_tip(&self) -> BlockNumber { + self.chain_tip + } + + /// Consumes the scoped type and returns the inner value. + pub fn into_inner(self) -> T { + self.inner } } +// IN-MEMORY STATE +// ================================================================================================ + +/// A consistent, immutable snapshot of all in-memory state at a given block. +/// +/// Held behind an [`ArcSwap`] in [`State`]. +/// +/// ## Performance +/// +/// - **Readers** obtain an `Arc` via [`State::snapshot()`], which calls +/// `ArcSwap::load_full()` — a wait-free atomic refcount bump with no data cloning. The returned +/// `Arc` is a frozen view: even if the writer swaps in a new state, readers continue to see their +/// snapshot unchanged until they drop the `Arc`. 
+/// +/// - **Writer** (once per block) deep-clones this struct via `InMemoryState::clone()` to produce a +/// mutable copy, applies mutations, and atomically swaps the pointer via `ArcSwap::store()`. This +/// is the only place where a deep clone occurs. +#[derive(Clone)] +pub(crate) struct InMemoryState { + /// The committed block number for this snapshot. + pub block_num: BlockNumber, + /// Nullifier tree (read-only, snapshot-backed). + pub nullifier_tree: NullifierTree>, + /// Account tree with historical overlay support (read-only, snapshot-backed). + pub account_tree: AccountTreeWithHistory, + /// Chain MMR (Merkle Mountain Range of block commitments). + pub blockchain: Blockchain, + /// Forest state for account storage maps and vault witnesses. + pub forest: AccountStateForest, +} + // CHAIN STATE // ================================================================================================ /// The rollup state. +/// +/// State is comprised of three data sets: +/// +/// 1. **In-memory** ([`InMemoryState`]): nullifier tree, account tree, blockchain MMR, and account +/// state forest. Held behind an [`ArcSwap`] for wait-free reads. +/// 2. **SQLite**: block headers, notes, nullifiers, accounts, transactions, and other relational +/// data. +/// 3. **File-based** ([`BlockStore`]): serialized blocks and proofs stored on disk. +/// +/// A single writer task (serialized by a channel) mutates all three data sets. The writer owns +/// writable copies of the in-memory trees directly (passed as owned values to +/// [`writer::BlockWriter::run`]) and creates snapshot-backed read-only copies for +/// [`InMemoryState`] after each block. The writer commits to SQLite and the block store *before* +/// swapping the in-memory pointer, so there is a window where the DB/files are ahead of the +/// in-memory state. 
+/// +/// ## Consistency rules for reader methods +/// +/// Any method that combines in-memory and SQLite data **must** take a snapshot and use its +/// `block_num` to scope all DB queries. This ensures the two data sets are consistent even +/// during the window described above. Concretely, such a method must either: +/// +/// - Reject requests where the caller-supplied block number exceeds the snapshot's chain tip, or +/// - Inherently limit its DB query scope to `<= snapshot.block_num`. +/// +/// Methods that operate purely on SQLite or file-based data (e.g. loading a block by number, +/// querying account details by a caller-supplied block number that was already validated) are +/// free to access those stores directly without taking a snapshot. pub struct State { /// The database which stores block headers, nullifiers, notes, and the latest states of /// accounts. - db: Arc, + pub(super) db: Arc, /// The block store which stores full block contents for all blocks. - block_store: Arc, + pub(super) block_store: Arc, - /// Read-write lock used to prevent writing to a structure while it is being used. + /// All in-memory state held atomically behind an `ArcSwap`. /// - /// The lock is writer-preferring, meaning the writer won't be starved. - inner: RwLock>, - - /// Forest-related state `(SmtForest, storage_map_roots, vault_roots)` with its own lock. - forest: RwLock, - - /// To allow readers to access the tree data while an update in being performed, and prevent - /// TOCTOU issues, there must be no concurrent writers. This locks to serialize the writers. - writer: Mutex<()>, - - /// Request termination of the process due to a fatal internal state error. - termination_ask: tokio::sync::mpsc::Sender, + /// Readers call `snapshot()` which returns `Arc` via a wait-free atomic + /// refcount bump — no data cloning. The writer builds a new `InMemoryState` with + /// snapshot-backed trees after each block and atomically swaps via `ArcSwap::store()`. 
+ /// + /// Wrapped in `Arc` so the writer context can share the same `ArcSwap` instance. + pub(super) in_memory: Arc>, /// The latest proven-in-sequence block number, updated by the proof scheduler. proven_tip: ProvenTipReader, @@ -148,12 +222,16 @@ impl State { // -------------------------------------------------------------------------------------------- /// Loads the state from the data directory. + /// + /// Returns `(Arc, WriteHandle, ProvenTipWriter)`. The `WriteHandle` is the only way + /// to submit blocks to the writer loop. The writer task is spawned internally; dropping the + /// `WriteHandle` closes the channel and terminates the writer task. #[instrument(target = COMPONENT, skip_all)] pub async fn load( data_path: &Path, storage_options: StorageOptions, termination_ask: tokio::sync::mpsc::Sender, - ) -> Result<(Self, ProvenTipWriter), StateInitializationError> { + ) -> Result<(Arc, writer::WriteHandle, ProvenTipWriter), StateInitializationError> { let data_directory = DataDirectory::load(data_path.to_path_buf()) .map_err(StateInitializationError::DataDirectoryLoadError)?; @@ -170,38 +248,82 @@ impl State { let blockchain = load_mmr(&mut db).await?; let latest_block_num = blockchain.chain_tip().unwrap_or(BlockNumber::GENESIS); + let account_storage = TreeStorage::create( + data_path, + &storage_options.account_tree.into(), + ACCOUNT_TREE_STORAGE_DIR, + )?; + + // Grab the DB handle before loading (needed for creating snapshots). 
#[cfg(feature = "rocksdb")] - let account_storage_config = storage_options.account_tree.into(); - #[cfg(not(feature = "rocksdb"))] - let account_storage_config = { - let _ = &storage_options; - () - }; - let account_storage = - TreeStorage::create(data_path, &account_storage_config, ACCOUNT_TREE_STORAGE_DIR)?; + let account_db = std::sync::Arc::clone(account_storage.db()); + let account_tree = account_storage.load_account_tree(&mut db).await?; + let nullifier_storage = TreeStorage::create( + data_path, + &storage_options.nullifier_tree.into(), + NULLIFIER_TREE_STORAGE_DIR, + )?; + + // Grab the DB handle before loading (needed for creating snapshots). #[cfg(feature = "rocksdb")] - let nullifier_storage_config = storage_options.nullifier_tree.into(); - #[cfg(not(feature = "rocksdb"))] - let nullifier_storage_config = (); - let nullifier_storage = - TreeStorage::create(data_path, &nullifier_storage_config, NULLIFIER_TREE_STORAGE_DIR)?; + let nullifier_db = std::sync::Arc::clone(nullifier_storage.db()); + let nullifier_tree = nullifier_storage.load_nullifier_tree(&mut db).await?; // Verify that tree roots match the expected roots from the database. - // This catches any divergence between persistent storage and the database caused by - // corruption or incomplete shutdown. verify_tree_consistency(account_tree.root(), nullifier_tree.root(), &mut db).await?; - let account_tree = AccountTreeWithHistory::new(account_tree, latest_block_num); + // Create the writable account tree with history (owned by the writer). + let account_tree_with_history = AccountTreeWithHistory::new(account_tree, latest_block_num); + + // Create a snapshot-backed read-only account tree for InMemoryState. 
+ let snapshot_account_tree = { + #[cfg(feature = "rocksdb")] + { + use miden_large_smt_backend_rocksdb::RocksDbSnapshotStorage; + + let snapshot_storage = RocksDbSnapshotStorage::new(Arc::clone(&account_db)); + let snapshot_smt = loader::load_smt(snapshot_storage) + .map_err(|e| StateInitializationError::AccountTreeIoError(e.to_string()))?; + // SAFETY: The snapshot reads from the same DB that the writable tree + // was just loaded and validated from. No need to re-validate. + let snapshot_tree = + miden_protocol::block::account_tree::AccountTree::new_unchecked(snapshot_smt); + AccountTreeWithHistory::from_parts( + snapshot_tree, + account_tree_with_history.block_number_latest(), + account_tree_with_history.overlays().clone(), + ) + } + #[cfg(not(feature = "rocksdb"))] + { + // In memory mode, the trees are the same type, just clone. + account_tree_with_history.clone() + } + }; - let forest = load_smt_forest(&mut db, latest_block_num).await?; + // Create a snapshot-backed read-only nullifier tree for InMemoryState. + let snapshot_nullifier_tree = { + #[cfg(feature = "rocksdb")] + { + use miden_large_smt_backend_rocksdb::RocksDbSnapshotStorage; + + let snapshot_storage = + RocksDbSnapshotStorage::new(std::sync::Arc::clone(&nullifier_db)); + let snapshot_smt = loader::load_smt(snapshot_storage) + .map_err(|e| StateInitializationError::NullifierTreeIoError(e.to_string()))?; + NullifierTree::new_unchecked(snapshot_smt) + } + #[cfg(not(feature = "rocksdb"))] + { + nullifier_tree.clone() + } + }; - let inner = RwLock::new(InnerState { nullifier_tree, blockchain, account_tree }); + let forest = load_smt_forest(&mut db, latest_block_num).await?; - let forest = RwLock::new(forest); - let writer = Mutex::new(()); let db = Arc::new(db); // Initialize the proven tip from database. 
@@ -209,18 +331,36 @@ impl State { db.proven_chain_tip().await.map_err(StateInitializationError::DatabaseError)?; let (proven_tip_writer, proven_tip) = ProvenTipWriter::new(proven_tip); - Ok(( - Self { - db, - block_store, - inner, - forest, - writer, - termination_ask, - proven_tip, - }, - proven_tip_writer, - )) + // Create the writer channel. Buffer size of 1: only one block can be in flight. + let (writer_tx, writer_rx) = mpsc::channel(1); + + let in_memory = Arc::new(ArcSwap::from_pointee(InMemoryState { + block_num: latest_block_num, + nullifier_tree: snapshot_nullifier_tree, + account_tree: snapshot_account_tree, + blockchain, + forest, + })); + + // Build the writer context. + let writer_ctx = writer::BlockWriter { + rx: writer_rx, + account_tree: account_tree_with_history, + nullifier_tree, + db: Arc::clone(&db), + block_store: Arc::clone(&block_store), + in_memory: Arc::clone(&in_memory), + termination_ask: termination_ask.clone(), + }; + + let state = Arc::new(Self { db, block_store, in_memory, proven_tip }); + + // Spawn the single writer task. + tokio::spawn(writer_ctx.run()); + + let write_handle = writer::WriteHandle::new(writer_tx); + + Ok((state, write_handle, proven_tip_writer)) } /// Returns the database. @@ -236,6 +376,31 @@ impl State { // STATE ACCESSORS // -------------------------------------------------------------------------------------------- + /// Takes a consistent snapshot of all in-memory state. + /// + /// Returns an `Arc` via a wait-free `ArcSwap::load_full()`. This performs + /// only an atomic refcount increment — **no data is cloned**. No locks are acquired. + /// + /// The returned `Arc` is a frozen view: it keeps the snapshot alive for as long as needed, + /// even if the writer swaps in a new state in the meantime. Readers holding the old `Arc` + /// are completely unaffected by the swap. + fn snapshot(&self) -> Arc { + self.in_memory.load_full() + } + + /// Returns the effective chain tip for the given finality level. 
+ /// + /// - [`Finality::Committed`]: returns the latest committed block number from the in-memory + /// state snapshot (wait-free via `ArcSwap`). + /// - [`Finality::Proven`]: returns the latest proven-in-sequence block number (cached via watch + /// channel, updated by the proof scheduler). + pub fn chain_tip(&self, finality: Finality) -> BlockNumber { + match finality { + Finality::Committed => self.snapshot().block_num, + Finality::Proven => self.proven_tip.read(), + } + } + /// Queries a [BlockHeader] from the database, and returns it alongside its inclusion proof. /// /// If [None] is given as the value of `block_num`, the data for the latest [BlockHeader] is @@ -246,11 +411,17 @@ impl State { block_num: Option, include_mmr_proof: bool, ) -> Result<(Option, Option), GetBlockHeaderError> { - let block_header = self.db.select_block_header_by_block_num(block_num).await?; + // Scope the DB query to the snapshot's block number to ensure consistency between + // the block header (from SQLite) and the MMR proof (from the snapshot). + let snapshot = self.snapshot(); + let block_num = block_num.unwrap_or(snapshot.block_num); + if block_num > snapshot.block_num { + return Err(GetBlockHeaderError::UnknownBlock(block_num)); + } + let block_header = self.db.select_block_header_by_block_num(Some(block_num)).await?; if let Some(header) = block_header { let mmr_proof = if include_mmr_proof { - let inner = self.inner.read().await; - let mmr_proof = inner.blockchain.open(header.block_num())?; + let mmr_proof = snapshot.blockchain.open(header.block_num())?; Some(mmr_proof) } else { None @@ -267,10 +438,10 @@ impl State { /// Note: these proofs are invalidated once the nullifier tree is modified, i.e. on a new block. 
#[instrument(level = "debug", target = COMPONENT, skip_all, ret)] pub async fn check_nullifiers(&self, nullifiers: &[Nullifier]) -> Vec { - let inner = self.inner.read().await; + let snapshot = self.snapshot(); nullifiers .iter() - .map(|n| inner.nullifier_tree.open(n)) + .map(|n| snapshot.nullifier_tree.open(n)) .map(NullifierWitness::into_proof) .collect() } @@ -292,22 +463,21 @@ impl State { &self, block_num: Option, ) -> Result, GetCurrentBlockchainDataError> { - let blockchain = &self.inner.read().await.blockchain; + let snapshot = self.snapshot(); if let Some(number) = block_num - && number == self.chain_tip(Finality::Committed).await + && number == snapshot.block_num { return Ok(None); } - // SAFETY: `select_block_header_by_block_num` will always return `Some(chain_tip_header)` - // when `None` is passed let block_header: BlockHeader = self .db - .select_block_header_by_block_num(None) + .select_block_header_by_block_num(Some(snapshot.block_num)) .await .map_err(GetCurrentBlockchainDataError::ErrorRetrievingBlockHeader)? - .unwrap(); - let peaks = blockchain + .expect("block header for snapshot block number must exist in DB"); + let peaks = snapshot + .blockchain .peaks_at(block_header.block_num()) .map_err(GetCurrentBlockchainDataError::InvalidPeaks)?; @@ -341,12 +511,15 @@ impl State { return Err(GetBatchInputsError::TransactionBlockReferencesEmpty); } + let snapshot = self.snapshot(); + let latest_block_num = snapshot.block_num; + // First we grab note inclusion proofs for the known notes. These proofs only // prove that the note was included in a given block. We then also need to prove that // each of those blocks is included in the chain. 
let note_proofs = self .db - .select_note_inclusion_proofs(unauthenticated_note_commitments) + .select_note_inclusion_proofs(unauthenticated_note_commitments, latest_block_num) .await .map_err(GetBatchInputsError::SelectNoteInclusionProofError)?; @@ -359,41 +532,31 @@ impl State { let mut blocks: BTreeSet = tx_reference_blocks; blocks.extend(note_blocks); - // Scoped block to automatically drop the read lock guard as soon as we're done. - // We also avoid accessing the db in the block as this would delay dropping the guard. - let (batch_reference_block, partial_mmr) = { - let inner_state = self.inner.read().await; + let highest_block_num = + *blocks.last().expect("we should have checked for empty block references"); + if highest_block_num > latest_block_num { + return Err(GetBatchInputsError::UnknownTransactionBlockReference { + highest_block_num, + latest_block_num, + }); + } - let latest_block_num = inner_state.latest_block_num(); + // Remove the latest block from the to-be-tracked blocks as it will be the reference + // block for the batch itself and thus added to the MMR within the batch kernel, so + // there is no need to prove its inclusion. + blocks.remove(&latest_block_num); - let highest_block_num = - *blocks.last().expect("we should have checked for empty block references"); - if highest_block_num > latest_block_num { - return Err(GetBatchInputsError::UnknownTransactionBlockReference { - highest_block_num, - latest_block_num, - }); - } + // SAFETY: + // - The latest block num was retrieved from the snapshot and the blockchain within the + // snapshot is guaranteed to be consistent with that block number. + // - We have checked that no block number in the blocks set is greater than latest block + // number *and* latest block num was removed from the set. 
+ let partial_mmr = + snapshot.blockchain.partial_mmr_from_blocks(&blocks, latest_block_num).expect( + "latest block num should exist and all blocks in set should be < than latest block", + ); - // Remove the latest block from the to-be-tracked blocks as it will be the reference - // block for the batch itself and thus added to the MMR within the batch kernel, so - // there is no need to prove its inclusion. - blocks.remove(&latest_block_num); - - // SAFETY: - // - The latest block num was retrieved from the inner blockchain from which we will - // also retrieve the proofs, so it is guaranteed to exist in that chain. - // - We have checked that no block number in the blocks set is greater than latest block - // number *and* latest block num was removed from the set. Therefore only block - // numbers smaller than latest block num remain in the set. Therefore all the block - // numbers are guaranteed to exist in the chain state at latest block num. - let partial_mmr = inner_state - .blockchain - .partial_mmr_from_blocks(&blocks, latest_block_num) - .expect("latest block num should exist and all blocks in set should be < than latest block"); - - (latest_block_num, partial_mmr) - }; + let batch_reference_block = latest_block_num; // Fetch the reference block of the batch as part of this query, so we can avoid looking it // up in a separate DB access. @@ -441,12 +604,13 @@ impl State { unauthenticated_note_commitments: BTreeSet, reference_blocks: BTreeSet, ) -> Result { + let snapshot = self.snapshot(); + let latest_block_number = snapshot.block_num; + // Get the note inclusion proofs from the DB. - // We do this first so we have to acquire the lock to the state just once. There we need the - // reference blocks of the note proofs to get their authentication paths in the chain MMR. 
let unauthenticated_note_proofs = self .db - .select_note_inclusion_proofs(unauthenticated_note_commitments) + .select_note_inclusion_proofs(unauthenticated_note_commitments, snapshot.block_num) .await .map_err(GetBlockInputsError::SelectNoteInclusionProofError)?; @@ -458,8 +622,8 @@ impl State { let mut blocks = reference_blocks; blocks.extend(note_proof_reference_blocks); - let (latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr) = - self.get_block_inputs_witnesses(&mut blocks, account_ids, nullifiers).await?; + let (account_witnesses, nullifier_witnesses, partial_mmr) = + Self::get_block_inputs_witnesses(&snapshot, &mut blocks, &account_ids, &nullifiers)?; // Fetch the block headers for all blocks in the partial MMR plus the latest one which will // be used as the previous block header of the block being built. @@ -482,14 +646,6 @@ impl State { // The order doesn't matter for PartialBlockchain::new, so swap remove is fine. let latest_block_header = headers.swap_remove(latest_block_header_index); - // SAFETY: This should not error because: - // - we're passing exactly the block headers that we've added to the partial MMR, - // - so none of the block header's block numbers should exceed the chain length of the - // partial MMR, - // - and we've added blocks to a BTreeSet, so there can be no duplicates. - // - // We construct headers and partial MMR in concert, so they are consistent. This is why we - // can call the unchecked constructor. let partial_block_chain = PartialBlockchain::new_unchecked(partial_mmr, headers) .expect("partial mmr and block headers should be consistent"); @@ -502,29 +658,24 @@ impl State { )) } - /// Get account and nullifier witnesses for the requested account IDs and nullifier as well as + /// Get account and nullifier witnesses for the requested account IDs and nullifiers as well as /// the [`PartialMmr`] for the given blocks. 
The MMR won't contain the latest block and its /// number is removed from `blocks` and returned separately. - /// - /// This method acquires the lock to the inner state and does not access the DB so we release - /// the lock asap. - async fn get_block_inputs_witnesses( - &self, + #[expect(clippy::type_complexity)] + fn get_block_inputs_witnesses( + snapshot: &Arc, blocks: &mut BTreeSet, - account_ids: Vec, - nullifiers: Vec, + account_ids: &[AccountId], + nullifiers: &[Nullifier], ) -> Result< ( - BlockNumber, BTreeMap, BTreeMap, PartialMmr, ), GetBlockInputsError, > { - let inner = self.inner.read().await; - - let latest_block_number = inner.latest_block_num(); + let latest_block_number = snapshot.block_num; // If `blocks` is empty, use the latest block number which will never trigger the error. let highest_block_number = blocks.last().copied().unwrap_or(latest_block_number); @@ -535,22 +686,12 @@ impl State { }); } - // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove its - // inclusion in the chain. + // The latest block is not yet in the chain MMR, so we can't (and don't need to) prove + // its inclusion in the chain. blocks.remove(&latest_block_number); - // Fetch the partial MMR at the state of the latest block with authentication paths for the - // provided set of blocks. - // - // SAFETY: - // - The latest block num was retrieved from the inner blockchain from which we will also - // retrieve the proofs, so it is guaranteed to exist in that chain. - // - We have checked that no block number in the blocks set is greater than latest block - // number *and* latest block num was removed from the set. Therefore only block numbers - // smaller than latest block num remain in the set. Therefore all the block numbers are - // guaranteed to exist in the chain state at latest block num. 
let partial_mmr = - inner.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect( + snapshot.blockchain.partial_mmr_from_blocks(blocks, latest_block_number).expect( "latest block num should exist and all blocks in set should be < than latest block", ); @@ -558,7 +699,7 @@ impl State { let account_witnesses = account_ids .iter() .copied() - .map(|account_id| (account_id, inner.account_tree.open_latest(account_id))) + .map(|account_id| (account_id, snapshot.account_tree.open_latest(account_id))) .collect::>(); // Fetch witnesses for all nullifiers. We don't check whether the nullifiers are spent or @@ -566,59 +707,72 @@ impl State { let nullifier_witnesses: BTreeMap = nullifiers .iter() .copied() - .map(|nullifier| (nullifier, inner.nullifier_tree.open(&nullifier))) + .map(|nullifier| (nullifier, snapshot.nullifier_tree.open(&nullifier))) .collect(); - Ok((latest_block_number, account_witnesses, nullifier_witnesses, partial_mmr)) + Ok((account_witnesses, nullifier_witnesses, partial_mmr)) } - /// Returns data needed by the block producer to verify transactions validity. + /// Returns data needed by the block producer to verify transactions validity and the + /// corresponding block height. #[instrument(target = COMPONENT, skip_all, ret)] pub async fn get_transaction_inputs( &self, account_id: AccountId, nullifiers: &[Nullifier], unauthenticated_note_commitments: Vec, - ) -> Result { + ) -> Result, DatabaseError> { info!(target: COMPONENT, account_id = %account_id.to_string(), nullifiers = %format_array(nullifiers)); - let inner = self.inner.read().await; + // Take a snapshot and extract everything we need, then drop it so readers of newer + // snapshots aren't held up by this Arc. 
+ let snapshot = self.snapshot(); + let block_height = snapshot.block_num; - let account_commitment = inner.account_tree.get_latest_commitment(account_id); + let account_commitment = snapshot.account_tree.get_latest_commitment(account_id); let new_account_id_prefix_is_unique = if account_commitment.is_empty() { - Some(!inner.account_tree.contains_account_id_prefix_in_latest(account_id.prefix())) + Some(!snapshot.account_tree.contains_account_id_prefix_in_latest(account_id.prefix())) } else { None }; // Non-unique account Id prefixes for new accounts are not allowed. if let Some(false) = new_account_id_prefix_is_unique { - return Ok(TransactionInputs { - new_account_id_prefix_is_unique, - ..Default::default() - }); + return Ok(Scoped::new( + block_height, + TransactionInputs { + new_account_id_prefix_is_unique, + ..Default::default() + }, + )); } let nullifiers = nullifiers .iter() .map(|nullifier| NullifierInfo { nullifier: *nullifier, - block_num: inner.nullifier_tree.get_block_num(nullifier).unwrap_or_default(), + block_num: snapshot.nullifier_tree.get_block_num(nullifier).unwrap_or_default(), }) .collect(); + // Drop snapshot immediately after using it. + drop(snapshot); + let found_unauthenticated_notes = self .db - .select_existing_note_commitments(unauthenticated_note_commitments) + .select_existing_note_commitments(unauthenticated_note_commitments, block_height) .await?; - Ok(TransactionInputs { - account_commitment, - nullifiers, - found_unauthenticated_notes, - new_account_id_prefix_is_unique, - }) + Ok(Scoped::new( + block_height, + TransactionInputs { + account_commitment, + nullifiers, + found_unauthenticated_notes, + new_account_id_prefix_is_unique, + }, + )) } /// Returns details for public (on-chain) account. @@ -636,17 +790,18 @@ impl State { /// Returns network account IDs within the specified block range (based on account creation /// block). 
- /// - /// The function may return fewer accounts than exist in the range if the result would exceed - /// `MAX_RESPONSE_PAYLOAD_BYTES / AccountId::SERIALIZED_SIZE` rows. In this case, the result is - /// truncated at a block boundary to ensure all accounts from included blocks are returned. - /// - /// The response includes the last block number that was fully included in the result. pub async fn get_all_network_accounts( &self, block_range: RangeInclusive, - ) -> Result<(Vec, BlockNumber), DatabaseError> { - self.db.select_all_network_account_ids(block_range).await + ) -> Result, BlockNumber)>, DatabaseError> { + let snapshot = self.snapshot(); + let chain_tip = snapshot.block_num; + // Clamp the upper bound to the chain tip so callers can use BlockNumber::MAX to mean + // "up to the latest block". + let clamped_end = std::cmp::min(*block_range.end(), chain_tip); + let block_range = *block_range.start()..=clamped_end; + let result = self.db.select_all_network_account_ids(block_range).await?; + Ok(Scoped::new(chain_tip, result)) } /// Returns an account witness and optionally account details at a specific block. @@ -669,10 +824,17 @@ impl State { return Err(GetAccountError::AccountNotPublic(account_id)); } - let (block_num, witness) = self.get_account_witness(block_num, account_id).await?; + let snapshot = self.snapshot(); + + let (block_num, witness) = Self::get_account_witness(&snapshot, block_num, account_id)?; let details = if let Some(request) = details { - Some(self.fetch_public_account_details(account_id, block_num, request).await?) + Some( + Self::fetch_public_account_details( + &self.db, &snapshot, account_id, block_num, request, + ) + .await?, + ) } else { None }; @@ -685,20 +847,20 @@ impl State { /// If `block_num` is provided, returns the witness at that historical block; /// otherwise, returns the witness at the latest block. 
#[instrument(target = COMPONENT, skip_all)] - async fn get_account_witness( - &self, + fn get_account_witness( + snapshot: &Arc, block_num: Option, account_id: AccountId, ) -> Result<(BlockNumber, AccountWitness), GetAccountError> { - let inner_state = - self.inner.read().instrument(tracing::info_span!("acquire_inner_state")).await; - // Determine which block to query let (block_num, witness) = if let Some(requested_block) = block_num { + if requested_block > snapshot.block_num { + return Err(GetAccountError::UnknownBlock(requested_block)); + } // Historical query: use the account tree with history let witness = - inner_state.account_tree.open_at(account_id, requested_block).ok_or_else(|| { - let latest_block = inner_state.account_tree.block_number_latest(); + snapshot.account_tree.open_at(account_id, requested_block).ok_or_else(|| { + let latest_block = snapshot.account_tree.block_number_latest(); if requested_block > latest_block { GetAccountError::UnknownBlock(requested_block) } else { @@ -708,8 +870,8 @@ impl State { (requested_block, witness) } else { // Latest query: use the latest state - let block_num = inner_state.account_tree.block_number_latest(); - let witness = inner_state.account_tree.open_latest(account_id); + let block_num = snapshot.account_tree.block_number_latest(); + let witness = snapshot.account_tree.open_latest(account_id); (block_num, witness) }; @@ -728,7 +890,8 @@ impl State { #[expect(clippy::too_many_lines)] #[instrument(target = COMPONENT, skip_all)] async fn fetch_public_account_details( - &self, + db: &Arc, + snapshot: &Arc, account_id: AccountId, block_num: BlockNumber, detail_request: AccountDetailRequest, @@ -743,19 +906,12 @@ impl State { return Err(GetAccountError::AccountNotPublic(account_id)); } - // Validate block exists in the blockchain before querying the database - { - let inner = self.inner.read().instrument(tracing::info_span!("acquire_inner")).await; - let latest_block_num = inner.latest_block_num(); - - if block_num > 
latest_block_num { - return Err(GetAccountError::UnknownBlock(block_num)); - } + if block_num > snapshot.block_num { + return Err(GetAccountError::UnknownBlock(block_num)); } // Query account header and storage header together in a single DB call - let (account_header, storage_header) = self - .db + let (account_header, storage_header) = db .select_account_header_with_storage_header_at_block(account_id, block_num) .await? .ok_or(GetAccountError::AccountNotFound(account_id, block_num))?; @@ -763,9 +919,7 @@ impl State { let account_code = match code_commitment { Some(commitment) if commitment == account_header.code_commitment() => None, Some(_) => { - self.db - .select_account_code_by_commitment(account_header.code_commitment()) - .await? + db.select_account_code_by_commitment(account_header.code_commitment()).await? }, None => None, }; @@ -775,8 +929,7 @@ impl State { AccountVaultDetails::empty() }, Some(_) => { - let vault_assets = - self.db.select_account_vault_at_block(account_id, block_num).await?; + let vault_assets = db.select_account_vault_at_block(account_id, block_num).await?; AccountVaultDetails::from_assets(vault_assets) }, None => AccountVaultDetails::empty(), @@ -805,10 +958,9 @@ impl State { let mut storage_map_details_by_index = vec![None; storage_request_slots.len()]; if !map_keys_requests.is_empty() { - let forest_guard = - self.forest.read().instrument(tracing::info_span!("acquire_forest")).await; for (index, slot_name, keys) in map_keys_requests { - let details = forest_guard + let details = snapshot + .forest .get_storage_map_details_for_keys( account_id, slot_name.clone(), @@ -827,8 +979,7 @@ impl State { // TODO parallelize the read requests for (index, slot_name) in all_entries_requests { - let details = self - .db + let details = db .reconstruct_storage_map_from_db( account_id, slot_name.clone(), @@ -866,31 +1017,11 @@ impl State { }) } - /// Returns the effective chain tip for the given finality level. 
- /// - /// - [`Finality::Committed`]: returns the latest committed block number (from in-memory MMR). - /// - [`Finality::Proven`]: returns the latest proven-in-sequence block number (cached via watch - /// channel, updated by the proof scheduler). - pub async fn chain_tip(&self, finality: Finality) -> BlockNumber { - match finality { - Finality::Committed => self - .inner - .read() - .instrument(tracing::info_span!("acquire_inner")) - .await - .latest_block_num(), - Finality::Proven => self.proven_tip.read(), - } - } - /// Loads a block from the block store. Return `Ok(None)` if the block is not found. pub async fn load_block( &self, block_num: BlockNumber, ) -> Result>, DatabaseError> { - if block_num > self.chain_tip(Finality::Committed).await { - return Ok(None); - } self.block_store.load_block(block_num).await.map_err(Into::into) } @@ -899,9 +1030,6 @@ impl State { &self, block_num: BlockNumber, ) -> Result>, DatabaseError> { - if block_num > self.chain_tip(Finality::Proven).await { - return Ok(None); - } self.block_store.load_proof(block_num).await.map_err(Into::into) } @@ -918,6 +1046,8 @@ impl State { block_num: BlockNumber, page: Page, ) -> Result<(Vec, Page), DatabaseError> { + let snapshot = self.snapshot(); + let block_num = std::cmp::min(block_num, snapshot.block_num); self.db.select_unconsumed_network_notes(account_id, block_num, page).await } @@ -930,17 +1060,18 @@ impl State { } /// Returns vault asset witnesses for the specified account and block number. 
- pub async fn get_vault_asset_witnesses( + pub fn get_vault_asset_witnesses( &self, account_id: AccountId, block_num: BlockNumber, vault_keys: BTreeSet, ) -> Result, WitnessError> { - let witnesses = self - .forest - .read() - .await - .get_vault_asset_witnesses(account_id, block_num, vault_keys)?; + let snapshot = self.snapshot(); + if block_num > snapshot.block_num { + return Err(WitnessError::UnknownBlock(block_num)); + } + let witnesses = + snapshot.forest.get_vault_asset_witnesses(account_id, block_num, vault_keys)?; Ok(witnesses) } @@ -949,17 +1080,19 @@ impl State { /// /// Note that the `raw_key` is the raw, user-provided key that needs to be hashed in order to /// get the actual key into the storage map. - pub async fn get_storage_map_witness( + pub fn get_storage_map_witness( &self, account_id: AccountId, slot_name: &StorageSlotName, block_num: BlockNumber, raw_key: StorageMapKey, ) -> Result { - let witness = self + let snapshot = self.snapshot(); + if block_num > snapshot.block_num { + return Err(WitnessError::UnknownBlock(block_num)); + } + let witness = snapshot .forest - .read() - .await .get_storage_map_witness(account_id, slot_name, block_num, raw_key)?; Ok(witness) } diff --git a/crates/store/src/state/sync_state.rs b/crates/store/src/state/sync_state.rs index cda17e6ab..2ff7df6d6 100644 --- a/crates/store/src/state/sync_state.rs +++ b/crates/store/src/state/sync_state.rs @@ -7,7 +7,7 @@ use miden_protocol::block::{BlockHeader, BlockNumber}; use miden_protocol::crypto::merkle::mmr::{Forest, MmrDelta, MmrProof}; use tracing::instrument; -use super::State; +use super::{Scoped, State}; use crate::COMPONENT; use crate::db::models::queries::StorageMapValuesPage; use crate::db::{AccountVaultValue, NoteSyncUpdate, NullifierInfo}; @@ -34,8 +34,15 @@ impl State { &self, account_ids: Vec, block_range: RangeInclusive, - ) -> Result<(BlockNumber, Vec), DatabaseError> { - self.db.select_transactions_records(account_ids, block_range).await + ) -> Result)>, 
DatabaseError> { + let snapshot = self.snapshot(); + let chain_tip = snapshot.block_num; + let block_to = *block_range.end(); + if block_to > chain_tip { + return Err(DatabaseError::UnknownBlock(block_to)); + } + let result = self.db.select_transactions_records(account_ids, block_range).await?; + Ok(Scoped::new(chain_tip, result)) } /// Returns the chain MMR delta and the `block_to` block header for the specified block range. @@ -44,11 +51,14 @@ impl State { &self, block_range: RangeInclusive, ) -> Result<(MmrDelta, BlockHeader), StateSyncError> { + let snapshot = self.snapshot(); + let chain_tip = snapshot.block_num; let block_from = *block_range.start(); let block_to = *block_range.end(); + if block_to > chain_tip { + return Err(StateSyncError::UnknownBlock(block_to)); + } - // SAFETY: block_to has been validated to be <= the effective tip (chain tip or latest - // proven block) by the caller, so it must exist in the database. let block_header = self .db .select_block_header_by_block_num(Some(block_to)) @@ -77,10 +87,7 @@ impl State { let from_forest = (block_from + 1).as_usize(); let to_forest = block_to.as_usize(); - let mmr_delta = self - .inner - .read() - .await + let mmr_delta = snapshot .blockchain .as_mmr() .get_delta(Forest::new(from_forest), Forest::new(to_forest)) @@ -97,13 +104,21 @@ impl State { /// /// Also returns the last block number checked. If this equals `block_range.end()`, the /// sync is complete. + #[expect(clippy::type_complexity)] #[instrument(level = "debug", target = COMPONENT, skip_all, ret(level = "debug"), err)] pub async fn sync_notes( &self, note_tags: Vec, block_range: RangeInclusive, - ) -> Result<(Vec<(NoteSyncUpdate, MmrProof)>, BlockNumber), NoteSyncError> { + ) -> Result, BlockNumber)>, NoteSyncError> { + // Ensure the requested block range is within the chain's current tip. 
+ let snapshot = self.snapshot(); + let chain_tip = snapshot.block_num; let block_end = *block_range.end(); + if block_range.end() > &chain_tip { + Err(NoteSyncError::FutureBlock { chain_tip, block_to: *block_range.end() })?; + } + let note_tags: Arc<[u32]> = note_tags.into(); let mut results = Vec::new(); @@ -129,8 +144,7 @@ impl State { // SAFETY: it is ensured that block_end <= chain_tip, and the blockchain MMR always has // at least chain_tip + 1 leaves. let mmr_checkpoint = block_end + 1; - let mmr_proof = - self.inner.read().await.blockchain.open_at(block_num, mmr_checkpoint)?; + let mmr_proof = snapshot.blockchain.open_at(block_num, mmr_checkpoint)?; results.push((note_sync, mmr_proof)); current_from = block_num + 1; @@ -140,38 +154,65 @@ impl State { let last_block_checked = results.last().map_or(block_end, |(update, _)| update.block_header.block_num()); - Ok((results, last_block_checked)) + Ok(Scoped::new(chain_tip, (results, last_block_checked))) } + /// Returns nullifiers matching the given prefixes within the block range. + /// + /// The block range is validated against the snapshot's chain tip. Returns the matching + /// nullifiers, the last block included, and the chain tip at the time of the query. pub async fn sync_nullifiers( &self, prefix_len: u32, nullifier_prefixes: Vec, block_range: RangeInclusive, - ) -> Result<(Vec, BlockNumber), DatabaseError> { - self.db + ) -> Result, BlockNumber)>, DatabaseError> { + // Ensure the db query is scoped by the snapshot's chain tip. 
+ let chain_tip = self.snapshot().block_num; + if block_range.end() > &chain_tip { + return Err(DatabaseError::UnknownBlock(*block_range.end())); + } + + let result = self + .db .select_nullifiers_by_prefix(prefix_len, nullifier_prefixes, block_range) - .await + .await?; + + Ok(Scoped::new(chain_tip, result)) } // ACCOUNT STATE SYNCHRONIZATION // -------------------------------------------------------------------------------------------- - /// Returns account vault updates for specified account within a block range. + /// Returns account vault updates for specified account within a block range, including the last + /// included block and the chain tip. pub async fn sync_account_vault( &self, account_id: AccountId, block_range: RangeInclusive, - ) -> Result<(BlockNumber, Vec), DatabaseError> { - self.db.get_account_vault_sync(account_id, block_range).await + ) -> Result)>, DatabaseError> { + // Ensure the db query is scoped by the snapshot's chain tip. + let chain_tip = self.snapshot().block_num; + if block_range.end() > &chain_tip { + return Err(DatabaseError::UnknownBlock(*block_range.end())); + } + let result = self.db.get_account_vault_sync(account_id, block_range).await?; + Ok(Scoped::new(chain_tip, result)) } - /// Returns storage map values for syncing within a block range. + /// Returns storage map values for syncing within a block range including the chain tip. pub async fn sync_account_storage_maps( &self, account_id: AccountId, block_range: RangeInclusive, - ) -> Result { - self.db.select_storage_map_sync_values(account_id, block_range, None).await + ) -> Result, DatabaseError> { + // Ensure the db query is scoped by the snapshot's chain tip. 
+ let chain_tip = self.snapshot().block_num; + if block_range.end() > &chain_tip { + return Err(DatabaseError::UnknownBlock(*block_range.end())); + } + let storage_map_values = + self.db.select_storage_map_sync_values(account_id, block_range, None).await?; + Ok(Scoped::new(chain_tip, storage_map_values)) } } diff --git a/crates/store/src/state/writer.rs b/crates/store/src/state/writer.rs new file mode 100644 index 000000000..4786406e7 --- /dev/null +++ b/crates/store/src/state/writer.rs @@ -0,0 +1,372 @@ +use std::sync::Arc; + +use arc_swap::ArcSwap; +use miden_node_proto::BlockProofRequest; +use miden_node_utils::ErrorReport; +use miden_protocol::account::delta::AccountUpdateDetails; +use miden_protocol::block::account_tree::AccountMutationSet; +use miden_protocol::block::nullifier_tree::{NullifierMutationSet, NullifierTree}; +use miden_protocol::block::{BlockBody, BlockHeader, SignedBlock}; +use miden_protocol::crypto::merkle::smt::LargeSmt; +use miden_protocol::note::NoteDetails; +use miden_protocol::transaction::OutputNote; +use miden_protocol::utils::serde::Serializable; +use tokio::sync::{mpsc, oneshot}; +use tracing::{info, instrument}; + +use crate::accounts::AccountTreeWithHistory; +use crate::blocks::BlockStore; +use crate::db::{Db, NoteRecord}; +use crate::errors::{ApplyBlockError, InvalidBlockError}; +use crate::state::InMemoryState; +use crate::state::loader::{SnapshotTreeStorage, TreeStorage}; +use crate::{COMPONENT, HistoricalError}; + +// WRITE HANDLE +// ================================================================================================ + +/// Handle for submitting blocks to the writer loop. +/// +/// This is intentionally separated from [`super::State`] to avoid a circular reference: the writer +/// loop must not hold a reference back to the sender's owner. 
+#[derive(Clone)] +pub struct WriteHandle { + tx: mpsc::Sender, +} + +impl WriteHandle { + pub(crate) fn new(tx: mpsc::Sender) -> Self { + Self { tx } + } + + /// Sends a block to the writer loop and awaits the result. + pub async fn apply_block( + &self, + signed_block: SignedBlock, + proving_inputs: Option, + ) -> Result<(), ApplyBlockError> { + let (result_tx, result_rx) = oneshot::channel(); + self.tx + .send(WriteRequest { signed_block, proving_inputs, result_tx }) + .await + .map_err(|e| ApplyBlockError::WriterTaskSendFailed(Box::new(e)))?; + result_rx.await? + } +} + +// BLOCK WRITER +// ================================================================================================ + +/// Single writer task that serializes all block mutations. +/// +/// Owns the channel receiver and writable trees, and holds shared references to the database, +/// block store, and in-memory state. Deliberately does not reference `State` to avoid circular +/// references — the channel sender lives in [`WriteHandle`], which is independent of `State`. +pub(crate) struct BlockWriter { + /// Channel receiver for incoming block write requests. + pub rx: mpsc::Receiver, + + /// Writable account tree with historical overlays, owned exclusively by the writer. + pub account_tree: AccountTreeWithHistory, + /// Writable nullifier tree, owned exclusively by the writer. + pub nullifier_tree: NullifierTree>, + + /// Shared database for persisting blocks, accounts, notes, and nullifiers. + pub db: Arc, + /// Shared block store for persisting raw block data. + pub block_store: Arc, + /// Shared in-memory state. The writer publishes new snapshots via `ArcSwap::store()`. + pub in_memory: Arc>, + + /// Channel to request process termination on fatal internal state errors. 
+ pub termination_ask: mpsc::Sender, +} + +// WRITE REQUEST +// ================================================================================================ + +/// A request to apply a new block, sent through the writer channel. +pub struct WriteRequest { + pub signed_block: SignedBlock, + pub proving_inputs: Option, + pub result_tx: oneshot::Sender>, +} + +impl BlockWriter { + /// Runs the single writer loop. Receives blocks through the channel and applies them + /// sequentially. + pub(crate) async fn run(mut self) { + while let Some(req) = self.rx.recv().await { + let result = Box::pin(self.write_block(req.signed_block, req.proving_inputs)).await; + let _ = req.result_tx.send(result); + } + } + + /// Apply changes of a new block to the DB, file, and in-memory data structures. + /// + /// ## Consistency model + /// + /// This function is the sole writer to all state. The writer owns the writable trees directly. + /// + /// Because SQLite/files are committed **before** the in-memory swap, there is a window where + /// the DB is ahead of the in-memory state. Reader methods that combine in-memory and SQLite + /// data must scope their DB queries by the snapshot's `block_num` to maintain consistency + /// (see the doc comment on [`super::State`] for the full rules). + /// + /// Readers never block: they obtain an `Arc` via `ArcSwap::load_full()`, which performs only + /// an atomic refcount increment with no data cloning. The atomic swap guarantees readers see + /// either the old or new state, never a partial update. Readers holding an `Arc` to the old + /// state are completely unaffected by the swap. 
+ #[instrument(target = COMPONENT, skip_all, err, fields(block.number = signed_block.header().block_num().as_u32()))] + async fn write_block( + &mut self, + signed_block: SignedBlock, + proving_inputs: Option, + ) -> Result<(), ApplyBlockError> { + let header = signed_block.header(); + let body = signed_block.body(); + let block_num = header.block_num(); + let block_commitment = header.commitment(); + + self.validate_block_header(header, body).await?; + + // Load the current in-memory state snapshot for validation (wait-free). + let snapshot = self.in_memory.load_full(); + + // Tree mutation, RocksDB writes, and snapshot construction are all CPU- and I/O-bound + // synchronous workloads. Run them inside block_in_place so tokio can evacuate other tasks + // from this thread for the duration. + let (snapshot_nullifier_tree, snapshot_account_tree, notes, account_deltas) = + tokio::task::block_in_place(|| -> Result<_, ApplyBlockError> { + let (nullifier_tree_update, account_tree_update) = + self.compute_tree_mutations(&snapshot, header, body)?; + + let notes = build_note_records(header, body)?; + + let account_deltas = + Vec::from_iter(body.updated_accounts().iter().filter_map(|update| { + match update.details() { + AccountUpdateDetails::Delta(delta) => Some(delta.clone()), + AccountUpdateDetails::Private => None, + } + })); + + self.nullifier_tree + .apply_mutations(nullifier_tree_update) + .expect("Unreachable: mutations were computed from the current tree state"); + + self.account_tree + .apply_mutations(account_tree_update) + .expect("Unreachable: mutations were computed from the current tree state"); + + let snapshot_nullifier_tree = self.build_snapshot_nullifier_tree(); + let snapshot_account_tree = self.build_snapshot_account_tree(); + + Ok((snapshot_nullifier_tree, snapshot_account_tree, notes, account_deltas)) + })?; + + let mut new_blockchain = snapshot.blockchain.clone(); + new_blockchain.push(block_commitment); + + let mut new_forest = 
snapshot.forest.clone(); + new_forest.apply_block_updates(block_num, account_deltas)?; + + let new_state = InMemoryState { + block_num, + nullifier_tree: snapshot_nullifier_tree, + account_tree: snapshot_account_tree, + blockchain: new_blockchain, + forest: new_forest, + }; + + // We have completed all in-memory mutations on the new clone of in-memory state. Now + // commit to storage before swapping the Arc. + + // Save the block to the block store. + let signed_block_bytes = signed_block.to_bytes(); + self.block_store.save_block(block_num, &signed_block_bytes).await?; + + // Commit to DB. Readers continue to see the old in-memory state (via their Arc) while + // the DB commits. We ensure consistency by scoping all RPC queries that hit DB data by + // the block number that is Arc swapped at the end of this function. + self.db + .apply_block(signed_block, notes, proving_inputs) + .await + .map_err(|err| ApplyBlockError::DbUpdateTaskFailed(err.as_report()))?; + + // Atomically publish the new state. Readers that call snapshot() after this point + // will see the updated state. Readers holding the old Arc continue unaffected. + self.in_memory.store(Arc::new(new_state)); + + info!(%block_commitment, block_num = block_num.as_u32(), COMPONENT, "apply_block successful"); + + Ok(()) + } + + /// Validates that the block header is consistent with the body and follows the previous block. + async fn validate_block_header( + &self, + header: &BlockHeader, + body: &BlockBody, + ) -> Result<(), ApplyBlockError> { + let tx_commitment = body.transactions().commitment(); + if header.tx_commitment() != tx_commitment { + return Err(InvalidBlockError::InvalidBlockTxCommitment { + expected: tx_commitment, + actual: header.tx_commitment(), + } + .into()); + } + + let block_num = header.block_num(); + let prev_block = self + .db + .select_block_header_by_block_num(None) + .await? 
+ .ok_or(ApplyBlockError::DbBlockHeaderEmpty)?; + let expected_block_num = prev_block.block_num().child(); + if block_num != expected_block_num { + return Err(InvalidBlockError::NewBlockInvalidBlockNum { + expected: expected_block_num, + submitted: block_num, + } + .into()); + } + if header.prev_block_commitment() != prev_block.commitment() { + return Err(InvalidBlockError::NewBlockInvalidPrevCommitment.into()); + } + + Ok(()) + } + + /// Compute mutations for the nullifier tree and account tree. + fn compute_tree_mutations( + &mut self, + snapshot: &Arc, + header: &BlockHeader, + body: &BlockBody, + ) -> Result<(NullifierMutationSet, AccountMutationSet), ApplyBlockError> { + // Nullifiers can be produced only once. + let duplicate_nullifiers: Vec<_> = body + .created_nullifiers() + .iter() + .filter(|&nullifier| self.nullifier_tree.get_block_num(nullifier).is_some()) + .copied() + .collect(); + if !duplicate_nullifiers.is_empty() { + return Err(InvalidBlockError::DuplicatedNullifiers(duplicate_nullifiers).into()); + } + + // new_block.chain_root must be equal to the chain MMR root prior to the update. + let peaks = snapshot.blockchain.peaks(); + if peaks.hash_peaks() != header.chain_commitment() { + return Err(InvalidBlockError::NewBlockInvalidChainCommitment.into()); + } + + // Compute update for nullifier tree. + let nullifier_tree_update = self + .nullifier_tree + .compute_mutations( + body.created_nullifiers() + .iter() + .map(|nullifier| (*nullifier, header.block_num())), + ) + .map_err(InvalidBlockError::NewBlockNullifierAlreadySpent)?; + + if nullifier_tree_update.as_mutation_set().root() != header.nullifier_root() { + let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( + InvalidBlockError::NewBlockInvalidNullifierRoot, + )); + return Err(InvalidBlockError::NewBlockInvalidNullifierRoot.into()); + } + + // Compute update for account tree from the writable tree (always in sync with DB). 
+ let account_tree_update = self + .account_tree + .compute_mutations( + body.updated_accounts() + .iter() + .map(|update| (update.account_id(), update.final_state_commitment())), + ) + .map_err(|e| match e { + HistoricalError::AccountTreeError(err) => { + InvalidBlockError::NewBlockDuplicateAccountIdPrefix(err) + }, + HistoricalError::MerkleError(_) => { + panic!("Unexpected MerkleError during account tree mutation computation") + }, + })?; + + if account_tree_update.as_mutation_set().root() != header.account_root() { + let _ = self.termination_ask.try_send(ApplyBlockError::InvalidBlockError( + InvalidBlockError::NewBlockInvalidAccountRoot, + )); + return Err(InvalidBlockError::NewBlockInvalidAccountRoot.into()); + } + + Ok((nullifier_tree_update, account_tree_update)) + } + + /// Builds a snapshot-backed nullifier tree for the new in-memory state. + fn build_snapshot_nullifier_tree(&self) -> NullifierTree> { + #[cfg(feature = "rocksdb")] + { + self.nullifier_tree.reader() + } + #[cfg(not(feature = "rocksdb"))] + { + self.nullifier_tree.clone() + } + } + + /// Builds a snapshot-backed account tree for the new in-memory state. + fn build_snapshot_account_tree(&self) -> AccountTreeWithHistory { + #[cfg(feature = "rocksdb")] + { + self.account_tree.reader() + } + #[cfg(not(feature = "rocksdb"))] + { + self.account_tree.clone() + } + } +} + +/// Builds the note tree, validates its root against the header, and collects note records. 
+fn build_note_records( + header: &BlockHeader, + body: &BlockBody, +) -> Result)>, ApplyBlockError> { + let block_num = header.block_num(); + + let note_tree = body.compute_block_note_tree(); + if note_tree.root() != header.note_root() { + return Err(InvalidBlockError::NewBlockInvalidNoteRoot.into()); + } + + body.output_notes() + .map(|(note_index, note)| { + let (details, nullifier) = match note { + OutputNote::Public(note) => { + (Some(NoteDetails::from(note.as_note())), Some(note.as_note().nullifier())) + }, + OutputNote::Private(_) => (None, None), + }; + + let inclusion_path = note_tree.open(note_index); + + let note_record = NoteRecord { + block_num, + note_index, + note_id: note.id().as_word(), + note_commitment: note.to_commitment(), + metadata: note.metadata().clone(), + details, + inclusion_path, + }; + + Ok((note_record, nullifier)) + }) + .collect::, InvalidBlockError>>() + .map_err(Into::into) +} diff --git a/crates/validator/src/server/tests.rs b/crates/validator/src/server/tests.rs index 66357ab22..be7cbdede 100644 --- a/crates/validator/src/server/tests.rs +++ b/crates/validator/src/server/tests.rs @@ -324,7 +324,7 @@ async fn unknown_transactions_rejected() { let tx_id = tx_header.id(); // Build a ProvenBatch containing this transaction. - let batch = ProvenBatch::new( + let batch = ProvenBatch::new_unchecked( BatchId::from_ids(std::iter::once((tx_id, account_id))), genesis_header.commitment(), BlockNumber::GENESIS,