Complete first version of DataStream
@@ -13,7 +13,7 @@ actix-rt = "2.9.0"
 actix-server = "2.3.0"
 actix-service = "2.0.2"
 actix-tls = { version = "3.3.0", features = ["rustls-0_22"] }
-tokio = { version = "1", features = ["io-util", "sync", "time"] }
+tokio = { version = "1", features = ["io-util", "sync", "time", "macros"] }
 rustls = "0.22.2"
 tracing = "0.1"
 tracing-subscriber = "0.3"
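Note: the only change in this hunk is the added `macros` feature on tokio, which gates macros such as `tokio::try_join!` used by the new DataConnManager below. A minimal sketch of the short-circuiting behaviour that handler relies on (illustrative only, not part of the commit):

```rust
// try_join! polls both futures concurrently and returns early on the
// first Err; on success it yields a tuple of both Ok values.
async fn demo() -> Result<(), std::io::Error> {
    let (a, b) = tokio::try_join!(
        async { Ok::<_, std::io::Error>(1u32) },
        async { Ok::<_, std::io::Error>(2u32) },
    )?;
    assert_eq!((a, b), (1, 2));
    Ok(())
}
```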
@@ -1,2 +1,108 @@
+use actix::prelude::*;
+use uuid::Uuid;
+use std::collections::HashMap;
+use actix_tls::accept::rustls_0_22::TlsStream;
+use actix_rt::net::TcpStream;
+use futures::SinkExt;
+use thiserror::Error;
+use tracing::{info, error, warn, debug};
+use libbonknet::ToPeerDataStream;
+use crate::TransportStream;
+
+type RawStream = TlsStream<TcpStream>;
+
+#[derive(Error, Debug)]
+pub enum DataConnManagerError {
+    #[error("Generic Failure")]
+    GenericFailure,
+}
+
+#[derive(Message)]
+#[rtype(result = "Result<(), DataConnManagerError>")]
+pub struct StartDataBridge {
+    pub client_conn_id: Uuid,
+    pub server_transport: TransportStream,
+    pub client_transport: TransportStream,
+}
+
+#[derive(Message)]
+#[rtype(result = "()")]
+pub struct StopDataBridge {
+    pub client_conn_id: Uuid,
+}
+
+type ClientConnId = Uuid;
+
+struct Connection {
+    proxyhandler: SpawnHandle,
+}
+
+pub struct DataConnManager {
+    conns: HashMap<ClientConnId, Connection>,
+}
+
+impl DataConnManager {
+    pub fn new() -> Self {
+        Self { conns: HashMap::new() }
+    }
+}
+
+impl Actor for DataConnManager {
+    type Context = Context<Self>;
+}
+
+impl Handler<StopDataBridge> for DataConnManager {
+    type Result = ();
+
+    fn handle(&mut self, msg: StopDataBridge, ctx: &mut Self::Context) -> Self::Result {
+        match self.conns.remove(&msg.client_conn_id) {
+            None => warn!("Stop requested for Data Bridge {}, but it was not in memory", msg.client_conn_id),
+            Some(conn) => {
+                if ctx.cancel_future(conn.proxyhandler) {
+                    info!("Stopped Data Bridge {}", msg.client_conn_id);
+                } else {
+                    info!("Stopped Data Bridge {}, but its task had already finished", msg.client_conn_id);
+                }
+            },
+        }
+    }
+}
+
+impl Handler<StartDataBridge> for DataConnManager {
+    type Result = Result<(), DataConnManagerError>;
+
+    fn handle(&mut self, mut msg: StartDataBridge, ctx: &mut Self::Context) -> Self::Result {
+        let client_conn_id = msg.client_conn_id;
+        let handler = ctx.spawn(async move {
+            // Send the OK DATA OPEN message to both streams
+            let okmsg = ToPeerDataStream::OkDataStreamOpen;
+            if let Err(e) = tokio::try_join!(
+                msg.client_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
+                msg.server_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
+            ) {
+                error!("Error during OkDataStreamOpen send: {:?}", e);
+                // TODO: I might want to turn this function into a ResponseActFuture so that,
+                // if this send fails, the error can be reported directly to the
+                // PendingDataConnDb without any manual handling?
+                // Worth studying, because the Pending side does not necessarily need to know
+                // that a failure happened at this stage.
+            } else {
+                let mut client_stream = msg.client_transport.into_inner();
+                let mut server_stream = msg.server_transport.into_inner();
+                match tokio::io::copy_bidirectional(&mut client_stream, &mut server_stream).await {
+                    Ok((to_server, to_client)) => info!("DataConn closed with {to_server}B to server and {to_client}B to client"),
+                    Err(e) => error!("Error during DataConn: {e:?}"),
+                }
+            }
+            msg.client_conn_id
+        }.into_actor(self).map(|res, _a, c| {
+            c.notify(StopDataBridge { client_conn_id: res });
+        }));
+        if let Some(other_conn) = self.conns.insert(client_conn_id, Connection { proxyhandler: handler }) {
+            ctx.cancel_future(other_conn.proxyhandler);
+            warn!("During init of Conn {client_conn_id} an existing connection was found and has been closed.");
+        }
+        Ok(())
+    }
+}
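Note: the TODO above contemplates returning a `ResponseActFuture` so a failed handshake send is reported back to the caller instead of only being logged. A sketch of the shape such a handler could take; the error mapping and the idea of spawning the bidirectional copy separately are assumptions, not the author's design:

```rust
// Hypothetical ResponseActFuture variant of Handler<StartDataBridge>.
impl Handler<StartDataBridge> for DataConnManager {
    type Result = ResponseActFuture<Self, Result<(), DataConnManagerError>>;

    fn handle(&mut self, mut msg: StartDataBridge, _ctx: &mut Self::Context) -> Self::Result {
        Box::pin(
            async move {
                let okmsg = ToPeerDataStream::OkDataStreamOpen;
                tokio::try_join!(
                    msg.client_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
                    msg.server_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
                )
                .map(|_| ()) // the bridge itself would then be spawned separately
                .map_err(|_| DataConnManagerError::GenericFailure)
            }
            .into_actor(self),
        )
    }
}
```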
@@ -6,6 +6,7 @@ mod dataconnmanager;
 use servercertdb::*;
 use pendingdataconndb::*;
 use servermanager::*;
+use dataconnmanager::*;
 use actix::prelude::*;
 use std::sync::Arc;
 use libbonknet::*;
@@ -85,7 +86,8 @@ async fn main() {
     ).unwrap()).unwrap());

     let scdb_addr = ServerCertDB::new().start();
-    let pdcm_addr = PendingDataConnManager::new().start();
+    let dcm_addr = DataConnManager::new().start();
+    let pdcm_addr = PendingDataConnManager::new(dcm_addr.clone()).start();
     let sm_addr = ServerManager::new(pdcm_addr.clone()).start();

     Server::build()
@@ -114,9 +116,6 @@ async fn main() {
     let peer_certs = stream.get_ref().1.peer_certificates().unwrap();
     let peer_cert_bytes = peer_certs.first().unwrap().to_vec();
     let peer_root_cert_der = peer_certs.last().unwrap().clone();
-    // let scdb_addr = scdb_addr.clone();
-    // let pdcm_addr = pdcm_addr.clone();
-    // let sm_addr = sm_addr.clone();
     if peer_root_cert_der == *server_root_cert_der {
         info!("Server connection");
         let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
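Note: `peer_certificates()` on a rustls connection returns the chain exactly as the peer presented it, end-entity certificate first, so `first()` is the peer's leaf certificate and `last()` is its root CA only because peers in this protocol are expected to include the root in the chain they send. A hypothetical restatement of the dispatch condition (`is_server_peer` does not exist in the codebase):

```rust
use rustls::pki_types::CertificateDer;

// A peer is treated as a server when the last certificate it presented
// matches the server root CA; otherwise it is handled as a client.
fn is_server_peer(peer_certs: &[CertificateDer<'_>], server_root: &CertificateDer<'_>) -> bool {
    peer_certs.last() == Some(server_root)
}
```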
@@ -7,6 +7,7 @@ use tokio_util::codec::{Framed, LengthDelimitedCodec};
 use tracing::{error, info};
 use uuid::Uuid;
 use libbonknet::*;
+use crate::dataconnmanager::{DataConnManager, StartDataBridge};

 type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
@@ -83,11 +84,12 @@ impl Record {
 // TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
 pub struct PendingDataConnManager {
     db: Vec<Record>,
+    dcm_addr: Addr<DataConnManager>,
 }

 impl PendingDataConnManager {
-    pub fn new() -> Self {
-        PendingDataConnManager { db: Vec::new() }
+    pub fn new(dcm_addr: Addr<DataConnManager>) -> Self {
+        PendingDataConnManager { db: Vec::new(), dcm_addr }
     }

     fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
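Note: the two-minute cleanup TODO kept above maps naturally onto actix's `run_interval`. A sketch of how it could be folded into this actor's `started` hook, assuming a hypothetical `has_invalidated_stream` predicate on Record:

```rust
use std::time::Duration;

// Inside the existing `impl Actor for PendingDataConnManager`:
fn started(&mut self, ctx: &mut Self::Context) {
    // Every 120 s, drop Records where at least one stream was invalidated.
    // has_invalidated_stream() is assumed here, not part of the diff.
    ctx.run_interval(Duration::from_secs(120), |act, _ctx| {
        act.db.retain(|record| !record.has_invalidated_stream());
    });
}
```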
@@ -161,12 +163,19 @@ impl Handler<TryStartDataStream> for PendingDataConnManager
             Server => self.db.iter().enumerate().find(|(i, x)| x.server.conn_id == msg.conn_id),
             Client => self.db.iter().enumerate().find(|(i, x)| x.client.conn_id == msg.conn_id),
         };
-        if let Some((_idx, record)) = idx {
+        if let Some((idx, record)) = idx {
             if record.client.transport.is_some() && record.server.transport.is_some() {
-                // TODO: Launch the "thing" that will manage the data mirroring
-                info!("LAUNCHING DATA MIRRORING");
-                // TODO: we can drop record and use idx to remove the record itself from the vector
+                info!("Requesting Data Bridge for client_conn_id {}", record.client.conn_id);
+                // We can drop record and use idx to remove the record itself from the vector,
+                // then send it to another manager to spawn the real connection.
+                // This remove is necessary to drop the &Record and take full ownership of it.
+                let real_record = self.db.remove(idx); // Safety: We are sure this idx is valid
+                let msg = StartDataBridge {
+                    client_conn_id: real_record.client.conn_id,
+                    server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
+                    client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
+                };
+                self.dcm_addr.do_send(msg);
             }
             Ok(())
         } else {
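Note: `do_send` fires and forgets: it bypasses mailbox backpressure and discards the `Result<(), DataConnManagerError>` that StartDataBridge produces. If the pending side ever needs that outcome, the usual alternative is an awaited `send` wrapped into an actor future, sketched here as an assumption rather than a change the commit makes:

```rust
// Hypothetical replacement for `self.dcm_addr.do_send(msg);` that
// observes the outcome of StartDataBridge.
let dcm_addr = self.dcm_addr.clone();
ctx.spawn(
    async move {
        match dcm_addr.send(msg).await {
            Ok(Ok(())) => info!("Data Bridge request accepted"),
            Ok(Err(e)) => error!("Data Bridge request refused: {e:?}"),
            Err(e) => error!("DataConnManager mailbox error: {e:?}"),
        }
    }
    .into_actor(self),
);
```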