Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mgmtd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ path = "src/main.rs"
sqlite = { path = "../sqlite" }

[dependencies]
shared = { path = "../shared", features = ["grpc"] }
shared = { path = "../shared", features = ["grpc", "sqlite"] }
protobuf = { workspace = true }
sqlite = { path = "../sqlite" }
sqlite_check = { path = "../sqlite_check" }
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub(crate) trait App: Debug + Clone + Send + 'static {
/// Send a [Msg] to a node via TCP and receive the response
fn request<M: Msg + Serializable, R: Msg + Deserializable>(
&self,
node_uid: Uid,
node_uid: &Uid,
msg: &M,
) -> impl Future<Output = Result<R>> + Send;

Expand Down
40 changes: 22 additions & 18 deletions mgmtd/src/app/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use super::*;
use crate::ClientPulledStateNotification;
use crate::bee_msg::dispatch_request;
use crate::license::LicenseVerifier;
use crate::types::SqliteEnumExt;
use anyhow::Result;
use protobuf::license::GetCertDataResult;
use rusqlite::{Connection, Transaction};
use shared::conn::msg_dispatch::{DispatchRequest, Request};
use shared::conn::outgoing::Pool;
use shared::run_state::WeakRunStateHandle;
use sqlite::Connections;
use sqlite::{Connections, TransactionExt, rarray_param};
use sqlite_check::sql;
use std::fmt::Debug;
use std::ops::Deref;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -113,7 +115,7 @@ impl App for RuntimeApp {

async fn request<M: Msg + Serializable, R: Msg + Deserializable>(
&self,
node_uid: Uid,
node_uid: &Uid,
msg: &M,
) -> Result<R> {
Pool::request(&self.conn, node_uid, msg).await
Expand All @@ -126,22 +128,24 @@ impl App for RuntimeApp {
) {
log::trace!("NOTIFICATION to {node_types:?}: {msg:?}");

for t in node_types {
if let Err(err) = async {
let nodes = self
.read_tx(move |tx| crate::db::node::get_with_type(tx, *t))
.await?;

self.conn
.broadcast_datagram(nodes.into_iter().map(|e| e.uid), msg)
.await?;

Ok(()) as Result<_>
}
.await
{
log::error!("Notification could not be sent to all {t} nodes: {err:#}");
}
if let Err(err) = async {
let nodes: Vec<_> = self
.read_tx(move |tx| {
Ok(tx.query_map_collect(
sql!("SELECT node_uid, alias FROM nodes_ext WHERE node_type IN rarray(?1)"),
[&rarray_param(node_types.iter().map(|t| t.sql_variant()))],
|row| Ok(Uid::with_info(row.get(0)?, row.get::<_, String>(1)?)),
)?)
})
.await?;

self.conn.broadcast_datagram(nodes, msg).await?;

Ok(()) as Result<_>
}
.await
{
log::error!("Notification could not be sent to all {node_types:?} nodes: {err:#}");
}
}

Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/app/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl App for TestApp {

async fn request<M: Msg + Serializable, R: Msg + Deserializable>(
&self,
_node_uid: Uid,
_node_uid: &Uid,
msg: &M,
) -> Result<R> {
let mut d = self.data.lock().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions mgmtd/src/bee_msg/change_target_consistency_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ mod test {
// No change of consistency states
let msg = ChangeTargetConsistencyStates {
node_type: NodeType::Storage,
target_ids: vec![1, 5],
target_ids: vec![1.into(), 5.into()],
old_states: vec![TargetConsistencyState::Good, TargetConsistencyState::Good],
new_states: vec![TargetConsistencyState::Good, TargetConsistencyState::Good],
ack_id: "".into(),
Expand All @@ -136,7 +136,7 @@ mod test {
// Change of consistency states
let msg = ChangeTargetConsistencyStates {
node_type: NodeType::Storage,
target_ids: vec![1, 5],
target_ids: vec![1.into(), 5.into()],
old_states: vec![TargetConsistencyState::Good, TargetConsistencyState::Good],
new_states: vec![
TargetConsistencyState::NeedsResync,
Expand Down Expand Up @@ -179,7 +179,7 @@ mod test {
// Mismatch of reported old state should not change the consistency states
let msg = ChangeTargetConsistencyStates {
node_type: NodeType::Storage,
target_ids: vec![1],
target_ids: vec![1.into()],
old_states: vec![TargetConsistencyState::NeedsResync],
new_states: vec![TargetConsistencyState::Bad],
ack_id: "".into(),
Expand Down
18 changes: 9 additions & 9 deletions mgmtd/src/bee_msg/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub(super) async fn update_node(msg: RegisterNode, app: &impl App) -> Result<Nod
};

if let Some(machine_uuid) = machine_uuid
&& db::node::count_machines(tx, machine_uuid, node.as_ref().map(|n| n.uid))?
&& db::node::count_machines(tx, machine_uuid, node.as_ref().map(|n| n.uid.clone()))?
>= licensed_machines
{
bail!("Licensed machine limit reached. Node registration denied.");
Expand All @@ -54,7 +54,7 @@ pub(super) async fn update_node(msg: RegisterNode, app: &impl App) -> Result<Nod

let (node, is_new) = if let Some(node) = node {
// Existing node, update data
db::node::update(tx, node.uid, msg.port, machine_uuid)?;
db::node::update(tx, &node.uid, msg.port, machine_uuid)?;

// If the updated node is a meta node, check if its corresponding target has a
// registration token
Expand Down Expand Up @@ -132,7 +132,7 @@ client version < 8.0)"
"target_id",
NodeType::Meta,
1..=0xFFFF,
)?,
)?.try_into()?,
n => TargetId::try_from(n).map_err(|_| anyhow!("{n} is not a valid numeric meta node id (must be between 1 and 65535)"))?,
};

Expand All @@ -147,10 +147,10 @@ client version < 8.0)"

db::target::insert(
tx,
target_id,
&target_id,
tk,
NodeTypeServer::Meta,
Some(target_id.into()),
Some(target_id.raw().into()),
)?;

// If this is the first meta target, set it as meta root
Expand All @@ -166,7 +166,7 @@ client version < 8.0)"
// Update the corresponding nic lists
db::node_nic::replace(
tx,
node.uid,
&node.uid,
msg.nics.iter().map(|e| ReplaceNic {
nic_type: e.nic_type,
addr: &e.addr,
Expand All @@ -186,7 +186,7 @@ client version < 8.0)"
.await?;

app.replace_node_addrs(
node.uid,
node.uid.clone(),
nics.clone()
.into_iter()
.map(|e| SocketAddr::new(e.addr, msg.port))
Expand Down Expand Up @@ -219,7 +219,7 @@ client version < 8.0)"
root_num_id: match meta_root {
MetaRoot::Unknown => 0,
MetaRoot::Normal(node_id, _) => node_id,
MetaRoot::Mirrored(group_id) => group_id.into(),
MetaRoot::Mirrored(ref group_id) => group_id.raw().into(),
},
is_root_mirrored: match meta_root {
MetaRoot::Unknown | MetaRoot::Normal(_, _) => 0,
Expand Down Expand Up @@ -294,7 +294,7 @@ pub(super) fn update_last_contact_times(
node_type: NodeTypeServer,
offline_timeout: Duration,
) -> Result<usize> {
let target_ids_param = sqlite::rarray_param(target_ids.iter().copied());
let target_ids_param = sqlite::rarray_param(target_ids.iter().cloned());

tx.execute_cached(
sql!(
Expand Down
20 changes: 10 additions & 10 deletions mgmtd/src/bee_msg/get_node_capacity_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl HandleWithResponse for GetNodeCapacityPools {
res[cp].push(t.id);
}

[(0, res)].into()
[(0.into(), res)].into()
}

CapacityPoolQueryType::Storage => {
Expand All @@ -122,8 +122,8 @@ impl HandleWithResponse for GetNodeCapacityPools {
.await?;

let mut res: HashMap<PoolId, Vec<Vec<u16>>> = HashMap::new();
for sp in pools {
let f_targets = targets.iter().filter(|e| e.pool_id == Some(sp));
for sp in &pools {
let f_targets = targets.iter().filter(|e| e.pool_id.as_ref() == Some(sp));

let cp_calc = CapPoolCalculator::new(
app.static_info()
Expand All @@ -137,12 +137,12 @@ impl HandleWithResponse for GetNodeCapacityPools {
f_targets.clone(),
)?;

res.insert(sp, vec![Vec::<u16>::new(), vec![], vec![]]);
res.insert(sp.clone(), vec![Vec::<u16>::new(), vec![], vec![]]);
for t in f_targets {
let cp = cp_calc
.cap_pool(t.free_space(), t.free_inodes())
.bee_msg_vec_index();
res.get_mut(&sp).unwrap()[cp].push(t.id);
res.get_mut(sp).unwrap()[cp].push(t.id);
}
}

Expand Down Expand Up @@ -172,7 +172,7 @@ impl HandleWithResponse for GetNodeCapacityPools {
res[cp].push(e.id);
}

[(0, res)].into()
[(0.into(), res)].into()
}

CapacityPoolQueryType::StorageMirrored => {
Expand All @@ -191,8 +191,8 @@ impl HandleWithResponse for GetNodeCapacityPools {
.await?;

let mut cap_pools: HashMap<PoolId, Vec<Vec<u16>>> = HashMap::new();
for sp in pools {
let f_groups = groups.iter().filter(|e| e.pool_id == Some(sp));
for sp in &pools {
let f_groups = groups.iter().filter(|e| e.pool_id.as_ref() == Some(sp));

let cp_calc = CapPoolCalculator::new(
app.static_info()
Expand All @@ -206,12 +206,12 @@ impl HandleWithResponse for GetNodeCapacityPools {
f_groups.clone(),
)?;

cap_pools.insert(sp, vec![Vec::<u16>::new(), vec![], vec![]]);
cap_pools.insert(sp.clone(), vec![Vec::<u16>::new(), vec![], vec![]]);
for t in f_groups {
let cp = cp_calc
.cap_pool(t.free_space(), t.free_inodes())
.bee_msg_vec_index();
cap_pools.get_mut(&sp).unwrap()[cp].push(t.id);
cap_pools.get_mut(sp).unwrap()[cp].push(t.id);
}
}

Expand Down
58 changes: 32 additions & 26 deletions mgmtd/src/bee_msg/get_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,52 @@ impl HandleWithResponse for GetNodes {
type Response = GetNodesResp;

async fn handle(self, app: &impl App, _req: &mut impl Request) -> Result<Self::Response> {
let res = app
let (nodes, meta_root) = app
.read_tx(move |tx| {
let node_type = self.node_type;
let res = (
db::node::get_with_type(tx, node_type)?,
db::node_nic::get_with_type(tx, node_type)?,
let nics = db::node_nic::get_with_type(tx, node_type)?;

let nodes: Vec<_> = tx.query_map_collect(
sql!(
"SELECT node_uid, port, alias, node_id FROM nodes_ext
WHERE node_type = ?1 ORDER BY node_id ASC"
),
[node_type.sql_variant()],
|row| {
let uid = row.get(0)?;
let port = row.get(1)?;
Ok(Node {
alias: row.get::<_, String>(2)?.into_bytes(),
nic_list: map_bee_msg_nics(
nics.iter().filter(|e| e.node_uid == uid).cloned(),
)
.collect(),
num_id: row.get(3)?,
port,
_unused_tcp_port: port,
node_type,
})
},
)?;

Ok((
nodes,
match self.node_type {
shared::types::NodeType::Meta => db::misc::get_meta_root(tx)?,
_ => MetaRoot::Unknown,
},
);

Ok(res)
))
})
.await?;

let mut nodes: Vec<Node> = res
.0
.into_iter()
.map(|n| Node {
alias: n.alias.into_bytes(),
num_id: n.id,
nic_list: map_bee_msg_nics(res.1.iter().filter(|e| e.node_uid == n.uid).cloned())
.collect(),
port: n.port,
_unused_tcp_port: n.port,
node_type: n.node_type,
})
.collect();

nodes.sort_by(|a, b| a.num_id.cmp(&b.num_id));

let resp = GetNodesResp {
nodes,
root_num_id: match res.2 {
root_num_id: match meta_root {
MetaRoot::Unknown => 0,
MetaRoot::Normal(node_id, _) => node_id,
MetaRoot::Mirrored(group_id) => group_id.into(),
MetaRoot::Mirrored(ref group_id) => group_id.raw().into(),
},
is_root_mirrored: match res.2 {
is_root_mirrored: match meta_root {
MetaRoot::Unknown => 0,
MetaRoot::Normal(_, _) => 0,
MetaRoot::Mirrored(_) => 1,
Expand Down
10 changes: 5 additions & 5 deletions mgmtd/src/bee_msg/get_storage_pools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,11 +133,11 @@ impl HandleWithResponse for GetStoragePools {
.cap_pool(target.free_space(), target.free_inodes())
.bee_msg_vec_index();

let target_id: TargetId = target.id;
let target_id: TargetId = target.id.into();
let node_id = target.node_id.expect("targets have a node id");

target_map.insert(target_id, node_id);
target_cap_pools[cp].push(target.id);
target_map.insert(target_id.clone(), node_id);
target_cap_pools[cp].push(target.id.into());

if let Some(node_group) = grouped_target_cap_pools[cp].get_mut(&node_id) {
node_group.push(target_id);
Expand All @@ -148,12 +148,12 @@ impl HandleWithResponse for GetStoragePools {

// Only collect buddy groups belonging to the current pool
for group in f_buddy_groups {
buddy_group_vec.push(group.id);
buddy_group_vec.push(group.id.into());

let cp = cp_buddy_groups_calc
.cap_pool(group.free_space(), group.free_inodes())
.bee_msg_vec_index();
buddy_group_cap_pools[cp].push(group.id);
buddy_group_cap_pools[cp].push(group.id.into());
}

Ok(StoragePool {
Expand Down
2 changes: 1 addition & 1 deletion mgmtd/src/bee_msg/heartbeat_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl HandleWithResponse for HeartbeatRequest {
Ok((
db::entity::get_alias(tx, MGMTD_UID)?
.ok_or_else(|| TypedError::value_not_found("management uid", MGMTD_UID))?,
db::node_nic::get_with_node(tx, MGMTD_UID)?,
db::node_nic::get_with_node(tx, &MGMTD_UID)?,
))
})
.await
Expand Down
Loading
Loading