update multisig with new commit hash#90
Conversation
|
Added this extra changes to decouple XMSS aggregation from the tick loop via a watch channel and parallelize per-data_root proof generation with Rayon to eliminate the burst-tick stall causing us to fall 15-19 slots behind. |
| // Skip missed ticks instead of bursting: on_tick's while loop already | ||
| // catches up the store to wall-clock time, so replaying stale ticks is | ||
| // pure overhead that floods the executor with lock grabs and log writes. | ||
| tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); |
There was a problem hiding this comment.
this is probably not a good idea - the store's on_tick catchup logic is actually not good too. The burst behavior is more predictable/easier to understand than ad-hoc catching up logic
| task::spawn(async move { | ||
| let Some(vs) = vs_for_chain else { return }; | ||
| loop { | ||
| if agg_rx.changed().await.is_err() { | ||
| break; // sender dropped — chain task shut down | ||
| } | ||
| let Some((slot, snapshot)) = agg_rx.borrow_and_update().clone() else { | ||
| continue; | ||
| }; | ||
| let vs = vs.clone(); | ||
| let log_rate = chain_log_inv_rate; | ||
| let result = | ||
| task::spawn_blocking(move || vs.maybe_aggregate(&snapshot, Slot(slot), log_rate)) | ||
| .await | ||
| .unwrap_or(None); | ||
| if res_tx.send((slot, result)).await.is_err() { | ||
| break; // chain task dropped — shut down | ||
| } | ||
| } | ||
| }); |
There was a problem hiding this comment.
this probably should live in its own file
There was a problem hiding this comment.
like a full submodule with types? or a single file like src/aggregation.rs with one public function
There was a problem hiding this comment.
I believe just single file is enough. Although it probably shouldn't be just the public function - probably better to encapsulate it to proper aggregator service
| loop { | ||
| let has_block = store_for_validator | ||
| .read() | ||
| .blocks | ||
| .values() | ||
| .any(|b| b.slot.0 == current_slot); | ||
| if has_block { | ||
| info!(slot = current_slot, "Block arrived, proceeding with attestation"); | ||
| break; | ||
| } | ||
| if tokio::time::Instant::now() >= attest_deadline { | ||
| info!(slot = current_slot, "Block wait timed out, attesting with current head"); | ||
| break; | ||
| } | ||
| tokio::time::sleep(Duration::from_millis(50)).await; | ||
| } |
There was a problem hiding this comment.
this probably should be implemented through channels instead
| /// | ||
| /// Receives store snapshots via `agg_rx` (watch channel — always latest value), | ||
| /// runs XMSS aggregation in a blocking thread, and sends results back via `res_tx`. | ||
| pub fn spawn( |
There was a problem hiding this comment.
yeah so this needs to be more proper service, that should expose the handlers for aggregation, instead of working with channels
No description provided.