diff --git a/Cargo.lock b/Cargo.lock
index db9e9099..0a64513d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1081,6 +1081,7 @@ dependencies = [
  "protobuf",
  "regex",
  "ring",
+ "rusqlite",
  "serde",
  "thiserror",
  "tokio",
diff --git a/mgmtd/Cargo.toml b/mgmtd/Cargo.toml
index 8ed44dbd..e151baa4 100644
--- a/mgmtd/Cargo.toml
+++ b/mgmtd/Cargo.toml
@@ -18,7 +18,7 @@ path = "src/main.rs"
 sqlite = { path = "../sqlite" }
 
 [dependencies]
-shared = { path = "../shared", features = ["grpc"] }
+shared = { path = "../shared", features = ["grpc", "sqlite"] }
 protobuf = { workspace = true }
 sqlite = { path = "../sqlite" }
 sqlite_check = { path = "../sqlite_check" }
diff --git a/mgmtd/src/app.rs b/mgmtd/src/app.rs
index c978c811..71389c14 100644
--- a/mgmtd/src/app.rs
+++ b/mgmtd/src/app.rs
@@ -54,7 +54,7 @@ pub(crate) trait App: Debug + Clone + Send + 'static {
     /// Send a [Msg] to a node via TCP and receive the response
     fn request<M: Msg, R: Msg>(
         &self,
-        node_uid: Uid,
+        node_uid: &Uid,
         msg: &M,
     ) -> impl Future<Output = Result<R>> + Send;
 
diff --git a/mgmtd/src/app/runtime.rs b/mgmtd/src/app/runtime.rs
index 74312007..34038e29 100644
--- a/mgmtd/src/app/runtime.rs
+++ b/mgmtd/src/app/runtime.rs
@@ -2,13 +2,15 @@ use super::*;
 use crate::ClientPulledStateNotification;
 use crate::bee_msg::dispatch_request;
 use crate::license::LicenseVerifier;
+use crate::types::SqliteEnumExt;
 use anyhow::Result;
 use protobuf::license::GetCertDataResult;
 use rusqlite::{Connection, Transaction};
 use shared::conn::msg_dispatch::{DispatchRequest, Request};
 use shared::conn::outgoing::Pool;
 use shared::run_state::WeakRunStateHandle;
-use sqlite::Connections;
+use sqlite::{Connections, TransactionExt, rarray_param};
+use sqlite_check::sql;
 use std::fmt::Debug;
 use std::ops::Deref;
 use tokio::sync::mpsc;
@@ -113,7 +115,7 @@ impl App for RuntimeApp {
 
     async fn request<M: Msg, R: Msg>(
         &self,
-        node_uid: Uid,
+        node_uid: &Uid,
         msg: &M,
     ) -> Result<R> {
         Pool::request(&self.conn, node_uid, msg).await
@@ -126,22 +128,24 @@ impl App for RuntimeApp {
     ) {
         log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");
 
-        for t in node_types {
-            if let Err(err) = async {
-                let nodes = self
-                    .read_tx(move |tx| crate::db::node::get_with_type(tx, *t))
-                    .await?;
-
-                self.conn
-                    .broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
-                    .await?;
-
-                Ok(()) as Result<_>
-            }
-            .await
-            {
-                log::error!("Notification could not be sent to all {t} nodes: {err:#}");
-            }
+        if let Err(err) = async {
+            let nodes: Vec<_> = self
+                .read_tx(move |tx| {
+                    Ok(tx.query_map_collect(
+                        sql!("SELECT node_uid, alias FROM nodes_ext WHERE node_type IN rarray(?1)"),
+                        [&rarray_param(node_types.iter().map(|t| t.sql_variant()))],
+                        |row| Ok(Uid::with_info(row.get(0)?, row.get::<_, String>(1)?)),
+                    )?)
+                })
+                .await?;
+
+            self.conn.broadcast_datagram(nodes, msg).await?;
+
+            Ok(()) as Result<_>
+        }
+        .await
+        {
+            log::error!("Notification could not be sent to all {node_types:?} nodes: {err:#}");
         }
     }
diff --git a/mgmtd/src/app/test.rs b/mgmtd/src/app/test.rs
index 7d5f4242..660c1a3c 100644
--- a/mgmtd/src/app/test.rs
+++ b/mgmtd/src/app/test.rs
@@ -129,7 +129,7 @@ impl App for TestApp {
 
     async fn request<M: Msg, R: Msg>(
         &self,
-        _node_uid: Uid,
+        _node_uid: &Uid,
         msg: &M,
     ) -> Result<R> {
         let mut d = self.data.lock().unwrap();
diff --git a/mgmtd/src/bee_msg/change_target_consistency_states.rs b/mgmtd/src/bee_msg/change_target_consistency_states.rs
index efe66a42..bd5beb9c 100644
--- a/mgmtd/src/bee_msg/change_target_consistency_states.rs
+++ b/mgmtd/src/bee_msg/change_target_consistency_states.rs
@@ -116,7 +116,7 @@ mod test {
         // No change of consistency states
         let msg = ChangeTargetConsistencyStates {
             node_type: NodeType::Storage,
-            target_ids: vec![1, 5],
+            target_ids: vec![1.into(), 5.into()],
             old_states: vec![TargetConsistencyState::Good, TargetConsistencyState::Good],
             new_states: vec![TargetConsistencyState::Good, TargetConsistencyState::Good],
             ack_id: "".into(),
@@ -136,7 +136,7 @@ mod test {
         // Change of consistency states
         let msg = ChangeTargetConsistencyStates {
             node_type: NodeType::Storage,
-            target_ids: vec![1, 5],
+            target_ids: vec![1.into(), 5.into()],
             old_states: vec![TargetConsistencyState::Good, TargetConsistencyState::Good],
             new_states: vec![
                 TargetConsistencyState::NeedsResync,
@@ -179,7 +179,7 @@ mod test {
         // Mismatch of reported old state should not change the consistency states
         let msg = ChangeTargetConsistencyStates {
             node_type: NodeType::Storage,
-            target_ids: vec![1],
+            target_ids: vec![1.into()],
             old_states: vec![TargetConsistencyState::NeedsResync],
             new_states: vec![TargetConsistencyState::Bad],
             ack_id: "".into(),
diff --git a/mgmtd/src/bee_msg/common.rs b/mgmtd/src/bee_msg/common.rs
index 6d371867..49f666ae 100644
--- a/mgmtd/src/bee_msg/common.rs
+++ b/mgmtd/src/bee_msg/common.rs
@@ -44,7 +44,7 @@ pub(super) async fn update_node(msg: RegisterNode, app: &impl App) -> Result<
             >= licensed_machines
         {
             bail!("Licensed machine limit reached.
Node registration denied.");
@@ -54,7 +54,7 @@ pub(super) async fn update_node(msg: RegisterNode, app: &impl App) -> Result<
             n => TargetId::try_from(n)
                 .map_err(|_| anyhow!("{n} is not a valid numeric meta node id (must be between 1 and 65535)"))?,
         };
@@ -147,10 +147,10 @@ client version < 8.0)"
             db::target::insert(
                 tx,
-                target_id,
+                &target_id,
                 tk,
                 NodeTypeServer::Meta,
-                Some(target_id.into()),
+                Some(target_id.raw().into()),
             )?;
 
             // If this is the first meta target, set it as meta root
@@ -166,7 +166,7 @@ client version < 8.0)"
         // Update the corresponding nic lists
         db::node_nic::replace(
             tx,
-            node.uid,
+            &node.uid,
             msg.nics.iter().map(|e| ReplaceNic {
                 nic_type: e.nic_type,
                 addr: &e.addr,
@@ -186,7 +186,7 @@ client version < 8.0)"
         .await?;
 
     app.replace_node_addrs(
-        node.uid,
+        node.uid.clone(),
         nics.clone()
             .into_iter()
             .map(|e| SocketAddr::new(e.addr, msg.port))
@@ -219,7 +219,7 @@ client version < 8.0)"
         root_num_id: match meta_root {
             MetaRoot::Unknown => 0,
             MetaRoot::Normal(node_id, _) => node_id,
-            MetaRoot::Mirrored(group_id) => group_id.into(),
+            MetaRoot::Mirrored(ref group_id) => group_id.raw().into(),
         },
         is_root_mirrored: match meta_root {
             MetaRoot::Unknown | MetaRoot::Normal(_, _) => 0,
@@ -294,7 +294,7 @@ pub(super) fn update_last_contact_times(
     node_type: NodeTypeServer,
     offline_timeout: Duration,
 ) -> Result<usize> {
-    let target_ids_param = sqlite::rarray_param(target_ids.iter().copied());
+    let target_ids_param = sqlite::rarray_param(target_ids.iter().cloned());
 
     tx.execute_cached(
         sql!(
diff --git a/mgmtd/src/bee_msg/get_node_capacity_pools.rs b/mgmtd/src/bee_msg/get_node_capacity_pools.rs
index 13ecfc58..384fa5a5 100644
--- a/mgmtd/src/bee_msg/get_node_capacity_pools.rs
+++ b/mgmtd/src/bee_msg/get_node_capacity_pools.rs
@@ -103,7 +103,7 @@ impl HandleWithResponse for GetNodeCapacityPools {
                     res[cp].push(t.id);
                 }
 
-                [(0, res)].into()
+                [(0.into(), res)].into()
             }
 
             CapacityPoolQueryType::Storage => {
@@ -122,8 +122,8 @@ impl HandleWithResponse for GetNodeCapacityPools {
                     .await?;
 
                 let mut res: HashMap<PoolId, Vec<Vec<TargetId>>> = HashMap::new();
-                for sp in pools {
-                    let f_targets = targets.iter().filter(|e| e.pool_id == Some(sp));
+                for sp in &pools {
+                    let f_targets = targets.iter().filter(|e| e.pool_id.as_ref() == Some(sp));
 
                     let cp_calc = CapPoolCalculator::new(
                         app.static_info()
@@ -137,12 +137,12 @@ impl HandleWithResponse for GetNodeCapacityPools {
                         f_targets.clone(),
                     )?;
 
-                    res.insert(sp, vec![Vec::<TargetId>::new(), vec![], vec![]]);
+                    res.insert(sp.clone(), vec![Vec::<TargetId>::new(), vec![], vec![]]);
                     for t in f_targets {
                         let cp = cp_calc
                             .cap_pool(t.free_space(), t.free_inodes())
                             .bee_msg_vec_index();
-                        res.get_mut(&sp).unwrap()[cp].push(t.id);
+                        res.get_mut(sp).unwrap()[cp].push(t.id);
                     }
                 }
 
@@ -172,7 +172,7 @@ impl HandleWithResponse for GetNodeCapacityPools {
                     res[cp].push(e.id);
                 }
 
-                [(0, res)].into()
+                [(0.into(), res)].into()
             }
 
             CapacityPoolQueryType::StorageMirrored => {
@@ -191,8 +191,8 @@ impl HandleWithResponse for GetNodeCapacityPools {
                     .await?;
 
                 let mut cap_pools: HashMap<PoolId, Vec<Vec<BuddyGroupId>>> = HashMap::new();
-                for sp in pools {
-                    let f_groups = groups.iter().filter(|e| e.pool_id == Some(sp));
+                for sp in &pools {
+                    let f_groups = groups.iter().filter(|e| e.pool_id.as_ref() == Some(sp));
 
                     let cp_calc = CapPoolCalculator::new(
                         app.static_info()
@@ -206,12 +206,12 @@ impl HandleWithResponse for GetNodeCapacityPools {
                         f_groups.clone(),
                     )?;
 
-                    cap_pools.insert(sp, vec![Vec::<BuddyGroupId>::new(), vec![], vec![]]);
+                    cap_pools.insert(sp.clone(), vec![Vec::<BuddyGroupId>::new(), vec![], vec![]]);
                     for t in f_groups {
                         let cp = cp_calc
                             .cap_pool(t.free_space(), t.free_inodes())
                            .bee_msg_vec_index();
-                        cap_pools.get_mut(&sp).unwrap()[cp].push(t.id);
+                        cap_pools.get_mut(sp).unwrap()[cp].push(t.id);
                     }
                 }
diff --git a/mgmtd/src/bee_msg/get_nodes.rs b/mgmtd/src/bee_msg/get_nodes.rs
index acacf68b..b3e6990e 100644
--- a/mgmtd/src/bee_msg/get_nodes.rs
+++ b/mgmtd/src/bee_msg/get_nodes.rs
@@ -7,46 +7,52 @@ impl HandleWithResponse for GetNodes {
     type Response = GetNodesResp;
 
     async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result<Self::Response> {
-        let res = app
+        let (nodes, meta_root) = app
             .read_tx(move |tx| {
                 let node_type = self.node_type;
 
-                let res = (
-                    db::node::get_with_type(tx, node_type)?,
-                    db::node_nic::get_with_type(tx, node_type)?,
+                let nics = db::node_nic::get_with_type(tx, node_type)?;
+
+                let nodes: Vec<_> = tx.query_map_collect(
+                    sql!(
+                        "SELECT node_uid, port, alias, node_id FROM nodes_ext
+                        WHERE node_type = ?1 ORDER BY node_id ASC"
+                    ),
+                    [node_type.sql_variant()],
+                    |row| {
+                        let uid = row.get(0)?;
+                        let port = row.get(1)?;
+                        Ok(Node {
+                            alias: row.get::<_, String>(2)?.into_bytes(),
+                            nic_list: map_bee_msg_nics(
+                                nics.iter().filter(|e| e.node_uid == uid).cloned(),
+                            )
+                            .collect(),
+                            num_id: row.get(3)?,
+                            port,
+                            _unused_tcp_port: port,
+                            node_type,
+                        })
+                    },
+                )?;
+
+                Ok((
+                    nodes,
                     match self.node_type {
                         shared::types::NodeType::Meta => db::misc::get_meta_root(tx)?,
                         _ => MetaRoot::Unknown,
                     },
-                );
-
-                Ok(res)
+                ))
             })
             .await?;
 
-        let mut nodes: Vec<Node> = res
-            .0
-            .into_iter()
-            .map(|n| Node {
-                alias: n.alias.into_bytes(),
-                num_id: n.id,
-                nic_list: map_bee_msg_nics(res.1.iter().filter(|e| e.node_uid == n.uid).cloned())
-                    .collect(),
-                port: n.port,
-                _unused_tcp_port: n.port,
-                node_type: n.node_type,
-            })
-            .collect();
-
-        nodes.sort_by(|a, b| a.num_id.cmp(&b.num_id));
-
         let resp = GetNodesResp {
             nodes,
-            root_num_id: match res.2 {
+            root_num_id: match meta_root {
                 MetaRoot::Unknown => 0,
                 MetaRoot::Normal(node_id, _) => node_id,
-                MetaRoot::Mirrored(group_id) => group_id.into(),
+                MetaRoot::Mirrored(ref group_id) => group_id.raw().into(),
             },
-            is_root_mirrored: match res.2 {
+            is_root_mirrored: match meta_root {
                 MetaRoot::Unknown => 0,
                 MetaRoot::Normal(_, _) => 0,
                 MetaRoot::Mirrored(_) => 1,
diff --git a/mgmtd/src/bee_msg/get_storage_pools.rs b/mgmtd/src/bee_msg/get_storage_pools.rs
index df1eee90..8af02b9f 100644
--- a/mgmtd/src/bee_msg/get_storage_pools.rs
+++ b/mgmtd/src/bee_msg/get_storage_pools.rs
@@ -133,11 +133,11 @@ impl HandleWithResponse for GetStoragePools {
                     .cap_pool(target.free_space(), target.free_inodes())
                     .bee_msg_vec_index();
 
-                let target_id: TargetId = target.id;
+                let target_id: TargetId = target.id.into();
                 let node_id = target.node_id.expect("targets have a node id");
 
-                target_map.insert(target_id, node_id);
-                target_cap_pools[cp].push(target.id);
+                target_map.insert(target_id.clone(), node_id);
+                target_cap_pools[cp].push(target.id.into());
 
                 if let Some(node_group) = grouped_target_cap_pools[cp].get_mut(&node_id) {
                     node_group.push(target_id);
@@ -148,12 +148,12 @@ impl HandleWithResponse for GetStoragePools {
 
             // Only collect buddy groups belonging to the current pool
             for group in f_buddy_groups {
-                buddy_group_vec.push(group.id);
+                buddy_group_vec.push(group.id.into());
 
                 let cp = cp_buddy_groups_calc
                     .cap_pool(group.free_space(), group.free_inodes())
                     .bee_msg_vec_index();
-                buddy_group_cap_pools[cp].push(group.id);
+                buddy_group_cap_pools[cp].push(group.id.into());
             }
 
             Ok(StoragePool {
diff --git a/mgmtd/src/bee_msg/heartbeat_request.rs b/mgmtd/src/bee_msg/heartbeat_request.rs
index d8413a10..b01bae3b 100644
--- a/mgmtd/src/bee_msg/heartbeat_request.rs
+++ b/mgmtd/src/bee_msg/heartbeat_request.rs
@@ -11,7 +11,7 @@ impl HandleWithResponse for HeartbeatRequest {
             Ok((
                 db::entity::get_alias(tx, MGMTD_UID)?
                     .ok_or_else(|| TypedError::value_not_found("management uid", MGMTD_UID))?,
-                db::node_nic::get_with_node(tx, MGMTD_UID)?,
+                db::node_nic::get_with_node(tx, &MGMTD_UID)?,
             ))
         })
         .await
diff --git a/mgmtd/src/bee_msg/map_targets.rs b/mgmtd/src/bee_msg/map_targets.rs
index bb7ee7d6..0dd25f75 100644
--- a/mgmtd/src/bee_msg/map_targets.rs
+++ b/mgmtd/src/bee_msg/map_targets.rs
@@ -8,7 +8,7 @@ impl HandleWithResponse for MapTargets {
     async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result<Self::Response> {
         fail_on_pre_shutdown(app)?;
 
-        let target_ids = self.target_ids.keys().copied().collect::<Vec<_>>();
+        let target_ids = self.target_ids.keys().cloned().collect::<Vec<_>>();
 
         let updated = app
             .write_tx(move |tx| {
diff --git a/mgmtd/src/bee_msg/register_target.rs b/mgmtd/src/bee_msg/register_target.rs
index 861c3b2a..94529f2d 100644
--- a/mgmtd/src/bee_msg/register_target.rs
+++ b/mgmtd/src/bee_msg/register_target.rs
@@ -17,7 +17,7 @@ impl HandleWithResponse for RegisterTarget {
             tx,
             EntityType::Target,
             NodeType::Storage,
-            self.target_id.into(),
+            self.target_id.raw().into(),
         )? {
             // If the target already exists, check if the registration tokens match
             let stored_reg_token: Option<String> = tx.query_row(
diff --git a/mgmtd/src/bee_msg/remove_node.rs b/mgmtd/src/bee_msg/remove_node.rs
index 29b318b5..39c2a6b1 100644
--- a/mgmtd/src/bee_msg/remove_node.rs
+++ b/mgmtd/src/bee_msg/remove_node.rs
@@ -28,7 +28,7 @@ For server nodes, the grpc handler must be used."
             }
             .resolve(tx, EntityType::Node)?;
 
-            db::node::delete(tx, node.uid)?;
+            db::node::delete(tx, &node.uid)?;
 
             Ok(node)
         })
diff --git a/mgmtd/src/bee_msg/request_exceeded_quota.rs b/mgmtd/src/bee_msg/request_exceeded_quota.rs
index cf9915ce..71706b4d 100644
--- a/mgmtd/src/bee_msg/request_exceeded_quota.rs
+++ b/mgmtd/src/bee_msg/request_exceeded_quota.rs
@@ -17,8 +17,8 @@ impl HandleWithResponse for RequestExceededQuota {
             .read_tx(move |tx| {
                 // Quota is calculated per pool, so if a target ID is given, use its assigned pool's
                 // ID.
-                let pool_id = if self.pool_id != 0 {
-                    self.pool_id
+                let pool_id = if self.pool_id.raw() != 0 {
+                    self.pool_id.clone()
                 } else {
                     tx.query_row_cached(
                         sql!("SELECT pool_id FROM storage_targets WHERE target_id = ?1"),
@@ -77,8 +77,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::User,
                     quota_type: QuotaType::Space,
-                    pool_id: 1,
-                    target_id: 0,
+                    pool_id: 1.into(),
+                    target_id: 0.into(),
                },
                &[2, 4, 10],
            ),
@@ -86,8 +86,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::Group,
                     quota_type: QuotaType::Space,
-                    pool_id: 1,
-                    target_id: 0,
+                    pool_id: 1.into(),
+                    target_id: 0.into(),
                 },
                 &[2, 4, 11],
             ),
@@ -95,8 +95,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::User,
                     quota_type: QuotaType::Inode,
-                    pool_id: 1,
-                    target_id: 0,
+                    pool_id: 1.into(),
+                    target_id: 0.into(),
                 },
                 &[2, 4, 12],
             ),
@@ -104,8 +104,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::Group,
                     quota_type: QuotaType::Inode,
-                    pool_id: 1,
-                    target_id: 0,
+                    pool_id: 1.into(),
+                    target_id: 0.into(),
                 },
                 &[2, 4, 13],
             ),
@@ -113,8 +113,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::User,
                     quota_type: QuotaType::Space,
-                    pool_id: 2,
-                    target_id: 0,
+                    pool_id: 2.into(),
+                    target_id: 0.into(),
                 },
                 &[20],
             ),
@@ -122,8 +122,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::Group,
                     quota_type: QuotaType::Space,
-                    pool_id: 2,
-                    target_id: 0,
+                    pool_id: 2.into(),
+                    target_id: 0.into(),
                 },
                 &[],
             ),
@@ -131,8 +131,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::User,
                     quota_type: QuotaType::Inode,
-                    pool_id: 2,
-                    target_id: 0,
+                    pool_id: 2.into(),
+                    target_id: 0.into(),
                 },
                 &[],
             ),
@@ -140,8 +140,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::Group,
                     quota_type: QuotaType::Inode,
-                    pool_id: 2,
-                    target_id: 0,
+                    pool_id: 2.into(),
+                    target_id: 0.into(),
                 },
                 &[],
             ),
@@ -149,8 +149,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::User,
                     quota_type: QuotaType::Space,
-                    pool_id: 0,
-                    target_id: 2,
+                    pool_id: 0.into(),
+                    target_id: 2.into(),
                 },
                 &[20],
             ),
@@ -158,8 +158,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::User,
                     quota_type: QuotaType::Space,
-                    pool_id: 4,
-                    target_id: 0,
+                    pool_id: 4.into(),
+                    target_id: 0.into(),
                 },
                 &[],
             ),
@@ -167,8 +167,8 @@ mod test {
                 RequestExceededQuota {
                     id_type: QuotaIdType::User,
                     quota_type: QuotaType::Space,
-                    pool_id: 0,
-                    target_id: 12, // Pool 4
+                    pool_id: 0.into(),
+                    target_id: 12.into(), // Pool 4
                 },
                 &[],
             ),
diff --git a/mgmtd/src/db/buddy_group.rs b/mgmtd/src/db/buddy_group.rs
index 32085bb2..22543c8e 100644
--- a/mgmtd/src/db/buddy_group.rs
+++ b/mgmtd/src/db/buddy_group.rs
@@ -16,7 +16,7 @@ pub(crate) fn validate_ids(
         sql!("SELECT COUNT(*) FROM buddy_groups WHERE node_type = ?1 AND group_id IN rarray(?2)"),
         params![
             node_type.sql_variant(),
-            &rarray_param(group_ids.iter().copied()),
+            &rarray_param(group_ids.iter().cloned()),
         ],
         |row| row.get(0),
     )?;
@@ -49,13 +49,14 @@ pub(crate) fn insert(
     p_target_id: TargetId,
     s_target_id: TargetId,
 ) -> Result<(Uid, BuddyGroupId)> {
-    let group_id = if group_id == 0 {
+    let group_id = if group_id.is_zero() {
         misc::find_new_id(tx, "buddy_groups", "group_id", node_type.into(), 1..=0xFFFF)?
+            .try_into()?
     } else if try_resolve_num_id(
         tx,
         EntityType::BuddyGroup,
         node_type.into(),
-        group_id.into(),
+        group_id.raw().into(),
     )?
    .is_some()
    {
@@ -65,7 +66,7 @@ pub(crate) fn insert(
     };
 
     // Check targets exist
-    target::validate_ids(tx, &[p_target_id, s_target_id], node_type)?;
+    target::validate_ids(tx, &[p_target_id.clone(), s_target_id.clone()], node_type)?;
 
     // Check that both given target IDs are not assigned to any buddy group
     if tx.query_row(
@@ -74,7 +75,7 @@ pub(crate) fn insert(
             WHERE node_type = ?1
             AND (p_target_id IN (?2, ?3) OR s_target_id IN (?2, ?3))"
         ),
-        params![node_type.sql_variant(), p_target_id, s_target_id],
+        params![node_type.sql_variant(), &p_target_id, &s_target_id],
         |row| row.get::<_, i64>(0),
     )? > 0
     {
@@ -90,8 +91,8 @@ pub(crate) fn insert(
         "SELECT pool_id FROM storage_targets WHERE target_id = ?1"
     ))?;
 
-    let p_pool_id: PoolId = check.query_row([p_target_id], |row| row.get(0))?;
-    let s_pool_id: PoolId = check.query_row([s_target_id], |row| row.get(0))?;
+    let p_pool_id: PoolId = check.query_row([&p_target_id], |row| row.get(0))?;
+    let s_pool_id: PoolId = check.query_row([&s_target_id], |row| row.get(0))?;
 
     if p_pool_id != s_pool_id {
         bail!("Primary and secondary target are not assigned to the same storage pool");
@@ -112,7 +113,7 @@ pub(crate) fn insert(
     let alias = if let Some(alias) = alias {
         alias
     } else {
-        format!("buddy_group_{}_{}", node_type.user_str(), group_id).try_into()?
+        format!("buddy_group_{}_{}", node_type.user_str(), group_id.raw()).try_into()?
     };
 
     // Insert entity
@@ -121,7 +122,7 @@ pub(crate) fn insert(
     let pool_id: Option<PoolId> = if matches!(node_type, NodeTypeServer::Storage) {
         tx.query_row(
             sql!("SELECT pool_id FROM storage_targets WHERE target_id = ?1"),
-            [p_target_id],
+            [&p_target_id],
             |row| row.get(0),
         )?
     } else {
@@ -139,8 +140,8 @@ pub(crate) fn insert(
             new_uid,
             node_type.sql_variant(),
             group_id,
-            p_target_id,
-            s_target_id,
+            &p_target_id,
+            &s_target_id,
             pool_id
         ],
     )?;
@@ -154,7 +155,12 @@ pub(crate) fn update_storage_pools(
     new_pool_id: PoolId,
     group_ids: &[BuddyGroupId],
 ) -> Result<()> {
-    let _ = resolve_num_id(tx, EntityType::Pool, NodeType::Storage, new_pool_id.into())?;
+    let _ = resolve_num_id(
+        tx,
+        EntityType::Pool,
+        NodeType::Storage,
+        new_pool_id.raw().into(),
+    )?;
 
     validate_ids(tx, group_ids, NodeTypeServer::Storage)?;
 
@@ -164,7 +170,7 @@ pub(crate) fn update_storage_pools(
         ),
         params![
             new_pool_id,
-            rarray_param(group_ids.iter().copied()),
+            rarray_param(group_ids.iter().cloned()),
             NodeType::Storage.sql_variant()
         ],
     )?;
@@ -231,7 +237,7 @@ pub(crate) fn check_and_swap_buddies(
 /// # Return value
 /// Returns the UIDs of the primary and the secondary node which own the primary and secondary
 /// target of the given group.
-pub(crate) fn prepare_storage_deletion(tx: &Transaction, id: BuddyGroupId) -> Result<(Uid, Uid)> {
+pub(crate) fn prepare_storage_deletion(tx: &Transaction, id: &BuddyGroupId) -> Result<(Uid, Uid)> {
     if tx.query_row(sql!("SELECT COUNT(*) FROM client_nodes"), [], |row| {
         row.get::<_, i64>(0)
     })? > 0
@@ -260,7 +266,7 @@ pub(crate) fn prepare_storage_deletion(tx: &Transaction, id: BuddyGroupId) -> Re
 ///
 /// This expects that the nodes owning the affected targets have already been notified and the
 /// groups deleted.
-pub(crate) fn delete_storage(tx: &Transaction, group_id: BuddyGroupId) -> Result<()> {
+pub(crate) fn delete_storage(tx: &Transaction, group_id: &BuddyGroupId) -> Result<()> {
     let affected = tx.execute(
         sql!("DELETE FROM buddy_groups WHERE group_id = ?1 AND node_type = ?2"),
         params![group_id, NodeType::Storage.sql_variant()],
@@ -293,20 +299,20 @@ mod test {
         with_test_data(|tx| {
             super::insert(
                 tx,
-                1234,
+                1234.into(),
                 Some("g1".try_into().unwrap()),
                 NodeTypeServer::Meta,
-                3,
-                4,
+                3.into(),
+                4.into(),
             )
             .unwrap();
             super::insert(
                 tx,
-                1,
+                1.into(),
                 Some("g2".try_into().unwrap()),
                 NodeTypeServer::Storage,
-                3,
-                7,
+                3.into(),
+                7.into(),
             )
             .unwrap_err();
 
@@ -315,7 +321,7 @@ mod test {
             assert_eq!(2, meta_groups.len());
             assert_eq!(2, storage_groups.len());
 
-            assert!(meta_groups.iter().any(|e| e.0 == 1234));
+            assert!(meta_groups.iter().any(|e| e.0.raw() == 1234));
         })
     }
 
@@ -323,12 +329,15 @@ mod test {
     #[test]
     fn update_storage_pool() {
         with_test_data(|tx| {
-            super::update_storage_pools(tx, 2, &[1]).unwrap();
-            super::update_storage_pools(tx, 99, &[1]).unwrap_err();
+            super::update_storage_pools(tx, 2.into(), &[1.into()]).unwrap();
+            super::update_storage_pools(tx, 99.into(), &[1.into()]).unwrap_err();
 
             let storage_groups = get_with_type(tx, NodeTypeServer::Storage).unwrap();
 
-            assert_eq!(Some(2), storage_groups.iter().find(|e| e.0 == 1).unwrap().3);
+            assert_eq!(
+                Some(2.into()),
+                storage_groups.iter().find(|e| e.0.raw() == 1).unwrap().3
+            );
         })
     }
 
@@ -337,10 +346,10 @@ mod test {
         let meta_groups = get_with_type(tx, NodeTypeServer::Meta).unwrap();
         let storage_groups = get_with_type(tx, NodeTypeServer::Storage).unwrap();
 
-        assert_eq!(2, meta_groups[0].1);
-        assert_eq!(1, meta_groups[0].2);
-        assert_eq!(5, storage_groups[0].1);
-        assert_eq!(1, storage_groups[0].2);
+        assert_eq!(2, meta_groups[0].1.raw());
+        assert_eq!(1, meta_groups[0].2.raw());
+        assert_eq!(5, storage_groups[0].1.raw());
+        assert_eq!(1, storage_groups[0].2.raw());
     }
 
     /// Makes sure targets of buddy groups 1 (meta and storage) have not been swapped
@@ -348,10 +357,10 @@ mod test {
         let meta_groups = get_with_type(tx, NodeTypeServer::Meta).unwrap();
         let storage_groups = get_with_type(tx, NodeTypeServer::Storage).unwrap();
 
-        assert_eq!(1, meta_groups[0].1);
-        assert_eq!(2, meta_groups[0].2);
-        assert_eq!(1, storage_groups[0].1);
-        assert_eq!(5, storage_groups[0].2);
+        assert_eq!(1, meta_groups[0].1.raw());
+        assert_eq!(2, meta_groups[0].2.raw());
+        assert_eq!(1, storage_groups[0].1.raw());
+        assert_eq!(5, storage_groups[0].2.raw());
     }
 
     /// Test swapping primary and secondary member (switchover) when primary runs into timeout
@@ -372,12 +381,12 @@ mod test {
         assert!(
             swaps
                 .iter()
-                .any(|e| e.0 == 1 && e.1 == NodeTypeServer::Meta)
+                .any(|e| e.0.raw() == 1 && e.1 == NodeTypeServer::Meta)
         );
         assert!(
             swaps
                 .iter()
-                .any(|e| e.0 == 1 && e.1 == NodeTypeServer::Storage)
+                .any(|e| e.0.raw() == 1 && e.1 == NodeTypeServer::Storage)
         );
 
         ensure_swapped_buddies(tx);
@@ -410,8 +419,8 @@ mod test {
             target::update_consistency_states(
                 tx,
                 [
-                    (1, TargetConsistencyState::NeedsResync),
-                    (2, TargetConsistencyState::NeedsResync),
+                    (1.into(), TargetConsistencyState::NeedsResync),
+                    (2.into(), TargetConsistencyState::NeedsResync),
                 ],
                 NodeTypeServer::Meta,
             )
@@ -420,8 +429,8 @@ mod test {
             target::update_consistency_states(
                 tx,
                 [
-                    (1, TargetConsistencyState::NeedsResync),
-                    (5, TargetConsistencyState::NeedsResync),
+                    (1.into(), TargetConsistencyState::NeedsResync),
+                    (5.into(), TargetConsistencyState::NeedsResync),
                 ],
                NodeTypeServer::Storage,
            )
@@ -436,7 +445,7 @@ mod test {
     #[test]
     fn mounted_clients_fail_prepare_storage_deletion() {
         with_test_data(|tx| {
-            super::prepare_storage_deletion(tx, 1).unwrap_err();
+            super::prepare_storage_deletion(tx, &1.into()).unwrap_err();
         })
     }
 
@@ -449,7 +458,7 @@ mod test {
             )
             .unwrap();
 
-            let res = super::prepare_storage_deletion(tx, 1).unwrap();
+            let res = super::prepare_storage_deletion(tx, &1.into()).unwrap();
 
             assert_eq!((Uid::from(102001i64), Uid::from(102002i64)), res);
         })
@@ -458,7 +467,7 @@ mod test {
     #[test]
     fn delete_storage() {
         with_test_data(|tx| {
-            super::delete_storage(tx, 1).unwrap();
+            super::delete_storage(tx, &1.into()).unwrap();
 
             let groups = get_with_type(tx, NodeTypeServer::Storage).unwrap();
             assert_eq!(1, groups.len());
diff --git a/mgmtd/src/db/entity.rs b/mgmtd/src/db/entity.rs
index 35eee7f1..acb32e4e 100644
--- a/mgmtd/src/db/entity.rs
+++ b/mgmtd/src/db/entity.rs
@@ -58,7 +58,7 @@ pub(crate) fn insert(tx: &Transaction, entity_type: EntityType, alias: &Alias) -
 
     check_affected_rows(affected, [1])?;
 
-    Ok(tx.last_insert_rowid())
+    Ok(tx.last_insert_rowid().into())
 }
 
 #[cfg(test)]
diff --git a/mgmtd/src/db/import_v7.rs b/mgmtd/src/db/import_v7.rs
index d115da9b..c04aea68 100644
--- a/mgmtd/src/db/import_v7.rs
+++ b/mgmtd/src/db/import_v7.rs
@@ -22,7 +22,7 @@ use std::path::Path;
 pub fn import_v7(tx: &rusqlite::Transaction, base_path: &Path) -> Result<()> {
     // Check DB is new
     let max_uid: Uid = tx.query_row(sql!("SELECT MAX(uid) FROM entities"), [], |row| row.get(0))?;
-    if max_uid > 2 {
+    if max_uid > 2.into() {
         bail!("Database is not new");
     }
 
@@ -146,10 +146,10 @@ fn meta_nodes(tx: &Transaction, f: &Path) -> Result<(NodeId, bool)> {
 
         target::insert(
             tx,
-            target_id,
+            &target_id,
             None,
             NodeTypeServer::Meta,
-            Some(target_id.into()),
+            Some(target_id.raw().into()),
         )?;
     }
 
@@ -242,7 +242,7 @@ fn buddy_groups(tx: &Transaction, f: &Path, nt: NodeTypeServer) -> Result<()> {
             .split_once('=')
             .ok_or_else(|| anyhow!("invalid line '{l}'"))?;
 
-        let g = BuddyGroupId::from_str_radix(g.trim(), 16)?;
+        let g = u16::from_str_radix(g.trim(), 16)?.into();
         let (p_id, s_id) = ts
             .trim()
             .split_once(',')
@@ -253,8 +253,8 @@ fn buddy_groups(tx: &Transaction, f: &Path, nt: NodeTypeServer) -> Result<()> {
             g,
             None,
             nt,
-            BuddyGroupId::from_str_radix(p_id.trim(), 16)?,
-            BuddyGroupId::from_str_radix(s_id.trim(), 16)?,
+            u16::from_str_radix(p_id.trim(), 16)?.into(),
+            u16::from_str_radix(s_id.trim(), 16)?.into(),
         )?;
     }
 
@@ -272,9 +272,9 @@ fn storage_targets(tx: &Transaction, targets_path: &Path) -> Result<()> {
             .ok_or_else(|| anyhow!("invalid line '{}'", l))?;
 
         let node_id = NodeId::from_str_radix(node.trim(), 16)?;
-        let target_id = TargetId::from_str_radix(target.trim(), 16)?;
+        let target_id: TargetId = u16::from_str_radix(target.trim(), 16)?.into();
 
-        target::insert_storage(tx, target_id, None)?;
+        target::insert_storage(tx, target_id.clone(), None)?;
         target::update_storage_node_mappings(tx, &[target_id], node_id)?;
     }
 
@@ -323,10 +323,10 @@ fn storage_pools(tx: &Transaction, f: &Path) -> Result<()> {
                 [alias.as_ref()],
             )?;
         } else {
-            storage_pool::insert(tx, pool.id, &alias)?;
+            storage_pool::insert(tx, pool.id.clone(), &alias)?;
         }
 
-        target::update_storage_pools(tx, pool.id, &pool.targets)?;
+        target::update_storage_pools(tx, &pool.id, &pool.targets)?;
         buddy_group::update_storage_pools(tx, pool.id, &pool.buddy_groups)?;
 
         used_aliases.push(alias);
@@ -368,10 +368,11 @@ fn quota(tx: &Transaction, quota_path: &Path) -> Result<()> {
            .file_name()
            .into_string()
            .map_err(|s| anyhow!("{s:?} is not a valid storage pool directory"))?
-            .parse()
-            .map_err(|_| anyhow!("{:?} is not a valid storage pool directory", e.file_name()))?;
+            .parse::<u16>()
+            .map_err(|_| anyhow!("{:?} is not a valid storage pool directory", e.file_name()))?
+            .into();
 
-        quota_default_limits(tx, &e.path().join("quotaDefaultLimits.store"), pool_id)
+        quota_default_limits(tx, &e.path().join("quotaDefaultLimits.store"), &pool_id)
             .with_context(|| {
                 format!("quota default limits ({pool_id}/quotaDefaultLimits.store)")
             })?;
@@ -379,7 +380,7 @@ fn quota(tx: &Transaction, quota_path: &Path) -> Result<()> {
         quota_limits(
             tx,
             &e.path().join("quotaUserLimits.store"),
-            pool_id,
+            &pool_id,
             QuotaIdType::User,
         )
         .with_context(|| format!("quota user limits ({pool_id}/quotaUserLimits.store)"))?;
@@ -387,7 +388,7 @@ fn quota(tx: &Transaction, quota_path: &Path) -> Result<()> {
         quota_limits(
             tx,
             &e.path().join("quotaGroupLimits.store"),
-            pool_id,
+            &pool_id,
             QuotaIdType::Group,
         )
         .with_context(|| format!("quota group limits ({pool_id}/quotaGroupLimits.store)"))?;
@@ -400,7 +401,7 @@ fn quota(tx: &Transaction, quota_path: &Path) -> Result<()> {
 }
 
 /// Imports the default quota limits
-fn quota_default_limits(tx: &Transaction, f: &Path, pool_id: PoolId) -> Result<()> {
+fn quota_default_limits(tx: &Transaction, f: &Path, pool_id: &PoolId) -> Result<()> {
     // If the file is missing, skip it
     let s = match std::fs::read(f) {
         Ok(s) => s,
@@ -487,7 +488,7 @@ fn quota_default_limits(tx: &Transaction, f: &Path, pool_id: PoolId) -> Result<(
 fn quota_limits(
     tx: &Transaction,
     f: &Path,
-    pool_id: PoolId,
+    pool_id: &PoolId,
     quota_id_type: QuotaIdType,
 ) -> Result<()> {
     // If the file is missing, skip it
diff --git a/mgmtd/src/db/import_v7/test.rs b/mgmtd/src/db/import_v7/test.rs
index e2876f83..5611389b 100644
--- a/mgmtd/src/db/import_v7/test.rs
+++ b/mgmtd/src/db/import_v7/test.rs
@@ -100,16 +100,16 @@ fn import_v7_inner(base_path: &Path) {
     assert_eq!(
         res,
         &[
-            (NodeType::Meta, 1, 1, None),
-            (NodeType::Meta, 2, 2, None),
-            (NodeType::Meta, 3, 3, None),
-            (NodeType::Meta, 4, 4, None),
-            (NodeType::Storage, 1, 1, Some(1)),
-            (NodeType::Storage, 2, 1, Some(2)),
-            (NodeType::Storage, 3, 1, Some(2)),
-            (NodeType::Storage, 4, 2, Some(1)),
-            (NodeType::Storage, 5, 2, Some(1)),
-            (NodeType::Storage, 6, 2, Some(2)),
+            (NodeType::Meta, 1.into(), 1, None),
+            (NodeType::Meta, 2.into(), 2, None),
+            (NodeType::Meta, 3.into(), 3, None),
+            (NodeType::Meta, 4.into(), 4, None),
+            (NodeType::Storage, 1.into(), 1, Some(1.into())),
+            (NodeType::Storage, 2.into(), 1, Some(2.into())),
+            (NodeType::Storage, 3.into(), 1, Some(2.into())),
+            (NodeType::Storage, 4.into(), 2, Some(1.into())),
+            (NodeType::Storage, 5.into(), 2, Some(1.into())),
+            (NodeType::Storage, 6.into(), 2, Some(2.into())),
         ]
     );
 
@@ -136,9 +136,21 @@ fn import_v7_inner(base_path: &Path) {
     assert_eq!(
         res,
         &[
-            (NodeType::Meta, 1, 1, 2, None),
-            (NodeType::Storage, 1, 1, 4, Some(1)),
-            (NodeType::Storage, 2, 3, 6, Some(2)),
+            (NodeType::Meta, 1.into(), 1.into(), 2.into(), None),
+            (
+                NodeType::Storage,
+                1.into(),
+                1.into(),
+                4.into(),
+                Some(1.into())
+            ),
+            (
+                NodeType::Storage,
+                2.into(),
+                3.into(),
+                6.into(),
+                Some(2.into())
+            ),
         ]
     );
 
@@ -151,7 +163,7 @@ fn import_v7_inner(base_path: &Path) {
     )
     .unwrap();
 
-    assert_eq!(res, (None, Some(1)));
+    assert_eq!(res, (None, Some(1.into())));
 
     // Check storage pools
     let res: Vec<(PoolId, String)> = tx
@@ -162,7 +174,13 @@ fn import_v7_inner(base_path: &Path) {
         )
         .unwrap();
 
-    assert_eq!(res, &[(1, "Default".to_string()), (2, "pool2".to_string())]);
+    assert_eq!(
+        res,
+        &[
+            (1.into(), "Default".to_string()),
+            (2.into(), "pool2".to_string())
+        ]
+    );
 
     // Check quota default limits
     let res: Vec<(QuotaType, QuotaIdType, PoolId, u64)> = tx
@@ -186,14 +204,14 @@ fn import_v7_inner(base_path: &Path) {
     assert_eq!(
         res,
         &[
-            (QuotaType::Space, QuotaIdType::User, 1, 1000),
-            (QuotaType::Space, QuotaIdType::User, 2, 2000),
-            (QuotaType::Space, QuotaIdType::Group, 1, 0),
-            (QuotaType::Space, QuotaIdType::Group, 2, 0),
-            (QuotaType::Inode, QuotaIdType::User, 1, 100),
-            (QuotaType::Inode, QuotaIdType::User, 2, 200),
-            (QuotaType::Inode, QuotaIdType::Group, 1, 0),
-            (QuotaType::Inode, QuotaIdType::Group, 2, 0),
+            (QuotaType::Space, QuotaIdType::User, 1.into(), 1000),
+            (QuotaType::Space, QuotaIdType::User, 2.into(), 2000),
+            (QuotaType::Space, QuotaIdType::Group, 1.into(), 0),
+            (QuotaType::Space, QuotaIdType::Group, 2.into(), 0),
+            (QuotaType::Inode, QuotaIdType::User, 1.into(), 100),
+            (QuotaType::Inode, QuotaIdType::User, 2.into(), 200),
+            (QuotaType::Inode, QuotaIdType::Group, 1.into(), 0),
+            (QuotaType::Inode, QuotaIdType::Group, 2.into(), 0),
         ]
     );
 
@@ -220,12 +238,12 @@ fn import_v7_inner(base_path: &Path) {
     assert_eq!(
         res,
         &[
-            (QuotaType::Space, QuotaIdType::User, 0, 1, 1000),
-            (QuotaType::Space, QuotaIdType::User, 0, 2, 2000),
-            (QuotaType::Space, QuotaIdType::User, 5000, 1, 5000),
-            (QuotaType::Inode, QuotaIdType::User, 0, 1, 100),
-            (QuotaType::Inode, QuotaIdType::User, 0, 2, 200),
-            (QuotaType::Inode, QuotaIdType::User, 5000, 1, 500),
+            (QuotaType::Space, QuotaIdType::User, 0, 1.into(), 1000),
+            (QuotaType::Space, QuotaIdType::User, 0, 2.into(), 2000),
+            (QuotaType::Space, QuotaIdType::User, 5000, 1.into(), 5000),
+            (QuotaType::Inode, QuotaIdType::User, 0, 1.into(), 100),
+            (QuotaType::Inode, QuotaIdType::User, 0, 2.into(), 200),
+            (QuotaType::Inode, QuotaIdType::User, 5000, 1.into(), 500),
         ]
     );
 
diff --git a/mgmtd/src/db/misc.rs b/mgmtd/src/db/misc.rs
index 259fcb78..738d1cc1 100644
--- a/mgmtd/src/db/misc.rs
+++ b/mgmtd/src/db/misc.rs
@@ -151,12 +151,12 @@ mod test {
     fn meta_root() {
         with_test_data(|tx| {
             let meta_root = super::get_meta_root(tx).unwrap();
-            assert_eq!(MetaRoot::Normal(1, 101001i64), meta_root);
+            assert_eq!(MetaRoot::Normal(1, 101001i64.into()), meta_root);
 
             super::enable_metadata_mirroring(tx).unwrap();
 
             let meta_root = super::get_meta_root(tx).unwrap();
-            assert_eq!(MetaRoot::Mirrored(1), meta_root);
+            assert_eq!(MetaRoot::Mirrored(1.into()), meta_root);
 
             super::enable_metadata_mirroring(tx).unwrap_err();
         })
diff --git a/mgmtd/src/db/node.rs b/mgmtd/src/db/node.rs
index 0ecce5c0..f4a45657 100644
--- a/mgmtd/src/db/node.rs
+++ b/mgmtd/src/db/node.rs
@@ -1,72 +1,6 @@
 //! Functions for node management
 
 use super::*;
-use std::time::Duration;
-
-/// Represents a node entry.
-#[derive(Clone, Debug)]
-pub(crate) struct Node {
-    pub uid: Uid,
-    pub id: NodeId,
-    pub node_type: NodeType,
-    pub alias: String,
-    pub port: Port,
-}
-
-impl Node {
-    fn from_row(row: &Row) -> rusqlite::Result<Self> {
-        Ok(Node {
-            uid: row.get(0)?,
-            id: row.get(1)?,
-            node_type: NodeType::from_row(row, 2)?,
-            alias: row.get(3)?,
-            port: row.get(4)?,
-        })
-    }
-}
-
-/// Retrieve a list of nodes filtered by node type.
-pub(crate) fn get_with_type(tx: &Transaction, node_type: NodeType) -> Result<Vec<Node>> {
-    Ok(tx.query_map_collect(
-        sql!(
-            "SELECT node_uid, node_id, node_type, alias, port
-            FROM nodes_ext
-            WHERE node_type = ?1"
-        ),
-        [node_type.sql_variant()],
-        Node::from_row,
-    )?)
-}
-
-/// Retrieve a node by its alias.
-pub(crate) fn get_by_alias(tx: &Transaction, alias: &str) -> Result<Node> {
-    Ok(tx.query_row(
-        sql!(
-            "SELECT node_uid, node_id, node_type, alias, port
-            FROM nodes_ext
-            WHERE alias = ?1"
-        ),
-        [alias],
-        Node::from_row,
-    )?)
-}
-
-/// Delete client nodes with a last contact time bigger than `timeout`.
-///
-/// # Return value
-/// Returns the number of deleted clients.
-pub(crate) fn delete_stale_clients(tx: &Transaction, timeout: Duration) -> Result<usize> {
-    let affected = {
-        let mut stmt = tx.prepare_cached(sql!(
-            "DELETE FROM nodes
-            WHERE DATETIME(last_contact) < DATETIME('now', '-' || ?1 || ' seconds')
-            AND node_type = ?2"
-        ))?;
-        stmt.execute(params![timeout.as_secs(), NodeType::Client.sql_variant()])?
-    };
-
-    Ok(affected)
-}
 
 /// Inserts a node into the database. If node_id is 0, a new ID is chosen automatically.
 pub(crate) fn insert(
@@ -140,7 +74,7 @@ pub(crate) fn insert(
 /// This function is meant to be called whenever a node registers or sends a heartbeat.
 pub(crate) fn update(
     tx: &Transaction,
-    node_uid: Uid,
+    node_uid: &Uid,
     new_port: Port,
     new_machine_uuid: Option<&str>,
 ) -> Result<()> {
@@ -195,7 +129,7 @@ pub(crate) fn count_machines(
 }
 
 /// Delete a node from the database.
-pub(crate) fn delete(tx: &Transaction, node_uid: Uid) -> Result<()> {
+pub(crate) fn delete(tx: &Transaction, node_uid: &Uid) -> Result<()> {
     let affected = tx.execute_cached(sql!("DELETE FROM nodes WHERE node_uid = ?1"), [node_uid])?;
 
     check_affected_rows(affected, [1])
@@ -208,7 +142,14 @@ mod test {
     #[test]
     fn insert_get_delete() {
         with_test_data(|tx| {
-            assert_eq!(5, get_with_type(tx, NodeType::Meta).unwrap().len());
+            let meta_count = || {
+                tx.query_one("SELECT COUNT(*) FROM meta_nodes", [], |row| {
+                    row.get::<_, i64>(0)
+                })
+                .unwrap()
+            };
+
+            assert_eq!(meta_count(), 5);
 
             let node = insert(
                 tx,
                 1234,
@@ -233,58 +174,11 @@ mod test {
                 10000,
             )
             .unwrap_err();
-            assert_eq!(6, get_with_type(tx, NodeType::Meta).unwrap().len());
+            assert_eq!(meta_count(), 6);
 
-            delete(tx, node.uid).unwrap();
-            delete(tx, node.uid).unwrap_err();
-            assert_eq!(5, get_with_type(tx, NodeType::Meta).unwrap().len());
+            delete(tx, &node.uid).unwrap();
+            delete(tx, &node.uid).unwrap_err();
+            assert_eq!(meta_count(), 5);
         });
     }
-
-    #[test]
-    fn query_by_alias() {
-        with_test_data(|tx| {
-            insert(
-                tx,
-                11,
-                Some("node_1".try_into().unwrap()),
-                NodeType::Meta,
-                10000,
-            )
-            .unwrap();
-            insert(
-                tx,
-                12,
-                Some("node_2".try_into().unwrap()),
-                NodeType::Storage,
-                10000,
-            )
-            .unwrap();
-            assert_eq!(11, get_by_alias(tx, "node_1").unwrap().id);
-        })
-    }
-
-    #[test]
-    fn delete_stale_clients() {
-        with_test_data(|tx| {
-            let deleted = super::delete_stale_clients(tx, Duration::from_secs(99999)).unwrap();
-            assert_eq!(0, deleted);
-
-            tx.execute(
-                r#"
-                UPDATE nodes
-                SET last_contact = DATETIME("now", "-1 hour")
-                WHERE node_uid IN (103001, 103002)
-                "#,
-                [],
-            )
-            .unwrap();
-
-            let deleted = super::delete_stale_clients(tx, Duration::from_secs(100)).unwrap();
-            assert_eq!(2, deleted);
-
-            let clients = node::get_with_type(tx, NodeType::Client).unwrap();
-            assert_eq!(2, clients.len());
-        })
-    }
 }
diff --git a/mgmtd/src/db/node_nic.rs b/mgmtd/src/db/node_nic.rs
index 38a92957..8b2a6b86 100644
--- a/mgmtd/src/db/node_nic.rs
+++ b/mgmtd/src/db/node_nic.rs
@@ -72,7 +72,7 @@ pub(crate) fn map_bee_msg_nics(
 }
 
 /// Retrieves all node nics for a specific node
-pub(crate) fn get_with_node(tx: &Transaction, node_uid: Uid) -> Result<Vec<NodeNic>> {
+pub(crate) fn get_with_node(tx: &Transaction, node_uid: &Uid) -> Result<Vec<NodeNic>> {
     tx.prepare_cached(sql!(
         "SELECT nn.node_uid, nn.addr, n.port, nn.nic_type, nn.name
         FROM node_nics AS nn
@@ -107,7 +107,7 @@ pub(crate) struct ReplaceNic<'a> {
 /// Replaces all node nics for the given node by UID.
 pub(crate) fn replace<'a>(
     tx: &Transaction,
-    node_uid: Uid,
+    node_uid: &Uid,
     nics: impl IntoIterator<Item = ReplaceNic<'a>>,
 ) -> Result<()> {
     tx.execute_cached(
@@ -149,10 +149,10 @@ mod test {
     #[test]
     fn get_with_node() {
         with_test_data(|tx| {
-            let addrs = super::get_with_node(tx, MGMTD_UID).unwrap();
+            let addrs = super::get_with_node(tx, &MGMTD_UID).unwrap();
             assert_eq!(0, addrs.len());
 
-            let addrs = super::get_with_node(tx, 102001).unwrap();
+            let addrs = super::get_with_node(tx, &102001.into()).unwrap();
             assert_eq!(4, addrs.len());
         })
     }
@@ -161,16 +161,22 @@ mod test {
     fn get_update() {
         with_test_data(|tx| {
             let nics = super::get_with_type(tx, NodeType::Storage).unwrap();
-            assert_eq!(4, nics.iter().filter(|e| e.node_uid == 102001).count());
+            assert_eq!(
+                4,
+                nics.iter().filter(|e| e.node_uid == 102001.into()).count()
+            );
 
-            super::replace(tx, 102001i64, []).unwrap();
+            super::replace(tx, &102001.into(), []).unwrap();
 
             let nics = super::get_with_type(tx, NodeType::Storage).unwrap();
-            assert_eq!(0, nics.iter().filter(|e| e.node_uid == 102001).count());
+            assert_eq!(
+                0,
+                nics.iter().filter(|e| e.node_uid == 102001.into()).count()
+            );
 
             super::replace(
                 tx,
-                102001i64,
+                &102001.into(),
                 [ReplaceNic {
                     addr: &Ipv4Addr::new(1, 2, 3, 4).into(),
                     name: "test".into(),
@@ -180,7 +186,10 @@ mod test {
             .unwrap();
 
             let nics = super::get_with_type(tx, NodeType::Storage).unwrap();
-            assert_eq!(1, nics.iter().filter(|e| e.node_uid == 102001).count());
+            assert_eq!(
+                1,
+                nics.iter().filter(|e| e.node_uid == 102001.into()).count()
+            );
         })
     }
 }
diff --git a/mgmtd/src/db/storage_pool.rs b/mgmtd/src/db/storage_pool.rs
index 4125efb0..a51581e1 100644
--- a/mgmtd/src/db/storage_pool.rs
+++ b/mgmtd/src/db/storage_pool.rs
@@ -4,9 +4,15 @@ use super::*;
 
 /// Inserts a storage pool entry and assigns the given targets and buddy groups to the new pool.
 pub(crate) fn insert(tx: &Transaction, pool_id: PoolId, alias: &Alias) -> Result<(Uid, PoolId)> {
-    let pool_id = if pool_id == 0 {
-        misc::find_new_id(tx, "pools", "pool_id", NodeType::Storage, 1..=0xFFFF)?
-    } else if try_resolve_num_id(tx, EntityType::Pool, NodeType::Storage, pool_id.into())?.is_some()
+    let pool_id = if pool_id.is_zero() {
+        misc::find_new_id(tx, "pools", "pool_id", NodeType::Storage, 1..=0xFFFF)?.try_into()?
+    } else if try_resolve_num_id(
+        tx,
+        EntityType::Pool,
+        NodeType::Storage,
+        pool_id.raw().into(),
+    )?
+    .is_some()
     {
         bail!(TypedError::value_exists("numeric pool id", pool_id));
     } else {
diff --git a/mgmtd/src/db/target.rs b/mgmtd/src/db/target.rs
index 1ee4fcc4..37b7e1d1 100644
--- a/mgmtd/src/db/target.rs
+++ b/mgmtd/src/db/target.rs
@@ -13,7 +13,7 @@ pub(crate) fn validate_ids(
     let count: usize = tx.query_row_cached(
         sql!("SELECT COUNT(*) FROM targets WHERE target_id IN rarray(?1) AND node_type = ?2"),
         params![
-            &rarray_param(target_ids.iter().copied()),
+            &rarray_param(target_ids.iter().cloned()),
             node_type.sql_variant(),
         ],
         |row| row.get(0),
@@ -44,28 +44,28 @@ pub(crate) fn insert_storage(
     target_id: TargetId,
     reg_token: Option<&str>,
 ) -> Result<TargetId> {
-    let target_id = if target_id == 0 {
-        misc::find_new_id(tx, "targets", "target_id", NodeType::Storage, 1..=0xFFFF)?
+    let target_id = if target_id.is_zero() {
+        misc::find_new_id(tx, "targets", "target_id", NodeType::Storage, 1..=0xFFFF)?.try_into()?
     } else {
         target_id
     };
 
-    insert(tx, target_id, reg_token, NodeTypeServer::Storage, None)?;
+    insert(tx, &target_id, reg_token, NodeTypeServer::Storage, None)?;
 
     Ok(target_id)
 }
 
 pub fn insert(
     tx: &Transaction,
-    target_id: TargetId,
+    target_id: &TargetId,
     reg_token: Option<&str>,
     node_type: NodeTypeServer,
     // This is optional because storage targets come "unmapped"
     node_id: Option<NodeId>,
 ) -> Result<()> {
-    anyhow::ensure!(target_id > 0, "A target id must be > 0");
+    anyhow::ensure!(target_id.raw() > 0, "A target id must be > 0");
 
-    let alias = format!("target_{}_{target_id}", node_type.user_str()).try_into()?;
+    let alias = format!("target_{}_{}", node_type.user_str(), target_id.raw()).try_into()?;
     let new_uid = entity::insert(tx, EntityType::Target, &alias)?;
 
     tx.execute(
@@ -115,10 +115,15 @@ pub(crate) fn update_consistency_states(
 /// Change the storage pool of the given targets IDs to a new one.
 pub(crate) fn update_storage_pools(
     tx: &Transaction,
-    new_pool_id: PoolId,
+    new_pool_id: &PoolId,
     target_ids: &[TargetId],
 ) -> Result<()> {
-    let _ = resolve_num_id(tx, EntityType::Pool, NodeType::Storage, new_pool_id.into())?;
+    let _ = resolve_num_id(
+        tx,
+        EntityType::Pool,
+        NodeType::Storage,
+        new_pool_id.raw().into(),
+    )?;
 
     validate_ids(tx, target_ids, NodeTypeServer::Storage)?;
 
@@ -126,7 +131,7 @@ pub(crate) fn update_storage_pools(
         sql!("UPDATE targets SET pool_id = ?1 WHERE target_id IN rarray(?2) AND node_type = ?3"),
         params![
             new_pool_id,
-            &rarray_param(target_ids.iter().copied()),
+            &rarray_param(target_ids.iter().cloned()),
             NodeType::Storage.sql_variant()
         ],
     )?;
@@ -200,7 +205,7 @@ pub(crate) fn get_and_update_capacities(
         old_values.push(
             select.query_row(params![i.0, node_type.sql_variant()], |row| {
                 Ok((
-                    i.0,
+                    i.0.clone(),
                     TargetCapacities {
                         total_space: row.get(0)?,
                         total_inodes: row.get(1)?,
@@ -241,17 +246,19 @@ mod test {
     #[test]
     fn set_get_storage_and_map() {
         with_test_data(|tx| {
-            let new_target_id = super::insert_storage(tx, 0, Some("new_storage_target")).unwrap();
-            super::insert_storage(tx, 1000, Some("new_storage_target_2")).unwrap();
+            let new_target_id =
+                super::insert_storage(tx, 0.into(), Some("new_storage_target")).unwrap();
+            super::insert_storage(tx, 1000.into(), Some("new_storage_target_2")).unwrap();
             // existing id
-            super::insert_storage(tx, 1000, Some("new_storage_target")).unwrap_err();
+            super::insert_storage(tx, 1000.into(), Some("new_storage_target")).unwrap_err();
 
-            super::update_storage_node_mappings(tx, &[new_target_id, 1000], 1).unwrap();
+            super::update_storage_node_mappings(tx, &[new_target_id.clone(), 1000.into()], 1)
+                .unwrap();
 
             assert_eq!(
                 1,
-                super::update_storage_node_mappings(tx, &[9999, 1], 1).unwrap()
+                super::update_storage_node_mappings(tx, &[9999.into(), 1.into()], 1).unwrap()
             );
 
             let targets: Vec<TargetId> = tx
@@ -263,7 +270,7 @@ mod test {
 
             assert_eq!(19, targets.len());
             assert!(targets.contains(&new_target_id));
-            assert!(targets.contains(&1000));
+            assert!(targets.contains(&1000.into()));
         })
     }
 }
diff --git a/mgmtd/src/grpc/assign_pool.rs b/mgmtd/src/grpc/assign_pool.rs
index 6b9e968b..8165fe68 100644
--- a/mgmtd/src/grpc/assign_pool.rs
+++ b/mgmtd/src/grpc/assign_pool.rs
@@ -14,7 +14,12 @@ pub(crate) async fn assign_pool(
     let pool = app
         .write_tx(move |tx| {
             let pool = pool.resolve(tx, EntityType::Pool)?;
-            do_assign(tx, pool.num_id().try_into()?, req.targets, req.buddy_groups)?;
+            do_assign(
+                tx,
+                &pool.num_id().try_into()?,
+                req.targets,
+                req.buddy_groups,
+            )?;
             Ok(pool)
         })
         .await?;
@@ -35,7 +40,7 @@ pub(crate) async fn assign_pool(
 /// Do the actual assign work
 pub(super) fn do_assign(
     tx: &Transaction,
-    pool_id: PoolId,
+    pool_id: &PoolId,
     targets: Vec<pb::EntityIdSet>,
     groups: Vec<pb::EntityIdSet>,
 ) -> Result<()> {
@@ -57,11 +62,11 @@ pub(super) fn do_assign(
     for t in targets {
         let eid = EntityId::try_from(t)?;
         let target = eid.resolve(tx, EntityType::Target)?;
-        if check_group_membership.query_row([target.uid], |row| row.get::<_, i64>(0))? > 0 {
+        if check_group_membership.query_row([&target.uid], |row| row.get::<_, i64>(0))? > 0 {
             bail!("Target {eid} can't be assigned directly as it's part of a buddy group");
         }
 
-        assign_target.execute(params![pool_id, target.uid])?;
+        assign_target.execute(params![pool_id, &target.uid])?;
     }
 
     let mut assign_group = tx.prepare_cached(sql!(
diff --git a/mgmtd/src/grpc/create_buddy_group.rs b/mgmtd/src/grpc/create_buddy_group.rs
index cd561193..48cc243b 100644
--- a/mgmtd/src/grpc/create_buddy_group.rs
+++ b/mgmtd/src/grpc/create_buddy_group.rs
@@ -35,7 +35,7 @@ pub(crate) async fn create_buddy_group(
             alias,
             legacy_id: LegacyId {
                 node_type: node_type.into(),
-                num_id: group_id.into(),
+                num_id: group_id.raw().into(),
             },
         },
         p_target,
diff --git a/mgmtd/src/grpc/create_pool.rs b/mgmtd/src/grpc/create_pool.rs
index 440c658f..77f78d06 100644
--- a/mgmtd/src/grpc/create_pool.rs
+++ b/mgmtd/src/grpc/create_pool.rs
@@ -20,7 +20,7 @@ pub(crate) async fn create_pool(
     let (pool_uid, alias, pool_id) = app
         .write_tx(move |tx| {
             let (pool_uid, pool_id) = db::storage_pool::insert(tx, num_id, &alias)?;
-            do_assign(tx, pool_id, req.targets, req.buddy_groups)?;
+            do_assign(tx, &pool_id, req.targets, req.buddy_groups)?;
             Ok((pool_uid, alias, pool_id))
         })
         .await?;
@@ -30,7 +30,7 @@ pub(crate) async fn create_pool(
         alias,
         legacy_id: LegacyId {
             node_type: NodeType::Storage,
-            num_id: pool_id.into(),
+            num_id: pool_id.raw().into(),
         },
     };
diff --git a/mgmtd/src/grpc/delete_buddy_group.rs b/mgmtd/src/grpc/delete_buddy_group.rs
index 48d0c9f1..256fd137 100644
--- a/mgmtd/src/grpc/delete_buddy_group.rs
+++ b/mgmtd/src/grpc/delete_buddy_group.rs
@@ -27,7 +27,7 @@ pub(crate) async fn delete_buddy_group(
         }
 
         let (p_node_uid, s_node_uid) =
-            db::buddy_group::prepare_storage_deletion(&tx, group.num_id().try_into()?)?;
+            db::buddy_group::prepare_storage_deletion(&tx, &group.num_id().try_into()?)?;
 
         if execute {
             tx.commit()?;
@@ -40,13 +40,13 @@ pub(crate) async fn delete_buddy_group(
     let group_id: BuddyGroupId = group.num_id().try_into()?;
     let remove_bee_msg = RemoveBuddyGroup {
         node_type: NodeType::Storage,
-        group_id,
+        group_id: group_id.clone(),
         check_only: if execute { 0 } else { 1 },
         force: 0,
     };
 
-    let p_res: RemoveBuddyGroupResp = app.request(p_node_uid, &remove_bee_msg).await?;
-    let s_res: RemoveBuddyGroupResp = app.request(s_node_uid, &remove_bee_msg).await?;
+    let p_res: RemoveBuddyGroupResp = app.request(&p_node_uid, &remove_bee_msg).await?;
+    let s_res: RemoveBuddyGroupResp = app.request(&s_node_uid, &remove_bee_msg).await?;
 
     if p_res.result != OpsErr::SUCCESS || s_res.result != OpsErr::SUCCESS {
         bail!(
@@ -61,7 +61,7 @@ Primary result: {:?}, Secondary result: {:?}",
     app.db_conn(move |conn| {
         let tx = conn.transaction()?;
 
-        db::buddy_group::delete_storage(&tx, group_id)?;
+        db::buddy_group::delete_storage(&tx, &group_id)?;
 
         if execute {
             tx.commit()?;
diff --git a/mgmtd/src/grpc/delete_node.rs b/mgmtd/src/grpc/delete_node.rs
index 156ba947..e1a1b18a 100644
--- a/mgmtd/src/grpc/delete_node.rs
+++ b/mgmtd/src/grpc/delete_node.rs
@@ -54,7 +54,7 @@ pub(crate) async fn delete_node(
         } else {
             let assigned_targets: usize = tx.query_row_cached(
                 sql!("SELECT COUNT(*) FROM targets_ext WHERE node_uid = ?1"),
-                [node.uid],
+                [&node.uid],
                 |row| row.get(0),
             )?;
 
@@ -63,7 +63,7 @@ pub(crate) async fn delete_node(
             }
         }
 
-        db::node::delete(&tx, node.uid)?;
+        db::node::delete(&tx, &node.uid)?;
 
         if execute {
             tx.commit()?;
diff --git a/mgmtd/src/grpc/delete_pool.rs b/mgmtd/src/grpc/delete_pool.rs
index 23b5e988..5cfc852e 100644
--- a/mgmtd/src/grpc/delete_pool.rs
+++ b/mgmtd/src/grpc/delete_pool.rs
@@ -37,7 +37,8 @@ are still assigned to this pool"
             )
         }
 
-        let affected = tx.execute(sql!("DELETE FROM pools WHERE pool_uid = ?1"), [pool.uid])?;
+        let affected =
+            tx.execute(sql!("DELETE FROM pools WHERE pool_uid = ?1"), [&pool.uid])?;
         check_affected_rows(affected, [1])?;
 
         if execute {
diff --git a/mgmtd/src/grpc/delete_target.rs b/mgmtd/src/grpc/delete_target.rs
index 1439949f..d677663b 100644
--- a/mgmtd/src/grpc/delete_target.rs
+++ b/mgmtd/src/grpc/delete_target.rs
@@ -27,7 +27,7 @@ pub(crate) async fn delete_target(
                 "SELECT COUNT(*) FROM buddy_groups_ext
                 WHERE p_target_uid = ?1 OR s_target_uid = ?1"
             ),
-            [target.uid],
+            [&target.uid],
             |row| row.get(0),
         )?;
diff --git a/mgmtd/src/grpc/get_buddy_groups.rs b/mgmtd/src/grpc/get_buddy_groups.rs
index fc8fec0c..952ab1aa 100644
--- a/mgmtd/src/grpc/get_buddy_groups.rs
+++ b/mgmtd/src/grpc/get_buddy_groups.rs
@@ -53,7 +53,7 @@ pub(crate) async fn get_buddy_groups(
                 }),
                 storage_pool: if let Some(uid) = row.get::<_, Option<Uid>>(10)? {
                     Some(pb::EntityIdSet {
-                        uid: Some(uid),
+                        uid: Some(uid.into()),
                         legacy_id: Some(pb::LegacyId {
                             num_id: row.get(11)?,
                             node_type,
diff --git a/mgmtd/src/grpc/get_nodes.rs b/mgmtd/src/grpc/get_nodes.rs
index dac810b7..9649fcb2 100644
--- a/mgmtd/src/grpc/get_nodes.rs
+++ b/mgmtd/src/grpc/get_nodes.rs
@@ -139,7 +139,7 @@ pub(crate) async fn get_nodes(
     for node in &mut nodes {
         node.nics = nics
             .iter()
-            .filter(|(uid, _)| node.id.as_ref().is_some_and(|e| e.uid == Some(*uid)))
+            .filter(|(uid, _)| node.id.as_ref().is_some_and(|e| e.uid == Some(uid.raw())))
             .cloned()
             .map(|(_, mut nic)| {
                 nic.addr = SocketAddr::new(
diff --git a/mgmtd/src/grpc/get_pools.rs b/mgmtd/src/grpc/get_pools.rs
index 83574406..0b5c120f 100644
--- a/mgmtd/src/grpc/get_pools.rs
+++ b/mgmtd/src/grpc/get_pools.rs
@@ -117,13 +117,13 @@ pub(crate) async fn get_pools(
     // Merge pool, target and buddy group lists together
     for p in &mut pools {
         for t in &targets {
-            if p.id.as_ref().is_some_and(|e| e.uid == Some(t.0)) {
+            if p.id.as_ref().is_some_and(|e| e.uid == Some(t.0.raw())) {
                 p.targets.push(t.1.clone());
             }
         }
 
         for t in &buddy_groups {
-            if p.id.as_ref().is_some_and(|e| e.uid == Some(t.0)) {
+            if p.id.as_ref().is_some_and(|e| e.uid == Some(t.0.raw())) {
                 p.buddy_groups.push(t.1.clone());
             }
         }
diff --git a/mgmtd/src/grpc/get_targets.rs b/mgmtd/src/grpc/get_targets.rs
index 906349e7..9e6b59c2 100644
--- a/mgmtd/src/grpc/get_targets.rs
+++ b/mgmtd/src/grpc/get_targets.rs
@@ -69,7 +69,7 @@ pub(crate) async fn get_targets(
                 node,
                 storage_pool: if let Some(uid) = row.get::<_, Option<Uid>>(7)? {
                     Some(pb::EntityIdSet {
-                        uid: Some(uid),
+                        uid: Some(uid.into()),
                         legacy_id: Some(pb::LegacyId {
                             num_id: row.get(9)?,
                             node_type,
@@ -148,14 +148,14 @@ pub(crate) async fn get_targets(
             targets.iter().filter(|t| {
                 t.storage_pool
                     .as_ref()
-                    .is_some_and(|e| e.uid == Some(sp_uid))
+                    .is_some_and(|e| e.uid == Some(sp_uid.raw()))
             }),
         )?;
 
         for t in targets.iter_mut().filter(|t| {
             t.storage_pool
                 .as_ref()
-                .is_some_and(|e| e.uid == Some(sp_uid))
+                .is_some_and(|e| e.uid == Some(sp_uid.raw()))
         }) {
             if let Some(fs) = t.free_space_bytes
                 && let Some(fi) = t.free_inodes
diff --git a/mgmtd/src/grpc/mirror_root_inode.rs b/mgmtd/src/grpc/mirror_root_inode.rs
index 8f6cb303..b2a6f282 100644
--- a/mgmtd/src/grpc/mirror_root_inode.rs
+++ b/mgmtd/src/grpc/mirror_root_inode.rs
@@ -76,7 +76,7 @@ communicated during the last {offline_timeout}s."
         })
         .await?;
 
-    let resp: SetMetadataMirroringResp = app.request(meta_root, &SetMetadataMirroring {}).await?;
+    let resp: SetMetadataMirroringResp = app.request(&meta_root, &SetMetadataMirroring {}).await?;
 
     match resp.result {
         OpsErr::SUCCESS => app.write_tx(db::misc::enable_metadata_mirroring).await?,
diff --git a/mgmtd/src/grpc/set_alias.rs b/mgmtd/src/grpc/set_alias.rs
index 800194e6..09bf14a4 100644
--- a/mgmtd/src/grpc/set_alias.rs
+++ b/mgmtd/src/grpc/set_alias.rs
@@ -48,33 +48,37 @@ pub(crate) async fn set_alias(
 
     // If the entity is a node, notify all nodes about the changed alias
     if entity_type == EntityType::Node {
-        let (entity, node, nic_list) = app
+        let heartbeat_msg = app
             .write_tx(move |tx| {
                 let entity = update_alias_fn(tx, &new_alias)?;
-                let node = db::node::get_by_alias(tx, new_alias.as_ref())?;
-                let nic_list = db::node_nic::get_with_node(tx, entity.uid)?;
-
-                Ok((entity, node, nic_list))
+                let port = tx.query_one(
+                    sql!("SELECT port FROM nodes WHERE node_uid = ?1"),
+                    params![entity.uid],
+                    |row| row.get(0),
+                )?;
+                let nic_list = db::node_nic::get_with_node(tx, &entity.uid)?;
+
+                Ok(Heartbeat {
+                    instance_version: 0,
+                    nic_list_version: 0,
+                    node_type: entity.node_type(),
+                    node_alias: new_alias.to_string().into_bytes(),
+                    ack_id: "".into(),
+                    node_num_id: entity.num_id(),
+                    root_num_id: 0,
+                    is_root_mirrored: 0,
+                    port,
+                    port_tcp_unused: port,
+                    nic_list: map_bee_msg_nics(nic_list).collect(),
+                    machine_uuid: vec![],
+                })
             })
             .await?;
 
         app.send_notifications(
             &[NodeType::Meta, NodeType::Storage, NodeType::Client],
-            &Heartbeat {
-                instance_version: 0,
-                nic_list_version: 0,
-                node_type: entity.node_type(),
-                node_alias: node.alias.into_bytes(),
-                ack_id: "".into(),
-                node_num_id: entity.num_id(),
-                root_num_id: 0,
-                is_root_mirrored: 0,
-                port: node.port,
-                port_tcp_unused: node.port,
-                nic_list: map_bee_msg_nics(nic_list).collect(),
-                machine_uuid: vec![],
-            },
+            &heartbeat_msg,
         )
         .await;
 
@@ -100,7 +104,7 @@ mod test {
         super::set_alias(
             &app,
             pm::SetAliasRequest {
-                entity_id: Some(EntityId::Uid(99999999).into()),
+                entity_id: Some(EntityId::Uid(99999999.into()).into()),
                 entity_type: pb::EntityType::Node.into(),
                 new_alias: "new_alias".to_string(),
             },
@@ -112,7 +116,7 @@ mod test {
         super::set_alias(
             &app,
             pm::SetAliasRequest {
-                entity_id: Some(EntityId::Uid(101001).into()),
+                entity_id: Some(EntityId::Uid(101001.into()).into()),
                 entity_type: pb::EntityType::Target.into(),
                 new_alias: "new_alias".to_string(),
             },
@@ -154,7 +158,7 @@ mod test {
         super::set_alias(
             &app,
             pm::SetAliasRequest {
-                entity_id: Some(EntityId::Uid(101001).into()),
+                entity_id: Some(EntityId::Uid(101001.into()).into()),
                 entity_type: pb::EntityType::Node.into(),
                 new_alias: "new_alias".to_string(),
             },
diff --git a/mgmtd/src/grpc/set_default_quota_limits.rs b/mgmtd/src/grpc/set_default_quota_limits.rs
index 127e38c6..2cbcc640 100644
--- a/mgmtd/src/grpc/set_default_quota_limits.rs
+++ b/mgmtd/src/grpc/set_default_quota_limits.rs
@@ -18,7 +18,7 @@ pub(crate) async fn set_default_quota_limits(
     fn update(
         tx: &Transaction,
         limit: i64,
-        pool_id: PoolId,
+        pool_id: &PoolId,
         id_type: QuotaIdType,
         quota_type: QuotaType,
     ) -> Result<()> {
@@ -57,16 +57,16 @@ pub(crate) async fn set_default_quota_limits(
             let pool_id: PoolId = pool.num_id().try_into()?;
 
             if let Some(l) = req.user_space_limit {
-                update(tx, l, pool_id, QuotaIdType::User, QuotaType::Space)?;
+                update(tx, l, &pool_id, QuotaIdType::User, QuotaType::Space)?;
             }
             if let Some(l) = req.user_inode_limit {
-                update(tx, l, pool_id, QuotaIdType::User, QuotaType::Inode)?;
+                update(tx, l, &pool_id, QuotaIdType::User, QuotaType::Inode)?;
             }
             if let Some(l) = req.group_space_limit {
-                update(tx, l, pool_id, QuotaIdType::Group, QuotaType::Space)?;
+                update(tx, l, &pool_id, QuotaIdType::Group, QuotaType::Space)?;
             }
             if let Some(l) = req.group_inode_limit {
-                update(tx, l, pool_id, QuotaIdType::Group, QuotaType::Inode)?;
+                update(tx, l, &pool_id, QuotaIdType::Group, QuotaType::Inode)?;
             }
 
             Ok(())
diff --git a/mgmtd/src/grpc/set_target_state.rs b/mgmtd/src/grpc/set_target_state.rs
index 5eccc39a..ca857882 100644
--- a/mgmtd/src/grpc/set_target_state.rs
+++ b/mgmtd/src/grpc/set_target_state.rs
@@ -18,9 +18,9 @@ pub(crate) async fn set_target_state(
         .write_tx(move |tx| {
             let target = target.resolve(tx, EntityType::Target)?;
 
-            let node: i64 = tx.query_row_cached(
+            let node: Uid = tx.query_row_cached(
                 sql!("SELECT node_uid FROM targets_ext WHERE target_uid = ?1"),
-                [target.uid],
+                [&target.uid],
                 |row| row.get(0),
             )?;
 
@@ -36,7 +36,7 @@ pub(crate) async fn set_target_state(
 
     let resp: SetTargetConsistencyStatesResp = app
         .request(
-            node_uid,
+            &node_uid,
             &SetTargetConsistencyStates {
                 node_type: target.node_type(),
                 target_ids: vec![target.num_id().try_into().unwrap()],
diff --git a/mgmtd/src/grpc/start_resync.rs b/mgmtd/src/grpc/start_resync.rs
index 5a37c69a..a7fae045 100644
--- a/mgmtd/src/grpc/start_resync.rs
+++ b/mgmtd/src/grpc/start_resync.rs
@@ -35,7 +35,7 @@ pub(crate) async fn start_resync(
                 ON src_t.target_id = g.p_target_id AND src_t.node_type = g.node_type
                 WHERE group_uid = ?1"
             ),
-            [group.uid],
+            [&group.uid],
             |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
         )?;
 
@@ -65,7 +65,7 @@ not supported."
         let resp: GetMetaResyncStatsResp = app
             .request(
-                src_node_uid,
+                &src_node_uid,
                 &GetMetaResyncStats {
                     target_id: src_target_id,
                 },
             )
@@ -80,9 +80,9 @@ not supported."
         if !restart {
             let resp: GetStorageResyncStatsResp = app
                 .request(
-                    src_node_uid,
+                    &src_node_uid,
                     &GetStorageResyncStats {
-                        target_id: src_target_id,
+                        target_id: src_target_id.clone(),
                     },
                 )
                 .await?;
@@ -92,7 +92,7 @@ not supported."
             }
 
             if timestamp > -1 {
-                override_last_buddy_comm(app, src_node_uid, src_target_id, &group, timestamp)
+                override_last_buddy_comm(app, &src_node_uid, src_target_id, &group, timestamp)
                     .await?;
             }
         } else {
@@ -100,8 +100,14 @@ not supported."
                 bail!("Resync for storage targets can only be restarted with timestamp.");
             }
 
-            override_last_buddy_comm(app, src_node_uid, src_target_id, &group, timestamp)
-                .await?;
+            override_last_buddy_comm(
+                app,
+                &src_node_uid,
+                src_target_id.clone(),
+                &group,
+                timestamp,
+            )
+            .await?;
 
             log::info!("Waiting for the already running resync operations to abort.");
 
@@ -115,9 +121,9 @@ not supported."
loop { let resp: GetStorageResyncStatsResp = app .request( - src_node_uid, + &src_node_uid, &GetStorageResyncStats { - target_id: src_target_id, + target_id: src_target_id.clone(), }, ) .await?; @@ -167,7 +173,7 @@ not supported." /// Note that this might be overwritten again on the storage server between async fn override_last_buddy_comm( app: &impl App, - src_node_uid: Uid, + src_node_uid: &Uid, src_target_id: TargetId, group: &EntityIdSet, timestamp: i64, diff --git a/mgmtd/src/lib.rs b/mgmtd/src/lib.rs index cac458a9..71ee1f16 100644 --- a/mgmtd/src/lib.rs +++ b/mgmtd/src/lib.rs @@ -99,12 +99,12 @@ pub async fn start(info: StaticInfo, license: LicenseVerifier) -> Result Result<()> { // Fetch quota data from storage daemons + struct Target { + id: TargetId, + alias: String, // this is only used for logging, so we don't need Alias type here + pool_id: PoolId, + node_uid: Uid, + } - let targets: Vec<(TargetId, PoolId, Uid)> = app + let targets: Vec = app .read_tx(move |tx| { tx.query_map_collect( sql!( - "SELECT target_id, pool_id, node_uid - FROM storage_targets - INNER JOIN nodes USING(node_type, node_id) - WHERE node_id IS NOT NULL" + "SELECT target_id, t.alias, pool_id, n.node_uid, n.alias + FROM targets_ext AS t + INNER JOIN nodes_ext AS n USING(node_type, node_id) + WHERE node_id IS NOT NULL AND node_type = ?1" ), - [], - |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), + [NodeType::Storage.sql_variant()], + |row| { + Ok(Target { + id: row.get(0)?, + alias: row.get::<_, String>(1)?, + pool_id: row.get(2)?, + node_uid: Uid::with_info(row.get(3)?, row.get::<_, String>(4)?), + }) + }, ) .map_err(Into::into) }) @@ -95,7 +108,7 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { let mut tasks = vec![]; // Sends one request per target to the respective owner node // Requesting is done concurrently. - for (target_id, pool_id, node_uid) in targets { + for target in targets { let app2 = app.clone(); let user_ids2 = user_ids.clone(); let group_ids2 = group_ids.clone(); @@ -103,15 +116,23 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { tasks.push(tokio::spawn(async move { let resp_users: Result = app2 .request( - node_uid, - &GetQuotaInfo::with_user_ids(user_ids2, target_id, pool_id), + &target.node_uid, + &GetQuotaInfo::with_user_ids( + user_ids2, + target.id.clone(), + target.pool_id.clone(), + ), ) .await; let resp_groups: Result = app2 .request( - node_uid, - &GetQuotaInfo::with_group_ids(group_ids2, target_id, pool_id), + &target.node_uid, + &GetQuotaInfo::with_group_ids( + group_ids2, + target.id.clone(), + target.pool_id.clone(), + ), ) .await; @@ -120,7 +141,7 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { let mut entries = u.quota_entry; entries.append(&mut g.quota_entry); - (target_id, Some(entries)) + (target, Some(entries)) } (u, g) => { let log_u = u @@ -133,11 +154,11 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { .unwrap_or_else(|| "".into()); log::error!( - "Fetching quota info for storage target {target_id} from node with uid \ -{node_uid} failed.{log_u}{log_g}" + "Fetching quota info for {} failed. 
{log_u}{log_g}", + target.alias ); - (target_id, None) + (target, None) } } })); @@ -145,7 +166,7 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { // Await all the responses for t in tasks { - let (target_id, entries) = t.await?; + let (target, entries) = t.await?; // Only process that target if there were not errors when fetching for this target if let Some(entries) = entries { @@ -155,7 +176,7 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { // storages and we only update if there was no fetch error. tx.execute_cached( sql!("DELETE FROM quota_usage WHERE target_id = ?1"), - [target_id], + [&target.id], )?; let mut insert_stmt = tx.prepare_cached(sql!( @@ -164,8 +185,9 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { ))?; log::debug!( - "Setting {} quota usage entries for target {target_id}", - entries.len() + "Setting {} quota usage entries for target {}", + entries.len(), + target.alias ); for e in entries { @@ -174,7 +196,7 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { e.id, e.id_type.sql_variant(), QuotaType::Space.sql_variant(), - target_id, + target.id, e.space ])?; } @@ -184,7 +206,7 @@ pub(crate) async fn update_and_distribute(app: &impl App) -> Result<()> { e.id, e.id_type.sql_variant(), QuotaType::Inode.sql_variant(), - target_id, + target.id, e.inodes ])?; } @@ -209,7 +231,7 @@ async fn exceeded_quota(app: &impl App) -> Result<()> { let (msges, nodes) = app .read_tx(|tx| { - let pools: Vec<_> = + let pools: Vec = tx.query_map_collect(sql!("SELECT pool_id FROM pools"), [], |row| row.get(0))?; // Prepare empty messages. It is important to always send a message for each (PoolId, @@ -220,7 +242,7 @@ async fn exceeded_quota(app: &impl App) -> Result<()> { for id_type in [QuotaIdType::User, QuotaIdType::Group] { for quota_type in [QuotaType::Space, QuotaType::Inode] { msges.push(SetExceededQuota { - pool_id, + pool_id: pool_id.clone(), id_type, quota_type, exceeded_quota_ids: vec![], @@ -253,13 +275,13 @@ async fn exceeded_quota(app: &impl App) -> Result<()> { } // Get all node uids to send the messages to - let nodes: Vec = tx.query_map_collect( - sql!("SELECT node_uid FROM nodes WHERE node_type IN (?1,?2)"), + let nodes: Vec<_> = tx.query_map_collect( + sql!("SELECT node_uid, alias FROM nodes_ext WHERE node_type IN (?1,?2)"), [ NodeType::Meta.sql_variant(), NodeType::Storage.sql_variant(), ], - |row| row.get(0), + |row| Ok(Uid::with_info(row.get(0)?, row.get::<_, String>(1)?)), )?; Ok((msges, nodes)) @@ -275,11 +297,8 @@ async fn exceeded_quota(app: &impl App) -> Result<()> { let mut request_fails = 0; let mut non_success_count = 0; - for node_uid in &nodes { - match app - .request::<_, SetExceededQuotaResp>(*node_uid, &msg) - .await - { + for node in &nodes { + match app.request::<_, SetExceededQuotaResp>(node, &msg).await { Ok(resp) => { if resp.result != OpsErr::SUCCESS { non_success_count += 1; @@ -343,7 +362,7 @@ mod test { let mut quota_entry = vec![]; // Provide dummy quota values for target 1 depending on the id and type - if r.target_id == 1 { + if r.target_id.raw() == 1 { for id in r.id_list.iter().copied() { quota_entry.push(QuotaEntry { space: id as u64 * 1000 + r.id_type.sql_variant() as u64, @@ -353,7 +372,7 @@ mod test { valid: 1, }); } - } else if r.target_id == 2 && r.id_type == QuotaIdType::User { + } else if r.target_id.raw() == 2 && r.id_type == QuotaIdType::User { quota_entry.push(QuotaEntry { space: 999, inodes: 999, @@ -415,7 +434,7 @@ mod test { 
let r = req.downcast_ref::<GetQuotaInfo>().unwrap(); // Fail request for target 1 user quota (only) - if r.target_id == 1 && r.id_type == QuotaIdType::User { + if r.target_id.raw() == 1 && r.id_type == QuotaIdType::User { return Err(anyhow::anyhow!("target 1 fail")); } @@ -449,7 +468,7 @@ let mut quota_entry = vec![]; - if r.target_id == 1 { + if r.target_id.raw() == 1 { quota_entry.push(QuotaEntry { space: 999, inodes: 999, @@ -495,7 +514,7 @@ app.set_request_handler(move |req| { let r = req.downcast_ref::<SetExceededQuota>().unwrap(); - match (r.pool_id, r.id_type, r.quota_type) { + match (r.pool_id.raw(), r.id_type, r.quota_type) { (1, QuotaIdType::User, QuotaType::Space) => { assert_eq!(r.exceeded_quota_ids.as_slice(), &[2, 4, 10]) } diff --git a/mgmtd/src/timer.rs b/mgmtd/src/timer.rs index 568caed5..82f53c01 100644 --- a/mgmtd/src/timer.rs +++ b/mgmtd/src/timer.rs @@ -4,19 +4,19 @@ use crate::App; use crate::app::RuntimeApp; use crate::db::{self}; use crate::license::LicensedFeature; -use crate::quota::update_and_distribute; +use crate::types::SqliteEnumExt; +use rusqlite::params; use shared::bee_msg::target::RefreshTargetStates; use shared::run_state::RunStateHandle; use shared::types::NodeType; +use sqlite_check::sql; +use std::time::Duration; use tokio::time::{MissedTickBehavior, sleep}; /// Starts the timed tasks. pub(crate) fn start_tasks(app: RuntimeApp, run_state: RunStateHandle) { - // TODO send out timer based RefreshTargetStates notification if a reachability - // state changed ? - - tokio::spawn(delete_stale_clients(app.clone(), run_state.clone())); - tokio::spawn(switchover(app.clone(), run_state.clone())); + tokio::spawn(delete_stale_clients_loop(app.clone(), run_state.clone())); + tokio::spawn(check_switchover_loop(app.clone(), run_state.clone())); if app.info.user_config.quota_enable { if let Err(err) = app.license.verify_licensed_feature(LicensedFeature::Quota) { @@ -24,52 +24,63 @@ "Quota is enabled in the config, but the feature could not be verified. Continuing without quota support: {err}" ); } else { - tokio::spawn(update_quota(app, run_state)); + tokio::spawn(update_quota_loop(app, run_state)); } } } /// Deletes client nodes from the database which haven't responded for the configured time. -async fn delete_stale_clients(app: RuntimeApp, mut run_state: RunStateHandle) { - let timeout = app.info.user_config.client_auto_remove_timeout; +async fn delete_stale_clients_loop(app: impl App, mut run_state: RunStateHandle) { + let timeout = app.static_info().user_config.client_auto_remove_timeout; loop { tokio::select!
{ - _ = sleep(timeout) => {} + _ = sleep(timeout) => { delete_stale_clients(&app, timeout).await } _ = run_state.wait_for_pre_shutdown() => { break; } } + } + + log::debug!("Timed task delete_stale_clients exited"); +} - log::debug!("Running stale client deleter"); - - match app - .db - .write_tx(move |tx| db::node::delete_stale_clients(tx, timeout)) - .await - { - Ok(affected) => { - if affected > 0 { - log::info!("Deleted {affected} stale clients"); - } +async fn delete_stale_clients(app: &impl App, timeout: Duration) { + log::debug!("Running stale client deleter"); + + match app + .write_tx(move |tx| { + let mut stmt = tx.prepare_cached(sql!( + "DELETE FROM nodes + WHERE DATETIME(last_contact) < DATETIME('now', '-' || ?1 || ' seconds') + AND node_type = ?2" + ))?; + stmt.execute(params![timeout.as_secs(), NodeType::Client.sql_variant()]) + .map_err(|err| err.into()) + }) + .await + { + Ok(affected) => { + if affected > 0 { + log::info!("Deleted {affected} stale clients"); } - Err(err) => log::error!("Deleting stale clients failed: {err:#}"), } + Err(err) => log::error!("Deleting stale clients failed: {err:#}"), } - - log::debug!("Timed task delete_stale_clients exited"); } /// Fetches quota information for all storage targets, calculates exceeded IDs and distributes them. -async fn update_quota(app: RuntimeApp, mut run_state: RunStateHandle) { +async fn update_quota_loop(app: impl App, mut run_state: RunStateHandle) { + let interval = app.static_info().user_config.quota_update_interval; + loop { log::debug!("Running quota update"); - match update_and_distribute(&app).await { + match crate::quota::update_and_distribute(&app).await { Ok(_) => {} Err(err) => log::error!("Updating quota failed: {err:#}"), } tokio::select! { - _ = sleep(app.info.user_config.quota_update_interval) => {} + _ = sleep(interval) => {} _ = run_state.wait_for_pre_shutdown() => { break; } } } @@ -78,7 +89,7 @@ async fn update_quota(app: RuntimeApp, mut run_state: RunStateHandle) { } /// Finds buddy groups with switchover condition, swaps them and notifies nodes. -async fn switchover(app: RuntimeApp, mut run_state: RunStateHandle) { +async fn check_switchover_loop(app: impl App, mut run_state: RunStateHandle) { // On the other nodes / old management, the interval in which the switchover checks are done // is determined by "1/6 sysTargetOfflineTimeoutSecs". // This is also the interval the target states are being pushed to management. To avoid an @@ -86,7 +97,9 @@ async fn switchover(app: RuntimeApp, mut run_state: RunStateHandle) { // up-and-running primary doesn't because of their timing, this value should be the same as on // the nodes. If we delay the initial check by that time, then a running primary has enough time // to report in and update the last contact time before the check happens. - let interval = app.info.user_config.node_offline_timeout / 6; + let offline_timeout = app.static_info().user_config.node_offline_timeout; + let interval = offline_timeout / 6; + let mut timer = tokio::time::interval(interval); timer.set_missed_tick_behavior(MissedTickBehavior::Skip); @@ -95,35 +108,66 @@ async fn switchover(app: RuntimeApp, mut run_state: RunStateHandle) { loop { tokio::select! 
{ - _ = timer.tick() => {} + _ = timer.tick() => { check_switchover(&app, offline_timeout).await } _ = run_state.wait_for_pre_shutdown() => { break; } } + } + + log::debug!("Timed task check_for_switchover exited"); +} - log::debug!("Running switchover check"); - - let timeout = app.info.user_config.node_offline_timeout; - - match app - .db - .write_tx(move |tx| db::buddy_group::check_and_swap_buddies(tx, timeout)) - .await - { - Ok(swapped) => { - if !swapped.is_empty() { - log::warn!( - "A switchover was triggered for the following buddy groups: {swapped:?}" - ); - - app.send_notifications( - &[NodeType::Meta, NodeType::Storage, NodeType::Client], - &RefreshTargetStates { ack_id: "".into() }, - ) - .await; - } +async fn check_switchover(app: &impl App, offline_timeout: Duration) { + log::debug!("Running switchover check"); + + match app + .write_tx(move |tx| db::buddy_group::check_and_swap_buddies(tx, offline_timeout)) + .await + { + Ok(switched) => { + if !switched.is_empty() { + log::warn!( + "A switchover was triggered for the following buddy groups: {switched:?}" + ); + + app.send_notifications( + &[NodeType::Meta, NodeType::Storage, NodeType::Client], + &RefreshTargetStates { ack_id: "".into() }, + ) + .await; + } - Err(err) => log::error!("Switchover check failed: {err:#}"), } + Err(err) => log::error!("Switchover check failed: {err:#}"), } +} - log::debug!("Timed task check_for_switchover exited"); +#[cfg(test)] +mod test { + use crate::App; + use crate::app::test::*; + use crate::types::SqliteEnumExt; + use shared::types::NodeType; + use std::time::Duration; + + #[tokio::test] + async fn delete_stale_clients() { + let app = TestApp::new().await; + + super::delete_stale_clients(&app, Duration::from_secs(10)).await; + assert_eq_db!(app, "SELECT COUNT(*) FROM client_nodes", [], 4); + + app.write_tx(|tx| { + tx.execute( + "UPDATE nodes SET last_contact = DATETIME('now', '-' || 2 || ' seconds') + WHERE node_type = ?1", + [NodeType::Client.sql_variant()], + ) + .unwrap(); + Ok(()) + }) + .await + .unwrap(); + + super::delete_stale_clients(&app, Duration::from_secs(1)).await; + assert_eq_db!(app, "SELECT COUNT(*) FROM client_nodes", [], 0); + } } diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 7cc5984d..bb1e7254 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -18,8 +18,9 @@ protobuf = { workspace = true, optional = true } regex = { workspace = true } # This is currently used to generate sha256 hashes, but only because tonic->rustls already depends # on it. Since this is annoying to have with its weird license, if tonic doesn't need this anymore # at some point, we should think about removing it.
ring = { workspace = true} +rusqlite = { workspace = true, optional = true } serde = { workspace = true, features = ["derive"] } thiserror = { workspace = true } tokio = { workspace = true, features = [ @@ -33,6 +34,7 @@ tonic = { workspace = true, optional = true } [features] grpc = ["dep:protobuf", "dep:tonic", "dep:tokio-stream"] +sqlite = ["dep:rusqlite"] [lints.clippy] undocumented_unsafe_blocks = "deny" diff --git a/shared/src/bee_msg.rs b/shared/src/bee_msg.rs index d176d0e4..500e6356 100644 --- a/shared/src/bee_msg.rs +++ b/shared/src/bee_msg.rs @@ -99,7 +99,7 @@ impl Default for Header { msg_flags: 0, msg_prefix: Self::MSG_PREFIX, msg_id: 0, - msg_target_id: 0, + msg_target_id: 0.into(), msg_user_id: 0, msg_seq: 0, msg_seq_done: 0, diff --git a/shared/src/conn/outgoing.rs b/shared/src/conn/outgoing.rs index 78fcd2cd..56c847c2 100644 --- a/shared/src/conn/outgoing.rs +++ b/shared/src/conn/outgoing.rs @@ -50,10 +50,10 @@ impl Pool { /// Sends a [Msg] to a node and receives the response. pub async fn request( &self, - node_uid: Uid, + node_uid: &Uid, msg: &M, ) -> Result { - log::trace!("REQUEST to {node_uid:?}: {msg:?}"); + log::trace!("REQUEST to {node_uid}: {msg:?}"); let mut buf = self.store.pop_buf_or_create(); @@ -63,14 +63,14 @@ impl Pool { self.store.push_buf(buf); - log::trace!("RESPONSE RECEIVED from {node_uid:?}: {resp_msg:?}"); + log::trace!("RESPONSE RECEIVED from {node_uid}: {resp_msg:?}"); Ok(resp_msg) } /// Sends a [Msg] to a node and does **not** receive a response. - pub async fn send(&self, node_uid: Uid, msg: &M) -> Result<()> { - log::trace!("SEND to {node_uid:?}: {msg:?}"); + pub async fn send(&self, node_uid: &Uid, msg: &M) -> Result<()> { + log::trace!("SEND to {node_uid}: {msg:?}"); let mut buf = self.store.pop_buf_or_create(); @@ -96,7 +96,7 @@ impl Pool { /// 3. Pop an open stream from the store, waiting until one gets available. async fn comm_stream( &self, - node_uid: Uid, + node_uid: &Uid, buf: &mut [u8], send_len: usize, expect_response: bool, @@ -104,7 +104,7 @@ impl Pool { debug_assert_eq!(buf.len(), TCP_BUF_LEN); // 1. Pop open streams until communication succeeds or none are left - while let Some(stream) = self.store.try_pop_stream(node_uid) { + while let Some(stream) = self.store.try_pop_stream(node_uid.clone()) { match self .write_and_read_stream(buf, stream, send_len, expect_response) .await @@ -112,20 +112,18 @@ impl Pool { Ok(header) => return Ok(header), Err(err) => { // If the stream doesn't work anymore, just discard it and try the next one - log::debug!( - "Communication using existing stream to node with uid {node_uid} failed: {err}" - ) + log::debug!("Communication using existing stream to {node_uid} failed: {err}") } } } // 2. 
Obtain a permit and try to open a new stream on each available address - if let Some(permit) = self.store.try_acquire_permit(node_uid) { + if let Some(permit) = self.store.try_acquire_permit(node_uid.clone()) { let Some(addrs) = self.store.get_node_addrs(node_uid) else { - bail!("No available addresses for node with uid {node_uid}"); + bail!("No available addresses for {node_uid}"); }; - log::debug!("Connecting new stream to node with uid {node_uid}"); + log::debug!("Connecting new stream to {node_uid}"); for addr in addrs.iter() { if addr.is_ipv6() && !self.use_ipv6 { @@ -136,11 +134,8 @@ impl Pool { Ok(stream) => { let mut stream = StoredStream::from_stream(stream, permit); - let err_context = || { - format!( - "Connected to node with uid {node_uid}, but communication failed" - ) - }; + let err_context = + || format!("Connected to {node_uid}, but communication failed"); // Authenticate to the peer if required if let Some(auth_secret) = self.auth_secret { @@ -169,31 +164,26 @@ impl Pool { return Ok(resp_header); } // If connecting failed, try the next address - Err(err) => log::debug!( - "Connecting to node with uid {node_uid} via {addr} failed: {err}" - ), + Err(err) => log::debug!("Connecting to {node_uid} via {addr} failed: {err}"), } } // ... but if all failed, that's it - bail!( - "Connecting to node with uid {node_uid} failed for all known addresses: {addrs:?}" - ) + bail!("Connecting to {node_uid} failed for all known addresses: {addrs:?}") } // 3. Wait for an already open stream becoming available - let stream = timeout(Duration::from_secs(2), self.store.pop_stream(node_uid)) - .await - .map_err(|_| { - anyhow::anyhow!("Popping a stream for node with uid {node_uid:?} timed out") - })?; + let stream = timeout( + Duration::from_secs(2), + self.store.pop_stream(node_uid.clone()), + ) + .await + .with_context(|| format!("Popping a stream connected to {node_uid} failed"))?; let resp_header = self .write_and_read_stream(buf, stream, send_len, expect_response) .await - .with_context(|| { - format!("Communication using existing stream to node with uid {node_uid} failed") - })?; + .with_context(|| format!("Communication using existing stream to {node_uid} failed"))?; Ok(resp_header) } @@ -235,20 +225,18 @@ impl Pool { /// not that the messages reached their destinations. 
pub async fn broadcast_datagram( &self, - peers: impl IntoIterator, + node_uids: impl IntoIterator, msg: &M, ) -> Result<()> { let mut buf = self.store.pop_buf_or_create(); let msg_len = serialize(msg, &mut buf)?; - for node_uid in peers { - let addrs = self.store.get_node_addrs(node_uid).unwrap_or_default(); + for uid in node_uids { + let addrs = self.store.get_node_addrs(&uid).unwrap_or_default(); if addrs.is_empty() { - log::error!( - "Failed to send datagram to node with uid {node_uid}: No known addresses" - ); + log::error!("Failed to send datagram to {uid}: No known addresses"); continue; } @@ -259,17 +247,13 @@ impl Pool { } if let Err(err) = self.udp_socket.send_to(&buf[0..msg_len], addr).await { - log::debug!( - "Sending datagram to node with uid {node_uid} using {addr} failed: {err}" - ); + log::debug!("Sending datagram to {uid} using {addr} failed: {err}"); errs.push((addr, err)); } } if errs.len() == addrs.len() { - log::error!( - "Failed to send datagram to node with uid {node_uid} on all known addresses: {errs:?}" - ); + log::error!("Failed to send datagram to {uid} on all known addresses: {errs:?}"); } } diff --git a/shared/src/conn/store.rs b/shared/src/conn/store.rs index 68e5a09d..b3d0fe01 100644 --- a/shared/src/conn/store.rs +++ b/shared/src/conn/store.rs @@ -78,7 +78,7 @@ impl Store { let mut streams = self.streams.lock().unwrap(); let (queue, _) = streams - .entry(key.clone()) + .entry(key) .or_insert_with(|| self.new_streams_entry()); queue.clone() @@ -116,8 +116,8 @@ impl Store { } /// Get a list of known addresses for the given node UID - pub fn get_node_addrs(&self, key: T) -> Option> { - self.addrs.read().unwrap().get(&key).cloned() + pub fn get_node_addrs(&self, key: &T) -> Option> { + self.addrs.read().unwrap().get(key).cloned() } /// Replace **all** addresses for the given node UID diff --git a/shared/src/types.rs b/shared/src/types.rs index 4f998c3c..8d6bcfb1 100644 --- a/shared/src/types.rs +++ b/shared/src/types.rs @@ -11,6 +11,14 @@ use std::str::FromStr; mod entity; pub use entity::*; +mod uid; +pub use uid::*; +mod target_id; +pub use target_id::*; +mod buddy_group_id; +pub use buddy_group_id::*; +mod pool_id; +pub use pool_id::*; // Type aliases for convenience. Used by BeeGFS messaging and the management. // @@ -18,17 +26,11 @@ pub use entity::*; // do not. It still has to be checked for each BeeGFS message individually which exact type is // needed for serialization. 
-pub type Uid = i64; -pub type TargetId = u16; -pub type BuddyGroupId = u16; pub type Port = u16; pub type NodeId = u32; -pub type PoolId = u16; pub type QuotaId = u32; pub const MGMTD_ID: NodeId = 1; -pub const MGMTD_UID: Uid = 1; -pub const DEFAULT_STORAGE_POOL: PoolId = 1; /// The BeeGFS node type #[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)] diff --git a/shared/src/types/buddy_group_id.rs b/shared/src/types/buddy_group_id.rs new file mode 100644 index 00000000..ccd2a206 --- /dev/null +++ b/shared/src/types/buddy_group_id.rs @@ -0,0 +1,131 @@ +use crate::bee_serde::{Deserializable, Deserializer, Serializable, Serializer}; +#[cfg(feature = "sqlite")] +use rusqlite::ToSql; +#[cfg(feature = "sqlite")] +use rusqlite::types::{FromSql, Value}; +use std::fmt::Display; +use std::hash::Hash; +use std::sync::Arc; + +#[derive(Debug, Default, Clone, Eq)] +pub struct BuddyGroupId { + id: u16, + info: Option<Arc<str>>, +} + +impl BuddyGroupId { + pub fn with_info(id: u16, info: impl Into<Arc<str>>) -> Self { + Self { + id, + info: Some(info.into()), + } + } + + pub fn raw(&self) -> u16 { + self.id + } + + pub fn is_zero(&self) -> bool { + self.id == 0 + } +} + +impl PartialEq for BuddyGroupId { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl PartialOrd for BuddyGroupId { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.id.partial_cmp(&other.id) + } +} + +impl Hash for BuddyGroupId { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.id.hash(state); + } +} + +impl Display for BuddyGroupId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ref info) = self.info { + Display::fmt(info, f) + } else { + write!(f, "buddy_group_id:{}", self.id) + } + } +} + +impl From<u16> for BuddyGroupId { + fn from(value: u16) -> Self { + BuddyGroupId { + id: value, + info: None, + } + } +} +impl TryFrom<u32> for BuddyGroupId { + type Error = anyhow::Error; + + fn try_from(value: u32) -> Result<Self, Self::Error> { + Ok(Self { + id: value.try_into()?, + info: None, + }) + } +} +impl TryFrom<i32> for BuddyGroupId { + type Error = anyhow::Error; + + fn try_from(value: i32) -> Result<Self, Self::Error> { + Ok(Self { + id: value.try_into()?, + info: None, + }) + } +} + +impl Serializable for BuddyGroupId { + fn serialize(&self, ser: &mut Serializer) -> anyhow::Result<()> { + self.id.serialize(ser) + } +} + +impl Deserializable for BuddyGroupId { + fn deserialize(des: &mut Deserializer) -> anyhow::Result<Self> + where + Self: Sized, + { + Ok(Self { + id: Deserializable::deserialize(des)?, + info: None, + }) + } +} + +#[cfg(feature = "sqlite")] +impl FromSql for BuddyGroupId { + fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> { + let id = value.as_i64()?; + let id = id + .try_into() + .map_err(|_| rusqlite::types::FromSqlError::OutOfRange(id))?; + Ok(BuddyGroupId { id, info: None }) + } +} + +#[cfg(feature = "sqlite")] +impl ToSql for BuddyGroupId { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + self.id.to_sql() + } +} + +#[cfg(feature = "sqlite")] +impl From<BuddyGroupId> for Value { + fn from(value: BuddyGroupId) -> Self { + Value::Integer(value.id.into()) + } +} diff --git a/shared/src/types/entity.rs b/shared/src/types/entity.rs index 2f7600f0..f5706dfc 100644 --- a/shared/src/types/entity.rs +++ b/shared/src/types/entity.rs @@ -146,7 +146,7 @@ impl TryFrom<pb::EntityIdSet> for EntityId { fn try_from(value: pb::EntityIdSet) -> Result<Self, Self::Error> { let variant = if let Some(uid) = value.uid { - Self::Uid(uid) + Self::Uid(uid.into()) } else if let Some(alias) = value.alias { Self::Alias(alias.try_into()?)
} else if let Some(id) = value.legacy_id { @@ -172,7 +172,7 @@ impl From<EntityId> for pb::EntityIdSet { ..Default::default() }, EntityId::Uid(uid) => pb::EntityIdSet { - uid: Some(uid), + uid: Some(uid.into()), ..Default::default() }, } @@ -206,7 +206,7 @@ impl Display for EntityIdSet { impl From<EntityIdSet> for pb::EntityIdSet { fn from(value: EntityIdSet) -> Self { Self { - uid: Some(value.uid), + uid: Some(value.uid.into()), alias: Some(value.alias.into()), legacy_id: Some(value.legacy_id.into()), } diff --git a/shared/src/types/pool_id.rs b/shared/src/types/pool_id.rs new file mode 100644 index 00000000..0775f7eb --- /dev/null +++ b/shared/src/types/pool_id.rs @@ -0,0 +1,133 @@ +use crate::bee_serde::{Deserializable, Deserializer, Serializable, Serializer}; +#[cfg(feature = "sqlite")] +use rusqlite::ToSql; +#[cfg(feature = "sqlite")] +use rusqlite::types::{FromSql, Value}; +use std::fmt::Display; +use std::hash::Hash; +use std::sync::Arc; + +pub const DEFAULT_STORAGE_POOL: PoolId = PoolId { id: 1, info: None }; + +#[derive(Debug, Default, Clone, Eq)] +pub struct PoolId { + id: u16, + info: Option<Arc<str>>, +} + +impl PoolId { + pub fn with_info(id: u16, info: impl Into<Arc<str>>) -> Self { + Self { + id, + info: Some(info.into()), + } + } + + pub fn raw(&self) -> u16 { + self.id + } + + pub fn is_zero(&self) -> bool { + self.id == 0 + } +} + +impl PartialEq for PoolId { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl PartialOrd for PoolId { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.id.partial_cmp(&other.id) + } +} + +impl Hash for PoolId { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.id.hash(state); + } +} + +impl Display for PoolId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ref info) = self.info { + Display::fmt(info, f) + } else { + write!(f, "pool_id:{}", self.id) + } + } +} + +impl From<u16> for PoolId { + fn from(value: u16) -> Self { + PoolId { + id: value, + info: None, + } + } +} +impl TryFrom<u32> for PoolId { + type Error = anyhow::Error; + + fn try_from(value: u32) -> Result<Self, Self::Error> { + Ok(Self { + id: value.try_into()?, + info: None, + }) + } +} +impl TryFrom<i32> for PoolId { + type Error = anyhow::Error; + + fn try_from(value: i32) -> Result<Self, Self::Error> { + Ok(Self { + id: value.try_into()?, + info: None, + }) + } +} + +impl Serializable for PoolId { + fn serialize(&self, ser: &mut Serializer) -> anyhow::Result<()> { + self.id.serialize(ser) + } +} + +impl Deserializable for PoolId { + fn deserialize(des: &mut Deserializer) -> anyhow::Result<Self> + where + Self: Sized, + { + Ok(Self { + id: Deserializable::deserialize(des)?, + info: None, + }) + } +} + +#[cfg(feature = "sqlite")] +impl FromSql for PoolId { + fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> { + let id = value.as_i64()?; + let id = id + .try_into() + .map_err(|_| rusqlite::types::FromSqlError::OutOfRange(id))?; + Ok(PoolId { id, info: None }) + } +} + +#[cfg(feature = "sqlite")] +impl ToSql for PoolId { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + self.id.to_sql() + } +} + +#[cfg(feature = "sqlite")] +impl From<PoolId> for Value { + fn from(value: PoolId) -> Self { + Value::Integer(value.id.into()) + } +} diff --git a/shared/src/types/target_id.rs b/shared/src/types/target_id.rs new file mode 100644 index 00000000..2cd338e1 --- /dev/null +++ b/shared/src/types/target_id.rs @@ -0,0 +1,131 @@ +use crate::bee_serde::{Deserializable, Deserializer, Serializable, Serializer}; +#[cfg(feature = "sqlite")] +use rusqlite::ToSql; +#[cfg(feature = "sqlite")] +use
rusqlite::types::{FromSql, Value}; +use std::fmt::Display; +use std::hash::Hash; +use std::sync::Arc; + +#[derive(Debug, Default, Clone, Eq)] +pub struct TargetId { + id: u16, + info: Option<Arc<str>>, +} + +impl TargetId { + pub fn with_info(id: u16, info: impl Into<Arc<str>>) -> Self { + Self { + id, + info: Some(info.into()), + } + } + + pub fn raw(&self) -> u16 { + self.id + } + + pub fn is_zero(&self) -> bool { + self.id == 0 + } +} + +impl PartialEq for TargetId { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl PartialOrd for TargetId { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.id.partial_cmp(&other.id) + } +} + +impl Hash for TargetId { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.id.hash(state); + } +} + +impl Display for TargetId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ref info) = self.info { + Display::fmt(info, f) + } else { + write!(f, "target_id:{}", self.id) + } + } +} + +impl From<u16> for TargetId { + fn from(value: u16) -> Self { + TargetId { + id: value, + info: None, + } + } +} +impl TryFrom<u32> for TargetId { + type Error = anyhow::Error; + + fn try_from(value: u32) -> Result<Self, Self::Error> { + Ok(Self { + id: value.try_into()?, + info: None, + }) + } +} +impl TryFrom<i32> for TargetId { + type Error = anyhow::Error; + + fn try_from(value: i32) -> Result<Self, Self::Error> { + Ok(Self { + id: value.try_into()?, + info: None, + }) + } +} + +impl Serializable for TargetId { + fn serialize(&self, ser: &mut Serializer) -> anyhow::Result<()> { + self.id.serialize(ser) + } +} + +impl Deserializable for TargetId { + fn deserialize(des: &mut Deserializer) -> anyhow::Result<Self> + where + Self: Sized, + { + Ok(Self { + id: Deserializable::deserialize(des)?, + info: None, + }) + } +} + +#[cfg(feature = "sqlite")] +impl FromSql for TargetId { + fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> { + let id = value.as_i64()?; + let id = id + .try_into() + .map_err(|_| rusqlite::types::FromSqlError::OutOfRange(id))?; + Ok(TargetId { id, info: None }) + } +} + +#[cfg(feature = "sqlite")] +impl ToSql for TargetId { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + self.id.to_sql() + } +} + +#[cfg(feature = "sqlite")] +impl From<TargetId> for Value { + fn from(value: TargetId) -> Self { + Value::Integer(value.id.into()) + } +} diff --git a/shared/src/types/uid.rs b/shared/src/types/uid.rs new file mode 100644 index 00000000..6ab4c840 --- /dev/null +++ b/shared/src/types/uid.rs @@ -0,0 +1,87 @@ +#[cfg(feature = "sqlite")] +use rusqlite::ToSql; +#[cfg(feature = "sqlite")] +use rusqlite::types::FromSql; +use std::fmt::Display; +use std::hash::Hash; +use std::sync::Arc; + +pub const MGMTD_UID: Uid = Uid { uid: 1, info: None }; + +#[derive(Debug, Default, Clone, Eq)] +pub struct Uid { + uid: i64, + info: Option<Arc<str>>, +} + +impl Uid { + pub fn with_info(uid: i64, info: impl Into<Arc<str>>) -> Self { + Self { + uid, + info: Some(info.into()), + } + } + + pub fn raw(&self) -> i64 { + self.uid + } +} + +impl PartialEq for Uid { + fn eq(&self, other: &Self) -> bool { + self.uid == other.uid + } } + +impl PartialOrd for Uid { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + self.uid.partial_cmp(&other.uid) + } +} + +impl Hash for Uid { + fn hash<H: std::hash::Hasher>(&self, state: &mut H) { + self.uid.hash(state); + } +} + +impl Display for Uid { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Some(ref info) = self.info { + Display::fmt(info, f) + } else { + write!(f, "uid:{}", self.uid) + } + } +} + +impl From<i64> for Uid { + fn from(value: i64) -> Self { + Uid {
uid: value, + info: None, + } + } +} +impl From<Uid> for i64 { + fn from(value: Uid) -> Self { + value.uid + } +} + +#[cfg(feature = "sqlite")] +impl FromSql for Uid { + fn column_result(value: rusqlite::types::ValueRef) -> rusqlite::types::FromSqlResult<Self> { + Ok(Uid { + uid: value.as_i64()?, + info: None, + }) + } +} + +#[cfg(feature = "sqlite")] +impl ToSql for Uid { + fn to_sql(&self) -> rusqlite::Result<rusqlite::types::ToSqlOutput<'_>> { + self.uid.to_sql() + } +}
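The four ID newtypes introduced above (BuddyGroupId, PoolId, TargetId, Uid) share one template: equality, ordering, hashing, and the SQL conversions use only the raw integer, while the optional `info` string (typically an alias selected alongside the ID, as in the `nodes_ext` queries earlier in this diff) affects only `Display`, so log lines like "Connecting new stream to {node_uid}" print a human-readable name when one is available. A minimal usage sketch, relying solely on the APIs defined in the new files; the alias string and the `main` wrapper are illustrative, not part of the patch:

use shared::types::{TargetId, Uid};

fn main() {
    // Plain conversion from the raw integer, as done on the BeeMsg paths
    // (e.g. `msg_target_id: 0.into()` in the Header default above).
    let plain: Uid = 42.into();
    assert_eq!(plain.to_string(), "uid:42");

    // Attach display info; the alias here is a made-up example of what the
    // management daemon fetches next to `node_uid`.
    let named = Uid::with_info(42, "storage-node-01");
    assert_eq!(named.to_string(), "storage-node-01");

    // Comparisons and hashing ignore `info`, so both forms are
    // interchangeable as keys and in equality checks.
    assert_eq!(plain, named);

    // raw() recovers the integer where the plain value is still needed.
    assert_eq!(named.raw(), 42);

    // The u16-backed IDs follow the same template.
    let target = TargetId::from(8u16);
    assert!(!target.is_zero());
    assert_eq!(target.to_string(), "target_id:8");
}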