Files
bonknet/bonknet_broker/src/pendingdataconndb.rs

242 lines
9.6 KiB
Rust

use actix::prelude::*;
use actix_tls::accept::rustls_0_22::TlsStream;
use futures::SinkExt;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{error, info};
use uuid::Uuid;
use libbonknet::*;
use crate::dataconnmanager::{DataConnManager, StartDataBridge};
/// Transport used for pending data connections: an accepted rustls TLS stream over TCP,
/// wrapped in a `Framed` with a `LengthDelimitedCodec`.
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
/*
The idea is that the database must hold one row for every data connection that is being set up.
Each "row" must contain:
- The ServerConnID that the server will have to use (pkey)
- The ClientConnID that the client will have to use (pkey)
- If already connected, the Server's Socket
- If already connected, the Client's Socket
When a row holds both the ServerSocket and the ClientSocket, pop the row
and use the sockets to launch a thread/actor that pipes the data between them.
Row lookup must work both by ServerConnID and by ClientConnID, if they are different as I plan.
So the ideal is not to use a keyed collection but to go straight for a Vector!
*/
/// Error type returned by every message handler of `PendingDataConnManager`.
#[derive(Error, Debug)]
pub enum PendingDataConnError {
/// Catch-all failure; the handlers in this file log the specific cause before returning it.
#[error("Generic Failure")]
GenericFailure,
}
/// Message asking the manager to create a new pending data connection record.
///
/// The reply is `Ok(())` when the record was stored, or
/// `Err(PendingDataConnError)` when it could not be.
#[derive(Message)]
#[rtype(result = "Result<(),PendingDataConnError>")]
pub struct NewPendingConn {
/// Connection id under which the server side of the record is looked up.
pub server_conn_id: Uuid,
/// Connection id under which the client side of the record is looked up.
pub client_conn_id: Uuid,
}
/// Selects which side of a pending record (server or client) an operation refers to.
#[derive(Debug)]
enum RegisterKind {
// The `server` side of a `Record`.
Server,
// The `client` side of a `Record`.
Client
}
/// Message that hands a transport stream to the manager so it can be attached to one
/// side of a pending record.
///
/// Fields are private; build the message with `RegisterStream::server` or
/// `RegisterStream::client`.
#[derive(Message)]
#[rtype(result = "Result<(),PendingDataConnError>")]
pub struct RegisterStream {
// Side of the record the stream belongs to.
kind: RegisterKind,
// Connection id identifying the record on that side.
conn_id: Uuid,
// The stream to attach.
transport: TransportStream,
}
impl RegisterStream {
    /// Builds a registration request that attaches `transport` to the server side
    /// of the pending record identified by `conn_id`.
    pub fn server(conn_id: Uuid, transport: TransportStream) -> Self {
        Self {
            conn_id,
            transport,
            kind: RegisterKind::Server,
        }
    }

    /// Builds a registration request that attaches `transport` to the client side
    /// of the pending record identified by `conn_id`.
    pub fn client(conn_id: Uuid, transport: TransportStream) -> Self {
        Self {
            conn_id,
            transport,
            kind: RegisterKind::Client,
        }
    }
}
/// One side (server or client) of a pending data connection.
struct SideRecord {
// Connection id used to look this side up.
conn_id: Uuid,
// The side's stream once it has been registered; `None` until then.
transport: Option<TransportStream>,
}
/// A pending data connection: the server side paired with the client side.
struct Record {
// Server half of the pending connection.
server: SideRecord,
// Client half of the pending connection.
client: SideRecord,
}
impl Record {
    /// Creates a record for the given pair of connection ids with no transport
    /// attached on either side yet.
    fn new(server_conn_id: Uuid, client_conn_id: Uuid) -> Self {
        Self {
            server: SideRecord {
                conn_id: server_conn_id,
                transport: None,
            },
            client: SideRecord {
                conn_id: client_conn_id,
                transport: None,
            },
        }
    }
}
// TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
/// Actor state holding the data connections that are still waiting for one or both
/// of their transports.
pub struct PendingDataConnManager {
// Pending records, searched linearly by server or client connection id.
db: Vec<Record>,
// Address of the `DataConnManager` that receives `StartDataBridge` once a record is complete.
dcm_addr: Addr<DataConnManager>,
}
impl PendingDataConnManager {
pub fn new(dcm_addr: Addr<DataConnManager>) -> Self {
PendingDataConnManager { db: Vec::new(), dcm_addr }
}
fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
use RegisterKind::*;
let record = match match kind {
Server => self.db.iter_mut().find(|x| x.server.conn_id == *conn_id),
Client => self.db.iter_mut().find(|x| x.client.conn_id == *conn_id),
} {
None => return None,
Some(item) => item,
};
Some(match kind {
Server => &mut record.server,
Client => &mut record.client,
})
}
}
/// Makes the manager an actix actor; no lifecycle methods are overridden here.
impl Actor for PendingDataConnManager {
// Runs on a plain actix `Context`.
type Context = Context<Self>;
}
impl Handler<NewPendingConn> for PendingDataConnManager {
type Result = Result<(), PendingDataConnError>;
fn handle(&mut self, msg: NewPendingConn, _ctx: &mut Self::Context) -> Self::Result {
let server_conn_id = msg.server_conn_id;
let client_conn_id = msg.client_conn_id;
let new_record = Record::new(server_conn_id, client_conn_id);
if self.db.iter().any(|x| {
x.server.conn_id == new_record.server.conn_id || x.client.conn_id == new_record.client.conn_id
}) {
error!("Conflicting UUIDs: server {} - client {}", server_conn_id, client_conn_id);
Err(PendingDataConnError::GenericFailure)
} else {
self.db.push(new_record);
Ok(())
}
}
}
/*
Run all the normal checks synchronously.
When the Accepted has to be sent, notify yourself of it as an AcceptStream that owns the stream,
but this time with a ResponseActFuture.
If there is a failure, move the transport into a ctx::spawn that sends a FAILED.
If everything is OK, check everything AGAIN and, if the checks pass, register the stream in the Actor.
To handle the spawn of a connection, the only answer is to handle the connection spawn as a
Message Handler in its own right. When a stream finishes sending its Accepted, it appears in the Record.
When the second stream arrives and finishes its Accepted, it too is registered in the Record, so
we are in the spawn condition, because both transports are in the Record.
So if at the end of a check-and-register both streams are present, move them both out, drop
the record and send the two transports to a third actor, on another Arbiter, that runs the tokio::io::copy and
manages the open connections.
*/
/// Internal message asking the manager to check whether the record identified by
/// `conn_id` on side `kind` has both transports and, if so, to start the data bridge.
#[derive(Message)]
#[rtype(result = "Result<(),PendingDataConnError>")]
struct TryStartDataStream {
// Side whose connection id is used for the lookup.
kind: RegisterKind,
// Connection id of the record on that side.
conn_id: Uuid,
}
impl Handler<TryStartDataStream> for PendingDataConnManager {
type Result = Result<(), PendingDataConnError>;
fn handle(&mut self, msg: TryStartDataStream, _ctx: &mut Self::Context) -> Self::Result {
use RegisterKind::*;
let idx = match msg.kind {
Server => self.db.iter().enumerate().find(|(_i, x)| x.server.conn_id == msg.conn_id),
Client => self.db.iter().enumerate().find(|(_i, x)| x.client.conn_id == msg.conn_id),
};
if let Some((idx, record)) = idx {
if record.client.transport.is_some() && record.server.transport.is_some() {
info!("Requesting Data Bridge for client_conn_id {}", record.client.conn_id);
// We can drop record and use idx to remove the record itself from the vector
// and then send it to another manager for the spawn of the real connection
// This remove is necessary to drop the &Record and take full ownership of it.
let real_record = self.db.remove(idx); // Safety: We are sure this idx is valid
let msg = StartDataBridge {
client_conn_id: real_record.client.conn_id,
server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
};
self.dcm_addr.do_send(msg);
}
Ok(())
} else {
Err(PendingDataConnError::GenericFailure)
}
}
}
impl Handler<RegisterStream> for PendingDataConnManager {
type Result = ResponseActFuture<Self, Result<(), PendingDataConnError>>;
fn handle(&mut self, msg: RegisterStream, _ctx: &mut Self::Context) -> Self::Result {
let side_record = match self.retrieve_siderecord(&msg.kind, &msg.conn_id) {
None => {
error!("Found no connection with {:?} conn_id {:?}", msg.kind, msg.conn_id);
return Box::pin(fut::err(PendingDataConnError::GenericFailure));
}
Some(item) => item,
};
if side_record.transport.is_some() {
// TODO: It can be good to check if the connection is still open, if not, drop and use the new one.
error!("Connection already with a socket!");
Box::pin(fut::err(PendingDataConnError::GenericFailure))
} else {
// This Fut will send the Accepted and only then, register the transport stream
// in the Manager. If during the registration there are all the transport in places,
// you can start the datapiping
Box::pin(async move {
let mut transport = msg.transport;
let reply = ToPeerDataStream::OkDataStreamRequestAccepted;
let res = transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await;
(transport, res)
}.into_actor(self).map(move |(transport, res), a, c| {
match res {
Ok(_) => {
// TODO: to not do the check twice I can put a "lock variable" inside the record
// to prevent the put in Accept of another stream while we are waiting to send the
// accept message
let side_record = match a.retrieve_siderecord(&msg.kind, &msg.conn_id) {
None => {
error!("Found no connection with {:?} conn_id {:?}", msg.kind, msg.conn_id);
return Err(PendingDataConnError::GenericFailure);
}
Some(item) => item,
};
if side_record.transport.is_some() {
// TODO: It can be good to check if the connection is still open, if not, drop and use the new one.
error!("Connection already with a socket!");
return Err(PendingDataConnError::GenericFailure);
}
side_record.transport = Some(transport);
c.notify(TryStartDataStream { kind: msg.kind, conn_id: msg.conn_id });
Ok(())
}
Err(e) => {
error!("Error during OkDataStreamRequestAccepted sending: {:?}", e);
Err(PendingDataConnError::GenericFailure)
}
}
}))
}
}
}