diff --git a/api/src/handlers/pool_api.rs b/api/src/handlers/pool_api.rs index 299b79a287..8e0dbba108 100644 --- a/api/src/handlers/pool_api.rs +++ b/api/src/handlers/pool_api.rs @@ -95,7 +95,7 @@ impl PoolPushHandler { .chain_head() .context(ErrorKind::Internal("Failed to get chain head".to_owned()))?; let res = tx_pool - .add_to_pool(source, tx, !fluff, &header) + .add_to_pool(&source, &tx, !fluff, &header) .context(ErrorKind::Internal("Failed to update pool".to_owned()))?; Ok(res) }), diff --git a/chain/src/chain.rs b/chain/src/chain.rs index 33504d5445..d15ee6b303 100644 --- a/chain/src/chain.rs +++ b/chain/src/chain.rs @@ -525,7 +525,7 @@ impl Chain { let height = self.next_block_height()?; let txhashset = self.txhashset.read(); txhashset::utxo_view(&txhashset, |utxo| { - utxo.verify_coinbase_maturity(&tx.inputs(), height)?; + utxo.verify_coinbase_maturity(tx.inputs(), height)?; Ok(()) }) } diff --git a/chain/src/txhashset/utxo_view.rs b/chain/src/txhashset/utxo_view.rs index 2823acbe80..c82318baae 100644 --- a/chain/src/txhashset/utxo_view.rs +++ b/chain/src/txhashset/utxo_view.rs @@ -100,7 +100,7 @@ impl<'a> UTXOView<'a> { /// Verify we are not attempting to spend any coinbase outputs /// that have not sufficiently matured. - pub fn verify_coinbase_maturity(&self, inputs: &Vec, height: u64) -> Result<(), Error> { + pub fn verify_coinbase_maturity(&self, inputs: &[Input], height: u64) -> Result<(), Error> { // Find the greatest output pos of any coinbase // outputs we are attempting to spend. let pos = inputs diff --git a/core/src/core/block.rs b/core/src/core/block.rs index 5f1887af04..b103f1f612 100644 --- a/core/src/core/block.rs +++ b/core/src/core/block.rs @@ -480,7 +480,7 @@ impl Block { let all_kernels = Vec::from_iter(all_kernels); // Initialize a tx body and sort everything. - let body = TransactionBody::init(all_inputs, all_outputs, all_kernels, false)?; + let body = TransactionBody::init(&all_inputs, &all_outputs, &all_kernels, false)?; // Finally return the full block. // Note: we have not actually validated the block here, @@ -509,7 +509,7 @@ impl Block { // A block is just a big transaction, aggregate and add the reward output // and reward kernel. At this point the tx is technically invalid but the // tx body is valid if we account for the reward (i.e. as a block). - let agg_tx = transaction::aggregate(txs)? + let agg_tx = transaction::aggregate(&txs)? .with_output(reward_out) .with_kernel(reward_kern); @@ -598,7 +598,7 @@ impl Block { let kernels = self.kernels().clone(); // Initialize tx body and sort everything. - let body = TransactionBody::init(inputs, outputs, kernels, false)?; + let body = TransactionBody::init(&inputs, &outputs, &kernels, false)?; Ok(Block { header: self.header, diff --git a/core/src/core/transaction.rs b/core/src/core/transaction.rs index 0e67e64899..55e9d4922c 100644 --- a/core/src/core/transaction.rs +++ b/core/src/core/transaction.rs @@ -451,7 +451,7 @@ impl Readable for TransactionBody { let kernels = read_multi(reader, kernel_len)?; // Initialize tx body and verify everything is sorted. - let body = TransactionBody::init(inputs, outputs, kernels, true) + let body = TransactionBody::init(&inputs, &outputs, &kernels, true) .map_err(|_| ser::Error::CorruptedData)?; Ok(body) @@ -499,15 +499,15 @@ impl TransactionBody { /// the provided inputs, outputs and kernels. /// Guarantees inputs, outputs, kernels are sorted lexicographically. pub fn init( - inputs: Vec, - outputs: Vec, - kernels: Vec, + inputs: &[Input], + outputs: &[Output], + kernels: &[TxKernel], verify_sorted: bool, ) -> Result { let mut body = TransactionBody { - inputs, - outputs, - kernels, + inputs: inputs.to_vec(), + outputs: outputs.to_vec(), + kernels: kernels.to_vec(), }; if verify_sorted { @@ -830,7 +830,7 @@ impl Transaction { /// Creates a new transaction initialized with /// the provided inputs, outputs, kernels - pub fn new(inputs: Vec, outputs: Vec, kernels: Vec) -> Transaction { + pub fn new(inputs: &[Input], outputs: &[Output], kernels: &[TxKernel]) -> Transaction { let offset = BlindingFactor::zero(); // Initialize a new tx body and sort everything. @@ -877,7 +877,7 @@ impl Transaction { } /// Get inputs - pub fn inputs(&self) -> &Vec { + pub fn inputs(&self) -> &[Input] { &self.body.inputs } @@ -887,7 +887,7 @@ impl Transaction { } /// Get outputs - pub fn outputs(&self) -> &Vec { + pub fn outputs(&self) -> &[Output] { &self.body.outputs } @@ -897,7 +897,7 @@ impl Transaction { } /// Get kernels - pub fn kernels(&self) -> &Vec { + pub fn kernels(&self) -> &[TxKernel] { &self.body.kernels } @@ -990,12 +990,12 @@ pub fn cut_through(inputs: &mut Vec, outputs: &mut Vec) -> Result } /// Aggregate a vec of txs into a multi-kernel tx with cut_through. -pub fn aggregate(mut txs: Vec) -> Result { +pub fn aggregate(txs: &[Transaction]) -> Result { // convenience short-circuiting if txs.is_empty() { return Ok(Transaction::empty()); } else if txs.len() == 1 { - return Ok(txs.pop().unwrap()); + return Ok(txs.first().unwrap().clone()); } let mut n_inputs = 0; let mut n_outputs = 0; @@ -1013,13 +1013,13 @@ pub fn aggregate(mut txs: Vec) -> Result { // we will sum these together at the end to give us the overall offset for the // transaction let mut kernel_offsets: Vec = Vec::with_capacity(txs.len()); - for mut tx in txs { + for tx in txs { // we will sum these later to give a single aggregate offset kernel_offsets.push(tx.offset); - inputs.append(&mut tx.body.inputs); - outputs.append(&mut tx.body.outputs); - kernels.append(&mut tx.body.kernels); + inputs.extend_from_slice(tx.inputs()); + outputs.extend_from_slice(tx.outputs()); + kernels.extend_from_slice(tx.kernels()); } // Sort inputs and outputs during cut_through. @@ -1037,14 +1037,14 @@ pub fn aggregate(mut txs: Vec) -> Result { // * cut-through outputs // * full set of tx kernels // * sum of all kernel offsets - let tx = Transaction::new(inputs, outputs, kernels).with_offset(total_kernel_offset); + let tx = Transaction::new(&inputs, &outputs, &kernels).with_offset(total_kernel_offset); Ok(tx) } /// Attempt to deaggregate a multi-kernel transaction based on multiple /// transactions -pub fn deaggregate(mk_tx: Transaction, txs: Vec) -> Result { +pub fn deaggregate(mk_tx: &Transaction, txs: &[Transaction]) -> Result { let mut inputs: Vec = vec![]; let mut outputs: Vec = vec![]; let mut kernels: Vec = vec![]; @@ -1055,19 +1055,19 @@ pub fn deaggregate(mk_tx: Transaction, txs: Vec) -> Result) -> Result, - proofs: &Vec, - ) -> Result<(), Error> { + /// Batch validates the range proofs using the commitments. + /// TODO - can verify_bullet_proof_multi be reworked to take slices? + pub fn batch_verify_proofs(commits: &[Commitment], proofs: &[RangeProof]) -> Result<(), Error> { let secp = static_secp_instance(); secp.lock() - .verify_bullet_proof_multi(commits.clone(), proofs.clone(), None)?; + .verify_bullet_proof_multi(commits.to_vec(), proofs.to_vec(), None)?; Ok(()) } } diff --git a/p2p/src/peer.rs b/p2p/src/peer.rs index 1b50bf59ad..e57d25a84a 100644 --- a/p2p/src/peer.rs +++ b/p2p/src/peer.rs @@ -13,6 +13,7 @@ // limitations under the License. use crate::util::{Mutex, RwLock}; +use std::fmt; use std::fs::File; use std::net::{Shutdown, TcpStream}; use std::sync::Arc; @@ -54,6 +55,12 @@ pub struct Peer { connection: Option>, } +impl fmt::Debug for Peer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "Peer({:?})", &self.info) + } +} + impl Peer { // Only accept and connect can be externally used to build a peer fn new(info: PeerInfo, adapter: Arc) -> Peer { diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index 19713b7d85..920738263b 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -37,7 +37,6 @@ pub struct Peers { pub adapter: Arc, store: PeerStore, peers: RwLock>>, - dandelion_relay: RwLock)>>, config: P2PConfig, } @@ -48,7 +47,6 @@ impl Peers { store, config, peers: RwLock::new(HashMap::new()), - dandelion_relay: RwLock::new(None), } } @@ -87,38 +85,39 @@ impl Peers { self.save_peer(&peer_data) } - // Update the dandelion relay - pub fn update_dandelion_relay(&self) { - let peers = self.outgoing_connected_peers(); - - let peer = &self - .config - .dandelion_peer - .and_then(|ip| peers.iter().find(|x| x.info.addr == ip)) - .or(thread_rng().choose(&peers)); - - match peer { - Some(peer) => self.set_dandelion_relay(peer), - None => debug!("Could not update dandelion relay"), - } - } - - fn set_dandelion_relay(&self, peer: &Arc) { - // Clear the map and add new relay - let dandelion_relay = &self.dandelion_relay; - dandelion_relay - .write() - .replace((Utc::now().timestamp(), peer.clone())); - debug!( - "Successfully updated Dandelion relay to: {}", - peer.info.addr - ); - } - - // Get the dandelion relay - pub fn get_dandelion_relay(&self) -> Option<(i64, Arc)> { - self.dandelion_relay.read().clone() - } + // // Update the dandelion relay + // pub fn update_dandelion_relay(&self) { + // let peers = self.outgoing_connected_peers(); + // + // let peer = &self + // .config + // .dandelion_peer + // .and_then(|ip| peers.iter().find(|x| x.info.addr == ip)) + // .or(thread_rng().choose(&peers)); + // + // match peer { + // Some(peer) => self.set_dandelion_relay(peer), + // None => debug!("Could not update dandelion relay"), + // } + // } + // + // fn set_dandelion_relay(&self, peer: &Arc) { + // // Clear the map and add new relay + // let dandelion_relay = &self.dandelion_relay; + // dandelion_relay.write().clear(); + // dandelion_relay + // .write() + // .insert(Utc::now().timestamp(), peer.clone()); + // debug!( + // "Successfully updated Dandelion relay to: {}", + // peer.info.addr + // ); + // } + // + // // Get the dandelion relay + // pub fn get_dandelion_relay(&self) -> HashMap> { + // self.dandelion_relay.read().clone() + // } pub fn is_known(&self, addr: PeerAddr) -> bool { self.peers.read().contains_key(&addr) @@ -335,29 +334,29 @@ impl Peers { ); } - /// Relays the provided stem transaction to our single stem peer. - pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { - self.get_dandelion_relay() - .or_else(|| { - debug!("No dandelion relay, updating."); - self.update_dandelion_relay(); - self.get_dandelion_relay() - }) - // If still return an error, let the caller handle this as they see fit. - // The caller will "fluff" at this point as the stem phase is finished. - .ok_or(Error::NoDandelionRelay) - .map(|(_, relay)| { - if relay.is_connected() { - if let Err(e) = relay.send_stem_transaction(tx) { - debug!("Error sending stem transaction to peer relay: {:?}", e); - } - } - }) - } - - /// Broadcasts the provided transaction to PEER_PREFERRED_COUNT of our - /// peers. We may be connected to PEER_MAX_COUNT peers so we only - /// want to broadcast to a random subset of peers. + // /// Relays the provided stem transaction to our single stem peer. + // pub fn relay_stem_transaction(&self, tx: &core::Transaction) -> Result<(), Error> { + // let dandelion_relay = self.get_dandelion_relay(); + // if dandelion_relay.is_empty() { + // debug!("No dandelion relay, updating."); + // self.update_dandelion_relay(); + // } + // // If still return an error, let the caller handle this as they see fit. + // // The caller will "fluff" at this point as the stem phase is finished. + // if dandelion_relay.is_empty() { + // return Err(Error::NoDandelionRelay); + // } + // for relay in dandelion_relay.values() { + // if relay.is_connected() { + // if let Err(e) = relay.send_stem_transaction(tx) { + // debug!("Error sending stem transaction to peer relay: {:?}", e); + // } + // } + // } + // Ok(()) + // } + + /// Broadcasts the provided transaction to PEER_MAX_COUNT of our peers. /// A peer implementation may drop the broadcast request /// if it knows the remote peer already has the transaction. pub fn broadcast_transaction(&self, tx: &core::Transaction) { diff --git a/pool/src/lib.rs b/pool/src/lib.rs index b6ccc56c79..c506deaec6 100644 --- a/pool/src/lib.rs +++ b/pool/src/lib.rs @@ -35,6 +35,4 @@ pub mod transaction_pool; pub mod types; pub use crate::transaction_pool::TransactionPool; -pub use crate::types::{ - BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolEntryState, PoolError, TxSource, -}; +pub use crate::types::{BlockChain, DandelionConfig, PoolAdapter, PoolConfig, PoolError, TxSource}; diff --git a/pool/src/pool.rs b/pool/src/pool.rs index 5f2e944205..2b51ceba74 100644 --- a/pool/src/pool.rs +++ b/pool/src/pool.rs @@ -23,7 +23,7 @@ use self::core::core::{ Block, BlockHeader, BlockSums, Committed, Transaction, TxKernel, Weighting, }; use self::util::RwLock; -use crate::types::{BlockChain, PoolEntry, PoolEntryState, PoolError}; +use crate::types::{BlockChain, PoolEntry, PoolError}; use grin_core as core; use grin_util as util; use std::collections::{HashMap, HashSet}; @@ -139,7 +139,7 @@ impl Pool { // Verify these txs produce an aggregated tx below max tx weight. // Return a vec of all the valid txs. let txs = self.validate_raw_txs( - tx_buckets, + &tx_buckets, None, &header, Weighting::AsLimitedTransaction { max_weight }, @@ -159,7 +159,7 @@ impl Pool { return Ok(None); } - let tx = transaction::aggregate(txs)?; + let tx = transaction::aggregate(&txs)?; // Validate the single aggregate transaction "as pool", not subject to tx weight limits. tx.validate(Weighting::NoLimit, self.verifier_cache.clone())?; @@ -167,40 +167,24 @@ impl Pool { Ok(Some(tx)) } + /// TODO - Consider reworking this. We pass txs in rather than looking in the pool? pub fn select_valid_transactions( &self, - txs: Vec, - extra_tx: Option, + txs: &[Transaction], + extra_tx: Option<&Transaction>, header: &BlockHeader, ) -> Result, PoolError> { let valid_txs = self.validate_raw_txs(txs, extra_tx, header, Weighting::NoLimit)?; Ok(valid_txs) } - pub fn get_transactions_in_state(&self, state: PoolEntryState) -> Vec { - self.entries - .iter() - .filter(|x| x.state == state) - .map(|x| x.tx.clone()) - .collect::>() - } - - // Transition the specified pool entries to the new state. - pub fn transition_to_state(&mut self, txs: &[Transaction], state: PoolEntryState) { - for x in &mut self.entries { - if txs.contains(&x.tx) { - x.state = state; - } - } - } - // Aggregate this new tx with all existing txs in the pool. // If we can validate the aggregated tx against the current chain state // then we can safely add the tx to the pool. pub fn add_to_pool( &mut self, - entry: PoolEntry, - extra_txs: Vec, + entry: &PoolEntry, + extra_txs: &[Transaction], header: &BlockHeader, ) -> Result<(), PoolError> { // Combine all the txs from the pool with any extra txs provided. @@ -211,7 +195,7 @@ impl Pool { return Err(PoolError::DuplicateTx); } - txs.extend(extra_txs); + txs.extend_from_slice(extra_txs); let agg_tx = if txs.is_empty() { // If we have nothing to aggregate then simply return the tx itself. @@ -220,7 +204,7 @@ impl Pool { // Create a single aggregated tx from the existing pool txs and the // new entry txs.push(entry.tx.clone()); - transaction::aggregate(txs)? + transaction::aggregate(&txs)? }; // Validate aggregated tx (existing pool + new tx), ignoring tx weight limits. @@ -229,7 +213,7 @@ impl Pool { // If we get here successfully then we can safely add the entry to the pool. self.log_pool_add(&entry, header); - self.entries.push(entry); + self.entries.push(entry.clone()); Ok(()) } @@ -267,10 +251,11 @@ impl Pool { Ok(new_sums) } + /// TODO - We seem to have moved a lot of cloning into here. Rethink this. fn validate_raw_txs( &self, - txs: Vec, - extra_tx: Option, + txs: &[Transaction], + extra_tx: Option<&Transaction>, header: &BlockHeader, weighting: Weighting, ) -> Result, PoolError> { @@ -278,18 +263,18 @@ impl Pool { for tx in txs { let mut candidate_txs = vec![]; - if let Some(extra_tx) = extra_tx.clone() { - candidate_txs.push(extra_tx); + if let Some(extra_tx) = extra_tx { + candidate_txs.push(extra_tx.clone()); }; - candidate_txs.extend(valid_txs.clone()); + candidate_txs.extend_from_slice(&valid_txs); candidate_txs.push(tx.clone()); // Build a single aggregate tx from candidate txs. - let agg_tx = transaction::aggregate(candidate_txs)?; + let agg_tx = transaction::aggregate(&candidate_txs)?; // We know the tx is valid if the entire aggregate tx is valid. if self.validate_raw_tx(&agg_tx, header, weighting).is_ok() { - valid_txs.push(tx); + valid_txs.push(tx.clone()); } } @@ -319,7 +304,7 @@ impl Pool { pub fn reconcile( &mut self, - extra_tx: Option, + extra_tx: Option<&Transaction>, header: &BlockHeader, ) -> Result<(), PoolError> { let existing_entries = self.entries.clone(); @@ -327,11 +312,11 @@ impl Pool { let mut extra_txs = vec![]; if let Some(extra_tx) = extra_tx { - extra_txs.push(extra_tx); + extra_txs.push(extra_tx.clone()); } for x in existing_entries { - let _ = self.add_to_pool(x, extra_txs.clone(), header); + let _ = self.add_to_pool(&x, &extra_txs, header); } Ok(()) @@ -392,7 +377,7 @@ impl Pool { // if the aggregate tx is a valid tx. // Otherwise discard and let the next block pick this tx up. let current = tx_buckets[pos].clone(); - if let Ok(agg_tx) = transaction::aggregate(vec![current, entry.tx.clone()]) { + if let Ok(agg_tx) = transaction::aggregate(&vec![current, entry.tx.clone()]) { if agg_tx .validate( Weighting::AsLimitedTransaction { max_weight }, diff --git a/pool/src/transaction_pool.rs b/pool/src/transaction_pool.rs index becd7a7884..9c6b7a4f96 100644 --- a/pool/src/transaction_pool.rs +++ b/pool/src/transaction_pool.rs @@ -23,9 +23,7 @@ use self::core::core::verifier_cache::VerifierCache; use self::core::core::{transaction, Block, BlockHeader, Transaction, Weighting}; use self::util::RwLock; use crate::pool::Pool; -use crate::types::{ - BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolEntryState, PoolError, TxSource, -}; +use crate::types::{BlockChain, PoolAdapter, PoolConfig, PoolEntry, PoolError, TxSource}; use chrono::prelude::*; use grin_core as core; use grin_util as util; @@ -76,13 +74,15 @@ impl TransactionPool { self.blockchain.chain_head() } - fn add_to_stempool(&mut self, entry: PoolEntry, header: &BlockHeader) -> Result<(), PoolError> { + fn add_to_stempool( + &mut self, + entry: &PoolEntry, + header: &BlockHeader, + ) -> Result<(), PoolError> { // Add tx to stempool (passing in all txs from txpool to validate against). self.stempool - .add_to_pool(entry, self.txpool.all_transactions(), header)?; - - // Note: we do not notify the adapter here, - // we let the dandelion monitor handle this. + .add_to_pool(entry, &self.txpool.all_transactions(), header)?; + self.adapter.stem_tx_accepted(&entry.tx); Ok(()) } @@ -100,14 +100,14 @@ impl TransactionPool { fn add_to_txpool( &mut self, - mut entry: PoolEntry, + entry: &mut PoolEntry, header: &BlockHeader, ) -> Result<(), PoolError> { // First deaggregate the tx based on current txpool txs. if entry.tx.kernels().len() > 1 { let txs = self.txpool.find_matching_transactions(entry.tx.kernels()); if !txs.is_empty() { - let tx = transaction::deaggregate(entry.tx, txs)?; + let tx = transaction::deaggregate(&entry.tx, &txs)?; // Validate this deaggregated tx "as tx", subject to regular tx weight limits. tx.validate(Weighting::AsTransaction, self.verifier_cache.clone())?; @@ -116,13 +116,13 @@ impl TransactionPool { entry.src.debug_name = "deagg".to_string(); } } - self.txpool.add_to_pool(entry.clone(), vec![], header)?; + self.txpool.add_to_pool(entry, &vec![], header)?; // We now need to reconcile the stempool based on the new state of the txpool. // Some stempool txs may no longer be valid and we need to evict them. { let txpool_tx = self.txpool.all_transactions_aggregate()?; - self.stempool.reconcile(txpool_tx, header)?; + self.stempool.reconcile(txpool_tx.as_ref(), header)?; } self.adapter.tx_accepted(&entry.tx); @@ -133,8 +133,8 @@ impl TransactionPool { /// txpool based on stem flag provided. pub fn add_to_pool( &mut self, - src: TxSource, - tx: Transaction, + src: &TxSource, + tx: &Transaction, stem: bool, header: &BlockHeader, ) -> Result<(), PoolError> { @@ -145,7 +145,7 @@ impl TransactionPool { } // Do we have the capacity to accept this transaction? - self.is_acceptable(&tx, stem)?; + self.is_acceptable(tx, stem)?; // Make sure the transaction is valid before anything else. // Validate tx accounting for max tx weight. @@ -153,34 +153,24 @@ impl TransactionPool { .map_err(PoolError::InvalidTx)?; // Check the tx lock_time is valid based on current chain state. - self.blockchain.verify_tx_lock_height(&tx)?; + self.blockchain.verify_tx_lock_height(tx)?; // Check coinbase maturity before we go any further. - self.blockchain.verify_coinbase_maturity(&tx)?; + self.blockchain.verify_coinbase_maturity(tx)?; - let entry = PoolEntry { - state: PoolEntryState::Fresh, - src, + let mut entry = PoolEntry { + src: src.clone(), tx_at: Utc::now(), - tx, + tx: tx.clone(), }; - // If we are in "stem" mode then check if this is a new tx or if we have seen it before. - // If new tx - add it to our stempool. - // If we have seen any of the kernels before then fallback to fluff, - // adding directly to txpool. - if stem - && self - .stempool - .find_matching_transactions(entry.tx.kernels()) - .is_empty() - { - self.add_to_stempool(entry, header)?; - return Ok(()); + if stem { + self.add_to_stempool(&entry, header)?; + } else { + self.add_to_txpool(&mut entry, header)?; + self.add_to_reorg_cache(entry); } - self.add_to_txpool(entry.clone(), header)?; - self.add_to_reorg_cache(entry); Ok(()) } @@ -202,8 +192,8 @@ impl TransactionPool { entries.len(), header.hash(), ); - for entry in entries { - let _ = &self.add_to_txpool(entry.clone(), header); + for mut entry in entries { + let _ = &self.add_to_txpool(&mut entry, header); } debug!( "reconcile_reorg_cache: block: {:?} ... done.", @@ -223,7 +213,7 @@ impl TransactionPool { self.stempool.reconcile_block(block); { let txpool_tx = self.txpool.all_transactions_aggregate()?; - self.stempool.reconcile(txpool_tx, &block.header)?; + self.stempool.reconcile(txpool_tx.as_ref(), &block.header)?; } Ok(()) diff --git a/pool/src/types.rs b/pool/src/types.rs index 04f1a1ee27..b022394a97 100644 --- a/pool/src/types.rs +++ b/pool/src/types.rs @@ -31,13 +31,13 @@ use grin_keychain as keychain; const DANDELION_RELAY_SECS: u64 = 600; /// Dandelion embargo timer -const DANDELION_EMBARGO_SECS: u64 = 180; +const DANDELION_EMBARGO_SECS: u64 = 600; /// Dandelion patience timer const DANDELION_PATIENCE_SECS: u64 = 10; /// Dandelion stem probability (stem 90% of the time, fluff 10%). -const DANDELION_STEM_PROBABILITY: usize = 90; +const DANDELION_STEM_PROBABILITY: u64 = 90; /// Configuration for "Dandelion". /// Note: shared between p2p and pool. @@ -56,7 +56,18 @@ pub struct DandelionConfig { pub patience_secs: Option, /// Dandelion stem probability (stem 90% of the time, fluff 10% etc.) #[serde = "default_dandelion_stem_probability"] - pub stem_probability: Option, + pub stem_probability: Option, +} + +impl DandelionConfig { + pub fn stem_probability(&self) -> u64 { + self.stem_probability.unwrap_or(DANDELION_STEM_PROBABILITY) + } + + // TODO - Cleanup config for Dandelion++ + pub fn epoch_secs(&self) -> u64 { + self.relay_secs.unwrap_or(DANDELION_RELAY_SECS) + } } impl Default for DandelionConfig { @@ -82,7 +93,7 @@ fn default_dandelion_patience_secs() -> Option { Some(DANDELION_PATIENCE_SECS) } -fn default_dandelion_stem_probability() -> Option { +fn default_dandelion_stem_probability() -> Option { Some(DANDELION_STEM_PROBABILITY) } @@ -138,8 +149,6 @@ fn default_mineable_max_weight() -> usize { /// A single (possibly aggregated) transaction. #[derive(Clone, Debug)] pub struct PoolEntry { - /// The state of the pool entry. - pub state: PoolEntryState, /// Info on where this tx originated from. pub src: TxSource, /// Timestamp of when this tx was originally added to the pool. @@ -148,21 +157,6 @@ pub struct PoolEntry { pub tx: Transaction, } -/// The possible states a pool entry can be in. -#[derive(Clone, Copy, Debug, PartialEq)] -pub enum PoolEntryState { - /// A new entry, not yet processed. - Fresh, - /// Tx to be included in the next "stem" run. - ToStem, - /// Tx previously "stemmed" and propagated. - Stemmed, - /// Tx to be included in the next "fluff" run. - ToFluff, - /// Tx previously "fluffed" and broadcast. - Fluffed, -} - /// Placeholder: the data representing where we heard about a tx from. /// /// Used to make decisions based on transaction acceptance priority from @@ -267,13 +261,11 @@ pub trait BlockChain: Sync + Send { /// downstream processing of valid transactions by the rest of the system, most /// importantly the broadcasting of transactions to our peers. pub trait PoolAdapter: Send + Sync { - /// The transaction pool has accepted this transactions as valid and added + /// The transaction pool has accepted this transaction as valid and added /// it to its internal cache. fn tx_accepted(&self, tx: &transaction::Transaction); - /// The stem transaction pool has accepted this transactions as valid and - /// added it to its internal cache, we have waited for the "patience" timer - /// to fire and we now want to propagate the tx to the next Dandelion relay. - fn stem_tx_accepted(&self, tx: &transaction::Transaction) -> Result<(), PoolError>; + + fn stem_tx_accepted(&self, tx: &transaction::Transaction); } /// Dummy adapter used as a placeholder for real implementations @@ -281,9 +273,6 @@ pub trait PoolAdapter: Send + Sync { pub struct NoopAdapter {} impl PoolAdapter for NoopAdapter { - fn tx_accepted(&self, _: &transaction::Transaction) {} - - fn stem_tx_accepted(&self, _: &transaction::Transaction) -> Result<(), PoolError> { - Ok(()) - } + fn tx_accepted(&self, _tx: &transaction::Transaction) {} + fn stem_tx_accepted(&self, _tx: &transaction::Transaction) {} } diff --git a/servers/src/common/adapters.rs b/servers/src/common/adapters.rs index ae160267b6..b2162fadc5 100644 --- a/servers/src/common/adapters.rs +++ b/servers/src/common/adapters.rs @@ -22,7 +22,9 @@ use std::thread; use std::time::Instant; use crate::chain::{self, BlockStatus, ChainAdapter, Options}; -use crate::common::types::{self, ChainValidationMode, ServerConfig, SyncState, SyncStatus}; +use crate::common::types::{ + self, ChainValidationMode, DandelionEpoch, ServerConfig, SyncState, SyncStatus, +}; use crate::core::core::hash::{Hash, Hashed}; use crate::core::core::transaction::Transaction; use crate::core::core::verifier_cache::VerifierCache; @@ -32,6 +34,7 @@ use crate::core::{core, global}; use crate::p2p; use crate::p2p::types::PeerAddr; use crate::pool; +use crate::pool::types::DandelionConfig; use crate::util::OneTime; use chrono::prelude::*; use chrono::Duration; @@ -99,7 +102,7 @@ impl p2p::ChainAdapter for NetToChainAdapter { let res = { let mut tx_pool = self.tx_pool.write(); - tx_pool.add_to_pool(source, tx, stem, &header) + tx_pool.add_to_pool(&source, &tx, stem, &header) }; if let Err(e) = res { @@ -700,26 +703,63 @@ impl ChainToPoolAndNetAdapter { /// transactions that have been accepted. pub struct PoolToNetAdapter { peers: OneTime>, + dandelion_epoch: Arc>, } -impl pool::PoolAdapter for PoolToNetAdapter { - fn stem_tx_accepted(&self, tx: &core::Transaction) -> Result<(), pool::PoolError> { - self.peers() - .relay_stem_transaction(tx) - .map_err(|_| pool::PoolError::DandelionError)?; - Ok(()) +pub trait DandelionAdapter: Send + Sync { + fn is_stem(&self) -> bool; + + fn is_expired(&self) -> bool; + + fn next_epoch(&self); +} + +impl DandelionAdapter for PoolToNetAdapter { + fn is_stem(&self) -> bool { + self.dandelion_epoch.read().is_stem() } + fn is_expired(&self) -> bool { + self.dandelion_epoch.read().is_expired() + } + + fn next_epoch(&self) { + self.dandelion_epoch.write().next_epoch(&self.peers()); + } +} + +impl pool::PoolAdapter for PoolToNetAdapter { fn tx_accepted(&self, tx: &core::Transaction) { self.peers().broadcast_transaction(tx); } + + fn stem_tx_accepted(&self, tx: &core::Transaction) { + let mut epoch = self.dandelion_epoch.write(); + if epoch.is_expired() { + warn!("epoch expired, setting up next epoch"); + epoch.next_epoch(&self.peers()); + } + + if epoch.is_stem() { + if let Some(peer) = epoch.relay_peer(&self.peers()) { + warn!("Stemming this epoch, relaying to next peer."); + peer.send_stem_transaction(tx); + } else { + // TODO - no relay peer, no available outgoing peers, do we fluff here? + error!("What to do here? We have no relay peer?"); + } + } else { + warn!("Not forwarding stem tx. Collecting, aggregating and fluffing txs this epoch. (Ok and expected)."); + } + } } impl PoolToNetAdapter { /// Create a new pool to net adapter - pub fn new() -> PoolToNetAdapter { + pub fn new(config: DandelionConfig) -> PoolToNetAdapter { PoolToNetAdapter { peers: OneTime::new(), + dandelion_epoch: Arc::new(RwLock::new(DandelionEpoch::new(config))), } } diff --git a/servers/src/common/types.rs b/servers/src/common/types.rs index 72db40a036..c8d0352bce 100644 --- a/servers/src/common/types.rs +++ b/servers/src/common/types.rs @@ -13,19 +13,22 @@ // limitations under the License. //! Server types -use crate::util::RwLock; use std::convert::From; use std::sync::Arc; +use chrono::prelude::{DateTime, Utc}; +use rand::prelude::*; + use crate::api; use crate::chain; use crate::core::global::ChainTypes; use crate::core::{core, pow}; use crate::p2p; use crate::pool; +use crate::pool::types::DandelionConfig; use crate::store; +use crate::util::RwLock; use crate::wallet; -use chrono::prelude::{DateTime, Utc}; /// Error type wrapping underlying module errors. #[derive(Debug)] @@ -397,3 +400,70 @@ impl chain::TxHashsetWriteStatus for SyncState { self.update(SyncStatus::TxHashsetDone); } } + +/// A node is either "stem" of "fluff" for the duration of a single epoch. +/// A node also maintains an outbound relay peer for the epoch. +#[derive(Debug)] +pub struct DandelionEpoch { + config: DandelionConfig, + // When did this epoch start? + start_time: Option, + // Are we in "stem" mode or "fluff" mode for this epoch? + is_stem: bool, + // Our current Dandelion relay peer (effective for this epoch). + relay_peer: Option>, +} + +impl DandelionEpoch { + pub fn new(config: DandelionConfig) -> DandelionEpoch { + DandelionEpoch { + config, + start_time: None, + is_stem: false, + relay_peer: None, + } + } + + pub fn is_expired(&self) -> bool { + let expired = if let Some(start_time) = self.start_time { + Utc::now().timestamp().saturating_sub(start_time) > self.config.epoch_secs() as i64 + } else { + true + }; + error!("DandelionEpoch: is_expired: {}", expired); + expired + } + + pub fn next_epoch(&mut self, peers: &Arc) { + self.start_time = Some(Utc::now().timestamp()); + self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + + // If stem_probability == 90 then we stem 90% of the time. + let mut rng = rand::thread_rng(); + self.is_stem = rng.gen_range(0, 101) < self.config.stem_probability(); + + error!("DandelionEpoch: next_epoch: {:?}", self); + } + + // Are we stemming transactions in this epoch? + pub fn is_stem(&self) -> bool { + self.is_stem + } + + pub fn relay_peer(&mut self, peers: &Arc) -> Option> { + let mut update_relay = false; + if let Some(peer) = &self.relay_peer { + if !peer.is_connected() { + update_relay = true; + } + } else { + update_relay = true; + } + + if update_relay { + self.relay_peer = peers.outgoing_connected_peers().first().cloned(); + } + + self.relay_peer.clone() + } +} diff --git a/servers/src/grin/dandelion_monitor.rs b/servers/src/grin/dandelion_monitor.rs index 57fcbb3081..6cbbe4ecf8 100644 --- a/servers/src/grin/dandelion_monitor.rs +++ b/servers/src/grin/dandelion_monitor.rs @@ -12,17 +12,18 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::util::{Mutex, RwLock, StopState}; use chrono::prelude::Utc; use rand::{thread_rng, Rng}; use std::sync::Arc; use std::thread; use std::time::Duration; +use crate::common::adapters::DandelionAdapter; use crate::core::core::hash::Hashed; use crate::core::core::transaction; use crate::core::core::verifier_cache::VerifierCache; -use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, TxSource}; +use crate::pool::{DandelionConfig, PoolError, TransactionPool, TxSource}; +use crate::util::{Mutex, RwLock, StopState}; /// A process to monitor transactions in the stempool. /// With Dandelion, transaction can be broadcasted in stem or fluff phase. @@ -35,6 +36,7 @@ use crate::pool::{DandelionConfig, PoolEntryState, PoolError, TransactionPool, T pub fn monitor_transactions( dandelion_config: DandelionConfig, tx_pool: Arc>, + adapter: Arc, verifier_cache: Arc>, stop_state: Arc>, ) { @@ -48,51 +50,43 @@ pub fn monitor_transactions( break; } + // TODO - may be preferable to loop more often and check for expired patience time? // This is the patience timer, we loop every n secs. let patience_secs = dandelion_config.patience_secs.unwrap(); thread::sleep(Duration::from_secs(patience_secs)); - // Step 1: find all "ToStem" entries in stempool from last run. - // Aggregate them up to give a single (valid) aggregated tx and propagate it - // to the next Dandelion relay along the stem. - if process_stem_phase(tx_pool.clone(), verifier_cache.clone()).is_err() { - error!("dand_mon: Problem with stem phase."); + // Our adapter hooks us into the current Dandelion "epoch". + // From this we can determine if we should fluff txs in stempool. + if adapter.is_expired() { + adapter.next_epoch(); } - // Step 2: find all "ToFluff" entries in stempool from last run. - // Aggregate them up to give a single (valid) aggregated tx and (re)add it - // to our pool with stem=false (which will then broadcast it). - if process_fluff_phase(tx_pool.clone(), verifier_cache.clone()).is_err() { - error!("dand_mon: Problem with fluff phase."); - } + // Vastly simplified - + // check if we are is_stem() via the adapter (current epoch) + // * if we are stem then do nothing (nothing to aggregate here) + // * if fluff then aggregate and add to txpool - // Step 3: now find all "Fresh" entries in stempool since last run. - // Coin flip for each (90/10) and label them as either "ToStem" or "ToFluff". - // We will process these in the next run (waiting patience secs). - if process_fresh_entries(dandelion_config.clone(), tx_pool.clone()).is_err() { - error!("dand_mon: Problem processing fresh pool entries."); + if !adapter.is_stem() { + if process_fluff_phase(&tx_pool, &verifier_cache).is_err() { + error!("dand_mon: Problem processing fresh pool entries."); + } } - // Step 4: now find all expired entries based on embargo timer. - if process_expired_entries(dandelion_config.clone(), tx_pool.clone()).is_err() { + // Now find all expired entries based on embargo timer. + if process_expired_entries(&dandelion_config, &tx_pool).is_err() { error!("dand_mon: Problem processing fresh pool entries."); } } }); } -fn process_stem_phase( - tx_pool: Arc>, - verifier_cache: Arc>, +fn process_fluff_phase( + tx_pool: &Arc>, + verifier_cache: &Arc>, ) -> Result<(), PoolError> { let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; - - let stem_txs = tx_pool - .stempool - .get_transactions_in_state(PoolEntryState::ToStem); - + let stem_txs = tx_pool.stempool.all_transactions(); if stem_txs.is_empty() { return Ok(()); } @@ -100,118 +94,36 @@ fn process_stem_phase( // Get the aggregate tx representing the entire txpool. let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; - let stem_txs = tx_pool - .stempool - .select_valid_transactions(stem_txs, txpool_tx, &header)?; - tx_pool - .stempool - .transition_to_state(&stem_txs, PoolEntryState::Stemmed); - - if stem_txs.len() > 0 { - debug!("dand_mon: Found {} txs for stemming.", stem_txs.len()); - - let agg_tx = transaction::aggregate(stem_txs)?; - agg_tx.validate( - transaction::Weighting::AsTransaction, - verifier_cache.clone(), - )?; - - let res = tx_pool.adapter.stem_tx_accepted(&agg_tx); - if res.is_err() { - debug!("dand_mon: Unable to propagate stem tx. No relay, fluffing instead."); - - let src = TxSource { - debug_name: "no_relay".to_string(), - identifier: "?.?.?.?".to_string(), - }; - - tx_pool.add_to_pool(src, agg_tx, false, &header)?; - } - } - Ok(()) -} - -fn process_fluff_phase( - tx_pool: Arc>, - verifier_cache: Arc>, -) -> Result<(), PoolError> { - let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; - - let stem_txs = tx_pool - .stempool - .get_transactions_in_state(PoolEntryState::ToFluff); + let stem_txs = + tx_pool + .stempool + .select_valid_transactions(&stem_txs, txpool_tx.as_ref(), &header)?; if stem_txs.is_empty() { return Ok(()); } - // Get the aggregate tx representing the entire txpool. - let txpool_tx = tx_pool.txpool.all_transactions_aggregate()?; - - let stem_txs = tx_pool - .stempool - .select_valid_transactions(stem_txs, txpool_tx, &header)?; - tx_pool - .stempool - .transition_to_state(&stem_txs, PoolEntryState::Fluffed); - - if stem_txs.len() > 0 { - debug!("dand_mon: Found {} txs for fluffing.", stem_txs.len()); - - let agg_tx = transaction::aggregate(stem_txs)?; - agg_tx.validate( - transaction::Weighting::AsTransaction, - verifier_cache.clone(), - )?; - - let src = TxSource { - debug_name: "fluff".to_string(), - identifier: "?.?.?.?".to_string(), - }; + debug!("dand_mon: Found {} txs for fluffing.", stem_txs.len()); - tx_pool.add_to_pool(src, agg_tx, false, &header)?; - } - Ok(()) -} + let agg_tx = transaction::aggregate(&stem_txs)?; + agg_tx.validate( + transaction::Weighting::AsTransaction, + verifier_cache.clone(), + )?; -fn process_fresh_entries( - dandelion_config: DandelionConfig, - tx_pool: Arc>, -) -> Result<(), PoolError> { - let mut tx_pool = tx_pool.write(); + let src = TxSource { + debug_name: "fluff".to_string(), + identifier: "?.?.?.?".to_string(), + }; - let mut rng = thread_rng(); - - let fresh_entries = &mut tx_pool - .stempool - .entries - .iter_mut() - .filter(|x| x.state == PoolEntryState::Fresh) - .collect::>(); - - if fresh_entries.len() > 0 { - debug!( - "dand_mon: Found {} fresh entries in stempool.", - fresh_entries.len() - ); - - for x in &mut fresh_entries.iter_mut() { - let random = rng.gen_range(0, 101); - if random <= dandelion_config.stem_probability.unwrap() { - x.state = PoolEntryState::ToStem; - } else { - x.state = PoolEntryState::ToFluff; - } - } - } + tx_pool.add_to_pool(&src, &agg_tx, false, &header)?; Ok(()) } fn process_expired_entries( - dandelion_config: DandelionConfig, - tx_pool: Arc>, + dandelion_config: &DandelionConfig, + tx_pool: &Arc>, ) -> Result<(), PoolError> { let now = Utc::now().timestamp(); let embargo_sec = dandelion_config.embargo_secs.unwrap() + thread_rng().gen_range(0, 31); @@ -231,24 +143,26 @@ fn process_expired_entries( } } - if expired_entries.len() > 0 { - debug!("dand_mon: Found {} expired txs.", expired_entries.len()); + if expired_entries.is_empty() { + return Ok(()); + } - { - let mut tx_pool = tx_pool.write(); - let header = tx_pool.chain_head()?; - - for entry in expired_entries { - let src = TxSource { - debug_name: "embargo_expired".to_string(), - identifier: "?.?.?.?".to_string(), - }; - match tx_pool.add_to_pool(src, entry.tx, false, &header) { - Ok(_) => debug!("dand_mon: embargo expired, fluffed tx successfully."), - Err(e) => debug!("dand_mon: Failed to fluff expired tx - {:?}", e), - }; - } - } + debug!("dand_mon: Found {} expired txs.", expired_entries.len()); + + let mut tx_pool = tx_pool.write(); + let header = tx_pool.chain_head()?; + + let src = TxSource { + debug_name: "embargo_expired".to_string(), + identifier: "?.?.?.?".to_string(), + }; + + for entry in expired_entries { + match tx_pool.add_to_pool(&src, &entry.tx, false, &header) { + Ok(_) => debug!("dand_mon: embargo expired, fluffed tx successfully."), + Err(e) => debug!("dand_mon: Failed to fluff expired tx - {:?}", e), + }; } + Ok(()) } diff --git a/servers/src/grin/seed.rs b/servers/src/grin/seed.rs index dd6f2d1348..dd66db608c 100644 --- a/servers/src/grin/seed.rs +++ b/servers/src/grin/seed.rs @@ -119,8 +119,6 @@ pub fn connect_and_monitor( preferred_peers.clone(), ); - update_dandelion_relay(peers.clone(), dandelion_config.clone()); - prev = Utc::now(); start_attempt = cmp::min(6, start_attempt + 1); } @@ -244,20 +242,22 @@ fn monitor_peers( } } -fn update_dandelion_relay(peers: Arc, dandelion_config: DandelionConfig) { - // Dandelion Relay Updater - let dandelion_relay = peers.get_dandelion_relay(); - if let Some((last_added, _)) = dandelion_relay { - let dandelion_interval = Utc::now().timestamp() - last_added; - if dandelion_interval >= dandelion_config.relay_secs.unwrap() as i64 { - debug!("monitor_peers: updating expired dandelion relay"); - peers.update_dandelion_relay(); - } - } else { - debug!("monitor_peers: no dandelion relay updating"); - peers.update_dandelion_relay(); - } -} +// fn update_dandelion_relay(peers: Arc, dandelion_config: DandelionConfig) { +// // Dandelion Relay Updater +// let dandelion_relay = peers.get_dandelion_relay(); +// if dandelion_relay.is_empty() { +// debug!("monitor_peers: no dandelion relay updating"); +// peers.update_dandelion_relay(); +// } else { +// for last_added in dandelion_relay.keys() { +// let dandelion_interval = Utc::now().timestamp() - last_added; +// if dandelion_interval >= dandelion_config.relay_secs.unwrap() as i64 { +// debug!("monitor_peers: updating expired dandelion relay"); +// peers.update_dandelion_relay(); +// } +// } +// } +// } // Check if we have any pre-existing peer in db. If so, start with those, // otherwise use the seeds provided. diff --git a/servers/src/grin/server.rs b/servers/src/grin/server.rs index ac25eb3f7e..1ba899e842 100644 --- a/servers/src/grin/server.rs +++ b/servers/src/grin/server.rs @@ -118,7 +118,7 @@ impl Server { let verifier_cache = Arc::new(RwLock::new(LruVerifierCache::new())); let pool_adapter = Arc::new(PoolToChainAdapter::new()); - let pool_net_adapter = Arc::new(PoolToNetAdapter::new()); + let pool_net_adapter = Arc::new(PoolToNetAdapter::new(config.dandelion_config.clone())); let tx_pool = Arc::new(RwLock::new(pool::TransactionPool::new( config.pool_config.clone(), pool_adapter.clone(), @@ -174,6 +174,8 @@ impl Server { genesis.hash(), stop_state.clone(), )?); + + // Initialize various adapters with our set of connected peers. chain_adapter.init(p2p_server.peers.clone()); pool_net_adapter.init(p2p_server.peers.clone()); net_adapter.init(p2p_server.peers.clone()); @@ -248,6 +250,7 @@ impl Server { dandelion_monitor::monitor_transactions( config.dandelion_config.clone(), tx_pool.clone(), + pool_net_adapter.clone(), verifier_cache.clone(), stop_state.clone(), );