From 467e2b6d211c6a6e21ec2844f3c0c8066c1dcf8d Mon Sep 17 00:00:00 2001 From: "Federico Pasqua (eisterman)" Date: Wed, 21 Feb 2024 17:54:59 +0100 Subject: [PATCH] Complete first version of DataStream --- bonknet_broker/Cargo.toml | 2 +- bonknet_broker/src/dataconnmanager.rs | 106 ++++++++++++++++++++++++ bonknet_broker/src/main.rs | 7 +- bonknet_broker/src/pendingdataconndb.rs | 21 +++-- 4 files changed, 125 insertions(+), 11 deletions(-) diff --git a/bonknet_broker/Cargo.toml b/bonknet_broker/Cargo.toml index 09a5e66..3e500dc 100644 --- a/bonknet_broker/Cargo.toml +++ b/bonknet_broker/Cargo.toml @@ -13,7 +13,7 @@ actix-rt = "2.9.0" actix-server = "2.3.0" actix-service = "2.0.2" actix-tls = { version = "3.3.0", features = ["rustls-0_22"] } -tokio = { version = "1", features = ["io-util", "sync", "time"] } +tokio = { version = "1", features = ["io-util", "sync", "time", "macros"] } rustls = "0.22.2" tracing = "0.1" tracing-subscriber = "0.3" diff --git a/bonknet_broker/src/dataconnmanager.rs b/bonknet_broker/src/dataconnmanager.rs index 0071c7e..9c276e4 100644 --- a/bonknet_broker/src/dataconnmanager.rs +++ b/bonknet_broker/src/dataconnmanager.rs @@ -1,2 +1,108 @@ use actix::prelude::*; +use uuid::Uuid; +use std::collections::HashMap; +use std::io::Error; +use actix_tls::accept::rustls_0_22::{TlsStream}; +use actix_rt::net::TcpStream; +use futures::SinkExt; +use thiserror::Error; +use tracing::{info, error, warn, debug}; +use libbonknet::ToPeerDataStream; +use crate::TransportStream; +type RawStream = TlsStream; + +#[derive(Error, Debug)] +pub enum DataConnManagerError { + #[error("Generic Failure")] + GenericFailure, +} + +#[derive(Message)] +#[rtype(result = "Result<(),DataConnManagerError>")] +pub struct StartDataBridge { + pub client_conn_id: Uuid, + pub server_transport: TransportStream, + pub client_transport: TransportStream, +} + +#[derive(Message)] +#[rtype(result = "()")] +pub struct StopDataBridge { + pub client_conn_id: Uuid, +} + +type ClientConnId = Uuid; + +struct Connection { + proxyhandler: SpawnHandle, +} + +pub struct DataConnManager { + conns: HashMap +} + +impl DataConnManager { + pub fn new() -> Self { + Self { conns: HashMap::new() } + } +} + +impl Actor for DataConnManager { + type Context = Context; +} + +impl Handler for DataConnManager { + type Result = (); + + fn handle(&mut self, msg: StopDataBridge, ctx: &mut Self::Context) -> Self::Result { + match self.conns.remove(&msg.client_conn_id) { + None => warn!("Stopped Data Bridge {} was not in memory", msg.client_conn_id), + Some(conn) => { + if ctx.cancel_future(conn.proxyhandler) { + info!("Stopped Data Bridge {}", msg.client_conn_id); + } else { + info!("Stopped Data Bridge {} was with dead task", msg.client_conn_id); + } + }, + } + } +} + +impl Handler for DataConnManager { + type Result = Result<(), DataConnManagerError>; + + fn handle(&mut self, mut msg: StartDataBridge, ctx: &mut Self::Context) -> Self::Result { + let client_conn_id = msg.client_conn_id; + let handler = ctx.spawn(async move { + // Send to the streams the OK DATA OPEN message + let okmsg = ToPeerDataStream::OkDataStreamOpen; + if let Err(e) = tokio::try_join!( + msg.client_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()), + msg.server_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()), + ) { + error!("Error during OkDataStreamOpen send: {:?}", e); + // TODO: potrei voler trasformare questa funzione in ResponseActFuture cosi che + // in caso ci sia fallimento su questo send l'errore possa venir riportato direttamente + // al PendingDataConnDb senza bisogno di gestione manuale? + // Da studiare perche non per forza c'e bisogno che il Pending sappia che c'e stato + // fallimento in questa fase. + } else { + let mut client_stream = msg.client_transport.into_inner(); + let mut server_stream = msg.server_transport.into_inner(); + match tokio::io::copy_bidirectional(&mut client_stream, &mut server_stream).await { + Ok((to_server, to_client)) => info!("DataConn closed with {to_server}B to server and {to_client}B to client"), + Err(e) => error!("Error during DataConn: {e:?}"), + } + } + msg.client_conn_id + }.into_actor(self).map(|res, _a, c| { + c.notify(StopDataBridge { client_conn_id: res }); + })); + if let Some(other_conn) = self.conns.insert(client_conn_id, Connection { proxyhandler: handler }) { + ctx.cancel_future(other_conn.proxyhandler); + warn!("During init of Conn {client_conn_id} another connection has been found and is now closed.") + } + Ok(()) + } +} \ No newline at end of file diff --git a/bonknet_broker/src/main.rs b/bonknet_broker/src/main.rs index 58428e2..5b75c30 100644 --- a/bonknet_broker/src/main.rs +++ b/bonknet_broker/src/main.rs @@ -6,6 +6,7 @@ mod dataconnmanager; use servercertdb::*; use pendingdataconndb::*; use servermanager::*; +use dataconnmanager::*; use actix::prelude::*; use std::sync::Arc; use libbonknet::*; @@ -85,7 +86,8 @@ async fn main() { ).unwrap()).unwrap()); let scdb_addr = ServerCertDB::new().start(); - let pdcm_addr = PendingDataConnManager::new().start(); + let dcm_addr = DataConnManager::new().start(); + let pdcm_addr = PendingDataConnManager::new(dcm_addr.clone()).start(); let sm_addr = ServerManager::new(pdcm_addr.clone()).start(); Server::build() @@ -114,9 +116,6 @@ async fn main() { let peer_certs = stream.get_ref().1.peer_certificates().unwrap(); let peer_cert_bytes = peer_certs.first().unwrap().to_vec(); let peer_root_cert_der = peer_certs.last().unwrap().clone(); - // let scdb_addr = scdb_addr.clone(); - // let pdcm_addr = pdcm_addr.clone(); - // let sm_addr = sm_addr.clone(); if peer_root_cert_der == *server_root_cert_der { info!("Server connection"); let mut transport = Framed::new(stream, LengthDelimitedCodec::new()); diff --git a/bonknet_broker/src/pendingdataconndb.rs b/bonknet_broker/src/pendingdataconndb.rs index 9158783..d4125eb 100644 --- a/bonknet_broker/src/pendingdataconndb.rs +++ b/bonknet_broker/src/pendingdataconndb.rs @@ -7,6 +7,7 @@ use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{error, info}; use uuid::Uuid; use libbonknet::*; +use crate::dataconnmanager::{DataConnManager, StartDataBridge}; type TransportStream = Framed, LengthDelimitedCodec>; @@ -83,11 +84,12 @@ impl Record { // TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them pub struct PendingDataConnManager { db: Vec, + dcm_addr: Addr, } impl PendingDataConnManager { - pub fn new() -> Self { - PendingDataConnManager { db: Vec::new() } + pub fn new(dcm_addr: Addr) -> Self { + PendingDataConnManager { db: Vec::new(), dcm_addr } } fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> { @@ -161,12 +163,19 @@ impl Handler for PendingDataConnManager { Server => self.db.iter().enumerate().find(|(i, x)| x.server.conn_id == msg.conn_id), Client => self.db.iter().enumerate().find(|(i, x)| x.client.conn_id == msg.conn_id), }; - if let Some((_idx, record)) = idx { + if let Some((idx, record)) = idx { if record.client.transport.is_some() && record.server.transport.is_some() { - // TODO: Launch the "thing" that will manage the data mirroring - info!("LAUNCHING DATA MIRRORING"); - // TODO: we can drop record and use idx to remove the record itself from the vector + info!("Requesting Data Bridge for client_conn_id {}", record.client.conn_id); + // We can drop record and use idx to remove the record itself from the vector // and then send it to another manager for the spawn of the real connection + // This remove is necessary to drop the &Record and take full ownership of it. + let real_record = self.db.remove(idx); // Safety: We are sure this idx is valid + let msg = StartDataBridge { + client_conn_id: real_record.client.conn_id, + server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some + client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some + }; + self.dcm_addr.do_send(msg); } Ok(()) } else {