From 7c1e5122de1d3c1d754287c6523c10b1da3169f7 Mon Sep 17 00:00:00 2001 From: "Federico Pasqua (eisterman)" Date: Sun, 14 Jul 2024 14:54:32 +0200 Subject: [PATCH] New docs in bonknet_broker --- bonknet_broker/src/dataconnmanager.rs | 18 ++++++--- bonknet_broker/src/main.rs | 42 ++++++++++----------- bonknet_broker/src/pendingdataconndb.rs | 50 ++++++++++++++----------- bonknet_broker/src/servercertdb.rs | 2 - bonknet_server/src/main.rs | 1 + 5 files changed, 63 insertions(+), 50 deletions(-) diff --git a/bonknet_broker/src/dataconnmanager.rs b/bonknet_broker/src/dataconnmanager.rs index 81111cb..39f6252 100644 --- a/bonknet_broker/src/dataconnmanager.rs +++ b/bonknet_broker/src/dataconnmanager.rs @@ -14,6 +14,9 @@ pub enum DataConnManagerError { GenericFailure, } +/// Ask the DataConnManager to start a new Data Connection with +/// these two TransportStream as ends of the bridge. +/// As internal ID the Client Connection ID is used. #[derive(Message)] #[rtype(result = "Result<(),DataConnManagerError>")] pub struct StartDataBridge { @@ -22,6 +25,7 @@ pub struct StartDataBridge { pub client_transport: TransportStream, } +/// Stop the data bridge using the Client Connection ID as reference to the bridge. #[derive(Message)] #[rtype(result = "()")] pub struct StopDataBridge { @@ -31,23 +35,27 @@ pub struct StopDataBridge { type ClientConnId = Uuid; struct Connection { - proxyhandler: SpawnHandle, + proxyhandler: SpawnHandle, // SpawnHandle is an actix Future handler (basically a Task) } +/// This is the actor that manages the Data Connections. +/// When a new Data Bridge is created, that consist of a Task running indefinitely that +/// redirect information from a Client to a Server and viceversa. +/// Basically the DataBridge is where the SSH information are sent in. pub struct DataConnManager { conns: HashMap } +impl Actor for DataConnManager { + type Context = Context; +} + impl DataConnManager { pub fn new() -> Self { Self { conns: HashMap::new() } } } -impl Actor for DataConnManager { - type Context = Context; -} - impl Handler for DataConnManager { type Result = (); diff --git a/bonknet_broker/src/main.rs b/bonknet_broker/src/main.rs index f833918..2a12136 100644 --- a/bonknet_broker/src/main.rs +++ b/bonknet_broker/src/main.rs @@ -30,9 +30,9 @@ struct BrokerContext { client_ca: CACertPair<'static>, server_ca: CACertPair<'static>, guestserver_ca: CACertPair<'static>, - scdb_addr: Addr, - pdcm_addr: Addr, - sm_addr: Addr, + servercert_db: Addr, + pendingdataconn_manager: Addr, + server_manager: Addr, } @@ -47,19 +47,19 @@ async fn main() { let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap(); let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap(); // Load Actors - let scdb_addr = ServerCertDB::new().start(); - let dcm_addr = DataConnManager::new().start(); - let pdcm_addr = PendingDataConnManager::new(dcm_addr).start(); - let sm_addr = ServerManager::new(pdcm_addr.clone()).start(); + let servercert_db = ServerCertDB::new().start(); + let dataconn_manager = DataConnManager::new().start(); + let pendingdataconn_manager = PendingDataConnManager::new(dataconn_manager).start(); + let server_manager = ServerManager::new(pendingdataconn_manager.clone()).start(); // Create Context let ctx = Arc::new(BrokerContext { broker_leaf, client_ca, server_ca, guestserver_ca, - scdb_addr, - pdcm_addr, - sm_addr, + servercert_db, + pendingdataconn_manager, + server_manager, }); // Pki Client Verifier @@ -109,7 +109,7 @@ async fn main() { } Subscribe => { info!("Subscribe Stream"); - let name = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes }).await.unwrap() { + let name = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes }).await.unwrap() { None => { error!("Cert has no name assigned!"); let reply = ToServerConnTypeReply::GenericFailure; @@ -125,7 +125,7 @@ async fn main() { OpenDataStream(conn_id) => { info!("OpenDataStream with {:?}", conn_id); let msg = RegisterStream::server(conn_id, transport); - ctx.pdcm_addr.send(msg).await.unwrap().unwrap(); + ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap(); } } } @@ -159,7 +159,7 @@ async fn main() { } async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) { - match ctx.sm_addr.send(StartTransporter { name, transport }).await.unwrap() { + match ctx.server_manager.send(StartTransporter { name, transport }).await.unwrap() { Ok(_) => { info!("Stream sent to the manager"); } @@ -184,7 +184,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr match msg { ChangeName { name } => { info!("Changing name to {}", name); - match ctx.scdb_addr.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() { + match ctx.servercert_db.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() { None => { info!("Nothing to unregister"); } @@ -192,7 +192,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr warn!("Unregistered from old name {}", old_name); } } - let reply = match ctx.scdb_addr.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() { + let reply = match ctx.servercert_db.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() { Ok(_) => { info!("Registered!"); ToServerCommandReply::NameChanged @@ -206,7 +206,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr } WhoAmI => { info!("Asked who I am"); - let reply = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() { + let reply = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() { None => { info!("I'm not registered anymore!? WTF"); ToServerCommandReply::GenericFailure @@ -245,14 +245,14 @@ async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream match msg { Announce { name } => { info!("Announced with name {}", name); - if ctx.scdb_addr.send(IsNameRegistered { name: name.clone() }).await.unwrap() { + if ctx.servercert_db.send(IsNameRegistered { name: name.clone() }).await.unwrap() { info!("Name {} already registered!", name); let reply = ToGuestServerMessage::FailedNameAlreadyOccupied; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); break; // Stop reading } else { let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str())); - ctx.scdb_addr.send(RegisterServer { + ctx.servercert_db.send(RegisterServer { cert: cert.cert().to_vec(), name, }).await.unwrap().unwrap(); @@ -287,7 +287,7 @@ async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) { match msg { FromClientCommand::RequestServer { name } => { info!("REQUESTED SERVER {}", name); - let data = ctx.sm_addr.send(RequestServer { name }).await.unwrap(); + let data = ctx.server_manager.send(RequestServer { name }).await.unwrap(); match data { Ok(client_conn_id) => { let reply = ToClientResponse::OkRequest { conn_id: client_conn_id }; @@ -301,14 +301,14 @@ async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) { } FromClientCommand::ServerList => { info!("Requested ServerList"); - let data = ctx.sm_addr.send(GetServerList {}).await.unwrap(); + let data = ctx.server_manager.send(GetServerList {}).await.unwrap(); let reply = ToClientResponse::OkServerList { data }; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); } FromClientCommand::UpgradeToDataStream(conn_id) => { info!("Upgrade to DataStream with conn_id {:?}", conn_id); let msg = RegisterStream::client(conn_id, transport); - ctx.pdcm_addr.send(msg).await.unwrap().unwrap(); + ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap(); break; } } diff --git a/bonknet_broker/src/pendingdataconndb.rs b/bonknet_broker/src/pendingdataconndb.rs index 152efad..cfecc3b 100644 --- a/bonknet_broker/src/pendingdataconndb.rs +++ b/bonknet_broker/src/pendingdataconndb.rs @@ -7,20 +7,15 @@ use libbonknet::*; use crate::dataconnmanager::{DataConnManager, StartDataBridge}; use crate::streamutils::*; -/* -L'idea e' che il database deve avere una riga per ogni connessione dati in nascita. -In ogni "riga" deve essere presente: -- Il ServerConnID che il server dovra usare (pkey) -- Il ClientConnID che il client dovra usare (pkey) -- Se gia connesso, il Socket del Server -- Se gia connesso il Socket del Client - -Quando in una riga sono presenti sia ServerSocket che ClientSocket allora poppa la riga -e usa i socket per lanciare un thread/actor che faccia il piping dei dati - -Ricerca riga deve avvenire sia tramite ServerConnID che ClientConnID se essi sono diversi come pianifico -Quindi l'ideale e' non usare una collection ma andare direttamente di Vector! - */ +//! Pending Data Connect Database is where we store the Data Connection in creation. +//! Every row contains: +//! - The ServerConnID that the server will use (uuid) +//! - The ClientConnID that the client will use (uuid) +//! - If the Server is already connected and ready, the Server Socket +//! - If the Client is already connected and ready, the Client Socket +//! +//! When a row contains a valid ServerSocket and ClientSocket, then the row is removed and the +//! sockets moved to the Data Connection Manager #[derive(Error, Debug)] pub enum PendingDataConnError { @@ -59,11 +54,13 @@ impl RegisterStream { } } +/// Record with connection ID and TransportStream that can represent a client or a server struct SideRecord { conn_id: Uuid, transport: Option, } +/// A DB Record, with a Server and a Client record containind IDs and Transports struct Record { server: SideRecord, client: SideRecord, @@ -78,16 +75,29 @@ impl Record { } // TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them +/// This contains the database of records of Data Connections in the phase of creation. +/// The approach is that when DataConnection is needed, a new Pending is created together with +/// two IDs to send to the client and server and that they will use to create a new TransportStream +/// that will be then registered to the right Record. +/// When a Record has the two TransportStreams needed, the Record is completed and can be consumed +/// and used to create a DataConnection sending the ownership of the two streams to the +/// Data Connection Manager. pub struct PendingDataConnManager { db: Vec, - dcm_addr: Addr, + dataconn_manager: Addr, +} + +impl Actor for PendingDataConnManager { + type Context = Context; } impl PendingDataConnManager { - pub fn new(dcm_addr: Addr) -> Self { - PendingDataConnManager { db: Vec::new(), dcm_addr } + pub fn new(dataconn_manager_addr: Addr) -> Self { + PendingDataConnManager { db: Vec::new(), dataconn_manager: dataconn_manager_addr } } + /// The Row retrieve need to use ServerConnID AND ClientConnID as keys, so the best approach is to + /// use a single Vec and check the two IDs while iterating. fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> { use RegisterKind::*; let record = match match kind { @@ -104,10 +114,6 @@ impl PendingDataConnManager { } } -impl Actor for PendingDataConnManager { - type Context = Context; -} - impl Handler for PendingDataConnManager { type Result = Result<(), PendingDataConnError>; @@ -171,7 +177,7 @@ impl Handler for PendingDataConnManager { server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some }; - self.dcm_addr.do_send(msg); + self.dataconn_manager.do_send(msg); } Ok(()) } else { diff --git a/bonknet_broker/src/servercertdb.rs b/bonknet_broker/src/servercertdb.rs index c586bd7..845044e 100644 --- a/bonknet_broker/src/servercertdb.rs +++ b/bonknet_broker/src/servercertdb.rs @@ -2,8 +2,6 @@ use std::collections::HashMap; use actix::prelude::*; use thiserror::Error; -// TODO: Probably it's better to remove the pub from inside the structs and impl a new() funct - #[derive(Error, Debug)] pub enum DBError { #[error("Certificate is already registered with name {0}")] diff --git a/bonknet_server/src/main.rs b/bonknet_server/src/main.rs index df640e4..0d74cc9 100644 --- a/bonknet_server/src/main.rs +++ b/bonknet_server/src/main.rs @@ -208,6 +208,7 @@ async fn main() -> std::io::Result<()> { ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() } } else if exists_serverident { // Yes Server -> Use Server file as identity + info!("Server Identity found. Confirming..."); let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap(); let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }; server_name_confirmation(&ctx).await.unwrap();