New docs in bonknet_broker

This commit is contained in:
2024-07-14 14:54:32 +02:00
parent 1e4e4bdb53
commit 7c1e5122de
5 changed files with 63 additions and 50 deletions

View File

@@ -14,6 +14,9 @@ pub enum DataConnManagerError {
GenericFailure, GenericFailure,
} }
/// Ask the DataConnManager to start a new Data Connection with
/// these two TransportStreams as the ends of the bridge.
/// The Client Connection ID is used as the internal ID.
#[derive(Message)] #[derive(Message)]
#[rtype(result = "Result<(),DataConnManagerError>")] #[rtype(result = "Result<(),DataConnManagerError>")]
pub struct StartDataBridge { pub struct StartDataBridge {
@@ -22,6 +25,7 @@ pub struct StartDataBridge {
pub client_transport: TransportStream, pub client_transport: TransportStream,
} }
/// Stop the data bridge using the Client Connection ID as reference to the bridge.
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct StopDataBridge { pub struct StopDataBridge {
@@ -31,23 +35,27 @@ pub struct StopDataBridge {
type ClientConnId = Uuid; type ClientConnId = Uuid;
struct Connection { struct Connection {
proxyhandler: SpawnHandle, proxyhandler: SpawnHandle, // SpawnHandle is an actix Future handler (basically a Task)
} }
/// This is the actor that manages the Data Connections.
/// When a new Data Bridge is created, it consists of a Task running indefinitely that
/// redirects information from a Client to a Server and vice versa.
/// Basically the DataBridge is where the SSH information is sent.
pub struct DataConnManager { pub struct DataConnManager {
conns: HashMap<ClientConnId, Connection> conns: HashMap<ClientConnId, Connection>
} }
impl Actor for DataConnManager {
type Context = Context<Self>;
}
impl DataConnManager { impl DataConnManager {
pub fn new() -> Self { pub fn new() -> Self {
Self { conns: HashMap::new() } Self { conns: HashMap::new() }
} }
} }
impl Actor for DataConnManager {
type Context = Context<Self>;
}
impl Handler<StopDataBridge> for DataConnManager { impl Handler<StopDataBridge> for DataConnManager {
type Result = (); type Result = ();

View File

@@ -30,9 +30,9 @@ struct BrokerContext {
client_ca: CACertPair<'static>, client_ca: CACertPair<'static>,
server_ca: CACertPair<'static>, server_ca: CACertPair<'static>,
guestserver_ca: CACertPair<'static>, guestserver_ca: CACertPair<'static>,
scdb_addr: Addr<ServerCertDB>, servercert_db: Addr<ServerCertDB>,
pdcm_addr: Addr<PendingDataConnManager>, pendingdataconn_manager: Addr<PendingDataConnManager>,
sm_addr: Addr<ServerManager>, server_manager: Addr<ServerManager>,
} }
@@ -47,19 +47,19 @@ async fn main() {
let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap(); let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap();
let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap(); let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap();
// Load Actors // Load Actors
let scdb_addr = ServerCertDB::new().start(); let servercert_db = ServerCertDB::new().start();
let dcm_addr = DataConnManager::new().start(); let dataconn_manager = DataConnManager::new().start();
let pdcm_addr = PendingDataConnManager::new(dcm_addr).start(); let pendingdataconn_manager = PendingDataConnManager::new(dataconn_manager).start();
let sm_addr = ServerManager::new(pdcm_addr.clone()).start(); let server_manager = ServerManager::new(pendingdataconn_manager.clone()).start();
// Create Context // Create Context
let ctx = Arc::new(BrokerContext { let ctx = Arc::new(BrokerContext {
broker_leaf, broker_leaf,
client_ca, client_ca,
server_ca, server_ca,
guestserver_ca, guestserver_ca,
scdb_addr, servercert_db,
pdcm_addr, pendingdataconn_manager,
sm_addr, server_manager,
}); });
// Pki Client Verifier // Pki Client Verifier
@@ -109,7 +109,7 @@ async fn main() {
} }
Subscribe => { Subscribe => {
info!("Subscribe Stream"); info!("Subscribe Stream");
let name = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes }).await.unwrap() { let name = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
None => { None => {
error!("Cert has no name assigned!"); error!("Cert has no name assigned!");
let reply = ToServerConnTypeReply::GenericFailure; let reply = ToServerConnTypeReply::GenericFailure;
@@ -125,7 +125,7 @@ async fn main() {
OpenDataStream(conn_id) => { OpenDataStream(conn_id) => {
info!("OpenDataStream with {:?}", conn_id); info!("OpenDataStream with {:?}", conn_id);
let msg = RegisterStream::server(conn_id, transport); let msg = RegisterStream::server(conn_id, transport);
ctx.pdcm_addr.send(msg).await.unwrap().unwrap(); ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
} }
} }
} }
@@ -159,7 +159,7 @@ async fn main() {
} }
async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) { async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) {
match ctx.sm_addr.send(StartTransporter { name, transport }).await.unwrap() { match ctx.server_manager.send(StartTransporter { name, transport }).await.unwrap() {
Ok(_) => { Ok(_) => {
info!("Stream sent to the manager"); info!("Stream sent to the manager");
} }
@@ -184,7 +184,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr
match msg { match msg {
ChangeName { name } => { ChangeName { name } => {
info!("Changing name to {}", name); info!("Changing name to {}", name);
match ctx.scdb_addr.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() { match ctx.servercert_db.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => { None => {
info!("Nothing to unregister"); info!("Nothing to unregister");
} }
@@ -192,7 +192,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr
warn!("Unregistered from old name {}", old_name); warn!("Unregistered from old name {}", old_name);
} }
} }
let reply = match ctx.scdb_addr.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() { let reply = match ctx.servercert_db.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
Ok(_) => { Ok(_) => {
info!("Registered!"); info!("Registered!");
ToServerCommandReply::NameChanged ToServerCommandReply::NameChanged
@@ -206,7 +206,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr
} }
WhoAmI => { WhoAmI => {
info!("Asked who I am"); info!("Asked who I am");
let reply = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() { let reply = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => { None => {
info!("I'm not registered anymore!? WTF"); info!("I'm not registered anymore!? WTF");
ToServerCommandReply::GenericFailure ToServerCommandReply::GenericFailure
@@ -245,14 +245,14 @@ async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream
match msg { match msg {
Announce { name } => { Announce { name } => {
info!("Announced with name {}", name); info!("Announced with name {}", name);
if ctx.scdb_addr.send(IsNameRegistered { name: name.clone() }).await.unwrap() { if ctx.servercert_db.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
info!("Name {} already registered!", name); info!("Name {} already registered!", name);
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied; let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
break; // Stop reading break; // Stop reading
} else { } else {
let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str())); let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str()));
ctx.scdb_addr.send(RegisterServer { ctx.servercert_db.send(RegisterServer {
cert: cert.cert().to_vec(), cert: cert.cert().to_vec(),
name, name,
}).await.unwrap().unwrap(); }).await.unwrap().unwrap();
@@ -287,7 +287,7 @@ async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
match msg { match msg {
FromClientCommand::RequestServer { name } => { FromClientCommand::RequestServer { name } => {
info!("REQUESTED SERVER {}", name); info!("REQUESTED SERVER {}", name);
let data = ctx.sm_addr.send(RequestServer { name }).await.unwrap(); let data = ctx.server_manager.send(RequestServer { name }).await.unwrap();
match data { match data {
Ok(client_conn_id) => { Ok(client_conn_id) => {
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id }; let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
@@ -301,14 +301,14 @@ async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
} }
FromClientCommand::ServerList => { FromClientCommand::ServerList => {
info!("Requested ServerList"); info!("Requested ServerList");
let data = ctx.sm_addr.send(GetServerList {}).await.unwrap(); let data = ctx.server_manager.send(GetServerList {}).await.unwrap();
let reply = ToClientResponse::OkServerList { data }; let reply = ToClientResponse::OkServerList { data };
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
} }
FromClientCommand::UpgradeToDataStream(conn_id) => { FromClientCommand::UpgradeToDataStream(conn_id) => {
info!("Upgrade to DataStream with conn_id {:?}", conn_id); info!("Upgrade to DataStream with conn_id {:?}", conn_id);
let msg = RegisterStream::client(conn_id, transport); let msg = RegisterStream::client(conn_id, transport);
ctx.pdcm_addr.send(msg).await.unwrap().unwrap(); ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
break; break;
} }
} }

View File

@@ -7,20 +7,15 @@ use libbonknet::*;
use crate::dataconnmanager::{DataConnManager, StartDataBridge}; use crate::dataconnmanager::{DataConnManager, StartDataBridge};
use crate::streamutils::*; use crate::streamutils::*;
/* //! Pending Data Connect Database is where we store the Data Connection in creation.
L'idea e' che il database deve avere una riga per ogni connessione dati in nascita. //! Every row contains:
In ogni "riga" deve essere presente: //! - The ServerConnID that the server will use (uuid)
- Il ServerConnID che il server dovra usare (pkey) //! - The ClientConnID that the client will use (uuid)
- Il ClientConnID che il client dovra usare (pkey) //! - If the Server is already connected and ready, the Server Socket
- Se gia connesso, il Socket del Server //! - If the Client is already connected and ready, the Client Socket
- Se gia connesso il Socket del Client //!
//! When a row contains a valid ServerSocket and ClientSocket, then the row is removed and the
Quando in una riga sono presenti sia ServerSocket che ClientSocket allora poppa la riga //! sockets moved to the Data Connection Manager
e usa i socket per lanciare un thread/actor che faccia il piping dei dati
Ricerca riga deve avvenire sia tramite ServerConnID che ClientConnID se essi sono diversi come pianifico
Quindi l'ideale e' non usare una collection ma andare direttamente di Vector!
*/
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum PendingDataConnError { pub enum PendingDataConnError {
@@ -59,11 +54,13 @@ impl RegisterStream {
} }
} }
/// Record with connection ID and TransportStream that can represent a client or a server
struct SideRecord { struct SideRecord {
conn_id: Uuid, conn_id: Uuid,
transport: Option<TransportStream>, transport: Option<TransportStream>,
} }
/// A DB Record, with a Server and a Client record containing IDs and Transports
struct Record { struct Record {
server: SideRecord, server: SideRecord,
client: SideRecord, client: SideRecord,
@@ -78,16 +75,29 @@ impl Record {
} }
// TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them // TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
/// This contains the database of records of Data Connections in the phase of creation.
/// The approach is that when a DataConnection is needed, a new Pending entry is created together
/// with two IDs, sent to the client and the server, which they will use to create the new
/// TransportStreams that will then be registered to the right Record.
/// When a Record has the two TransportStreams it needs, the Record is complete and can be consumed
/// to create a DataConnection, sending ownership of the two streams to the
/// Data Connection Manager.
pub struct PendingDataConnManager { pub struct PendingDataConnManager {
db: Vec<Record>, db: Vec<Record>,
dcm_addr: Addr<DataConnManager>, dataconn_manager: Addr<DataConnManager>,
}
impl Actor for PendingDataConnManager {
type Context = Context<Self>;
} }
impl PendingDataConnManager { impl PendingDataConnManager {
pub fn new(dcm_addr: Addr<DataConnManager>) -> Self { pub fn new(dataconn_manager_addr: Addr<DataConnManager>) -> Self {
PendingDataConnManager { db: Vec::new(), dcm_addr } PendingDataConnManager { db: Vec::new(), dataconn_manager: dataconn_manager_addr }
} }
/// The Row retrieve need to use ServerConnID AND ClientConnID as keys, so the best approach is to
/// use a single Vec and check the two IDs while iterating.
fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> { fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
use RegisterKind::*; use RegisterKind::*;
let record = match match kind { let record = match match kind {
@@ -104,10 +114,6 @@ impl PendingDataConnManager {
} }
} }
impl Actor for PendingDataConnManager {
type Context = Context<Self>;
}
impl Handler<NewPendingConn> for PendingDataConnManager { impl Handler<NewPendingConn> for PendingDataConnManager {
type Result = Result<(), PendingDataConnError>; type Result = Result<(), PendingDataConnError>;
@@ -171,7 +177,7 @@ impl Handler<TryStartDataStream> for PendingDataConnManager {
server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
}; };
self.dcm_addr.do_send(msg); self.dataconn_manager.do_send(msg);
} }
Ok(()) Ok(())
} else { } else {

View File

@@ -2,8 +2,6 @@ use std::collections::HashMap;
use actix::prelude::*; use actix::prelude::*;
use thiserror::Error; use thiserror::Error;
// TODO: Probably it's better to remove the pub from inside the structs and impl a new() funct
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum DBError { pub enum DBError {
#[error("Certificate is already registered with name {0}")] #[error("Certificate is already registered with name {0}")]

View File

@@ -208,6 +208,7 @@ async fn main() -> std::io::Result<()> {
ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() } ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }
} else if exists_serverident { } else if exists_serverident {
// Yes Server -> Use Server file as identity // Yes Server -> Use Server file as identity
info!("Server Identity found. Confirming...");
let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap(); let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap();
let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }; let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
server_name_confirmation(&ctx).await.unwrap(); server_name_confirmation(&ctx).await.unwrap();