Compare commits
6 Commits
9e3d4c5fe3
...
467e2b6d21
| Author | SHA1 | Date | |
|---|---|---|---|
| 467e2b6d21 | |||
| 83c7a95414 | |||
| 69a37ae89a | |||
| 37cc133d7f | |||
| f8feb9db81 | |||
| 37c76aba22 |
1365
Cargo.lock
generated
1365
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -13,14 +13,18 @@ actix-rt = "2.9.0"
|
||||
actix-server = "2.3.0"
|
||||
actix-service = "2.0.2"
|
||||
actix-tls = { version = "3.3.0", features = ["rustls-0_22"] }
|
||||
tokio = { version = "1", features = ["io-util", "sync", "time", "macros"] }
|
||||
rustls = "0.22.2"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
futures = "0.3"
|
||||
thiserror = "1.0.56"
|
||||
tokio-util = { version = "0.7.10", features = ["codec"] }
|
||||
serde = "1"
|
||||
rmp-serde = "1.1.2"
|
||||
rcgen = { version = "0.12.1", features = ["x509-parser"] }
|
||||
rand = "0.8.5"
|
||||
uuid = { version = "1.7.0", features = ["v4", "serde"] }
|
||||
|
||||
[[bin]]
|
||||
name = "init_certs"
|
||||
|
||||
108
bonknet_broker/src/dataconnmanager.rs
Normal file
108
bonknet_broker/src/dataconnmanager.rs
Normal file
@@ -0,0 +1,108 @@
|
||||
use actix::prelude::*;
|
||||
use uuid::Uuid;
|
||||
use std::collections::HashMap;
|
||||
use std::io::Error;
|
||||
use actix_tls::accept::rustls_0_22::{TlsStream};
|
||||
use actix_rt::net::TcpStream;
|
||||
use futures::SinkExt;
|
||||
use thiserror::Error;
|
||||
use tracing::{info, error, warn, debug};
|
||||
use libbonknet::ToPeerDataStream;
|
||||
use crate::TransportStream;
|
||||
|
||||
type RawStream = TlsStream<TcpStream>;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum DataConnManagerError {
|
||||
#[error("Generic Failure")]
|
||||
GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(),DataConnManagerError>")]
|
||||
pub struct StartDataBridge {
|
||||
pub client_conn_id: Uuid,
|
||||
pub server_transport: TransportStream,
|
||||
pub client_transport: TransportStream,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
pub struct StopDataBridge {
|
||||
pub client_conn_id: Uuid,
|
||||
}
|
||||
|
||||
type ClientConnId = Uuid;
|
||||
|
||||
struct Connection {
|
||||
proxyhandler: SpawnHandle,
|
||||
}
|
||||
|
||||
pub struct DataConnManager {
|
||||
conns: HashMap<ClientConnId, Connection>
|
||||
}
|
||||
|
||||
impl DataConnManager {
|
||||
pub fn new() -> Self {
|
||||
Self { conns: HashMap::new() }
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for DataConnManager {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl Handler<StopDataBridge> for DataConnManager {
|
||||
type Result = ();
|
||||
|
||||
fn handle(&mut self, msg: StopDataBridge, ctx: &mut Self::Context) -> Self::Result {
|
||||
match self.conns.remove(&msg.client_conn_id) {
|
||||
None => warn!("Stopped Data Bridge {} was not in memory", msg.client_conn_id),
|
||||
Some(conn) => {
|
||||
if ctx.cancel_future(conn.proxyhandler) {
|
||||
info!("Stopped Data Bridge {}", msg.client_conn_id);
|
||||
} else {
|
||||
info!("Stopped Data Bridge {} was with dead task", msg.client_conn_id);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<StartDataBridge> for DataConnManager {
|
||||
type Result = Result<(), DataConnManagerError>;
|
||||
|
||||
fn handle(&mut self, mut msg: StartDataBridge, ctx: &mut Self::Context) -> Self::Result {
|
||||
let client_conn_id = msg.client_conn_id;
|
||||
let handler = ctx.spawn(async move {
|
||||
// Send to the streams the OK DATA OPEN message
|
||||
let okmsg = ToPeerDataStream::OkDataStreamOpen;
|
||||
if let Err(e) = tokio::try_join!(
|
||||
msg.client_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
|
||||
msg.server_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
|
||||
) {
|
||||
error!("Error during OkDataStreamOpen send: {:?}", e);
|
||||
// TODO: potrei voler trasformare questa funzione in ResponseActFuture cosi che
|
||||
// in caso ci sia fallimento su questo send l'errore possa venir riportato direttamente
|
||||
// al PendingDataConnDb senza bisogno di gestione manuale?
|
||||
// Da studiare perche non per forza c'e bisogno che il Pending sappia che c'e stato
|
||||
// fallimento in questa fase.
|
||||
} else {
|
||||
let mut client_stream = msg.client_transport.into_inner();
|
||||
let mut server_stream = msg.server_transport.into_inner();
|
||||
match tokio::io::copy_bidirectional(&mut client_stream, &mut server_stream).await {
|
||||
Ok((to_server, to_client)) => info!("DataConn closed with {to_server}B to server and {to_client}B to client"),
|
||||
Err(e) => error!("Error during DataConn: {e:?}"),
|
||||
}
|
||||
}
|
||||
msg.client_conn_id
|
||||
}.into_actor(self).map(|res, _a, c| {
|
||||
c.notify(StopDataBridge { client_conn_id: res });
|
||||
}));
|
||||
if let Some(other_conn) = self.conns.insert(client_conn_id, Connection { proxyhandler: handler }) {
|
||||
ctx.cancel_future(other_conn.proxyhandler);
|
||||
warn!("During init of Conn {client_conn_id} another connection has been found and is now closed.")
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,15 @@
|
||||
mod servercertdb;
|
||||
mod pendingdataconndb;
|
||||
mod servermanager;
|
||||
mod dataconnmanager;
|
||||
|
||||
use servercertdb::*;
|
||||
use pendingdataconndb::*;
|
||||
use servermanager::*;
|
||||
use dataconnmanager::*;
|
||||
use actix::prelude::*;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use libbonknet::{load_cert, load_prkey, FromServerMessage, RequiredReplyValues, FromGuestServerMessage, ToGuestServerMessage};
|
||||
use libbonknet::*;
|
||||
use rustls::{RootCertStore, ServerConfig};
|
||||
use rustls::server::WebPkiClientVerifier;
|
||||
use actix_tls::accept::rustls_0_22::{Acceptor as RustlsAcceptor, TlsStream};
|
||||
@@ -9,10 +17,14 @@ use actix_server::Server;
|
||||
use actix_rt::net::TcpStream;
|
||||
use actix_service::{ServiceFactoryExt as _};
|
||||
use futures::{StreamExt, SinkExt};
|
||||
use thiserror::Error;
|
||||
use tokio_util::codec::{Framed, LengthDelimitedCodec};
|
||||
use tracing::{info, error};
|
||||
use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec};
|
||||
use tracing::{info, error, warn};
|
||||
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
|
||||
use tokio::io::{ReadHalf, WriteHalf};
|
||||
|
||||
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
|
||||
type TransportStreamTx = FramedWrite<WriteHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
|
||||
type TransportStreamRx = FramedRead<ReadHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
|
||||
|
||||
struct ServerCert {
|
||||
cert: Vec<u8>,
|
||||
@@ -21,7 +33,7 @@ struct ServerCert {
|
||||
|
||||
fn generate_server_cert(root_cert: &Certificate, name: &str) -> ServerCert {
|
||||
let mut params = CertificateParams::new(vec!["entity.other.host".into(), format!("bonk.server.{name}")]);
|
||||
params.distinguished_name.push(DnType::CommonName, format!("{name}"));
|
||||
params.distinguished_name.push(DnType::CommonName, name);
|
||||
params.use_authority_key_identifier_extension = true;
|
||||
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
|
||||
params
|
||||
@@ -33,86 +45,6 @@ fn generate_server_cert(root_cert: &Certificate, name: &str) -> ServerCert {
|
||||
ServerCert { cert, prkey }
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
enum DBError {
|
||||
#[error("Certificate is already registered with name {0}")]
|
||||
CertAlreadyRegistered(String),
|
||||
// #[error("Generic Failure")]
|
||||
// GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "bool")]
|
||||
struct IsNameRegistered {
|
||||
name: String,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(), DBError>")]
|
||||
struct RegisterServer {
|
||||
cert: Vec<u8>,
|
||||
name: String,
|
||||
}
|
||||
|
||||
// TODO: Move into Sqlite DB with unique check on col1 and col2!!!! Right now name is not unique
|
||||
struct ServerCertDB {
|
||||
db: HashMap<Vec<u8>, String>, // Cert to Name
|
||||
}
|
||||
|
||||
impl Actor for ServerCertDB {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl Handler<RegisterServer> for ServerCertDB {
|
||||
type Result = Result<(), DBError>;
|
||||
|
||||
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
|
||||
match self.db.get(&msg.cert) {
|
||||
None => {
|
||||
self.db.insert(msg.cert, msg.name);
|
||||
Ok(())
|
||||
}
|
||||
Some(name) => {
|
||||
Err(DBError::CertAlreadyRegistered(name.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<IsNameRegistered> for ServerCertDB {
|
||||
type Result = bool;
|
||||
|
||||
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
|
||||
self.db.values().any(|x| *x == msg.name)
|
||||
}
|
||||
}
|
||||
|
||||
struct GuestServerConnection {
|
||||
stream: TlsStream<TcpStream>,
|
||||
}
|
||||
|
||||
impl Actor for GuestServerConnection {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
struct ServerConnection<T: 'static> {
|
||||
stream: Framed<TlsStream<TcpStream>, T>,
|
||||
name: String
|
||||
}
|
||||
|
||||
impl<T> ServerConnection<T> {
|
||||
fn new(stream: TlsStream<TcpStream>, codec: T) -> Self {
|
||||
let stream = Framed::new(stream, codec);
|
||||
ServerConnection {
|
||||
stream,
|
||||
name: "Polnareffland1".into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Actor for ServerConnection<T> {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
#[actix_rt::main]
|
||||
async fn main() {
|
||||
@@ -149,13 +81,14 @@ async fn main() {
|
||||
let client_root_cert_der = Arc::new(client_root_cert_der);
|
||||
let guestserver_root_cert_der = Arc::new(guestserver_root_cert_der);
|
||||
let server_root_cert = Arc::new(Certificate::from_params(CertificateParams::from_ca_cert_der(
|
||||
&*server_root_cert_der,
|
||||
&server_root_cert_der,
|
||||
server_root_prkey
|
||||
).unwrap()).unwrap());
|
||||
|
||||
let server_db_addr = ServerCertDB {
|
||||
db: HashMap::new(),
|
||||
}.start();
|
||||
let scdb_addr = ServerCertDB::new().start();
|
||||
let dcm_addr = DataConnManager::new().start();
|
||||
let pdcm_addr = PendingDataConnManager::new(dcm_addr.clone()).start();
|
||||
let sm_addr = ServerManager::new(pdcm_addr.clone()).start();
|
||||
|
||||
Server::build()
|
||||
.bind("server-command", ("localhost", 2541), move || {
|
||||
@@ -163,7 +96,9 @@ async fn main() {
|
||||
let client_root_cert_der = Arc::clone(&client_root_cert_der);
|
||||
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
|
||||
let server_root_cert = Arc::clone(&server_root_cert);
|
||||
let server_db_addr = server_db_addr.clone();
|
||||
let scdb_addr = scdb_addr.clone();
|
||||
let pdcm_addr = pdcm_addr.clone();
|
||||
let sm_addr = sm_addr.clone();
|
||||
|
||||
// Set up TLS service factory
|
||||
server_acceptor
|
||||
@@ -174,50 +109,162 @@ async fn main() {
|
||||
let client_root_cert_der = Arc::clone(&client_root_cert_der);
|
||||
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
|
||||
let server_root_cert = Arc::clone(&server_root_cert);
|
||||
let server_db_addr = server_db_addr.clone();
|
||||
let scdb_addr = scdb_addr.clone();
|
||||
let pdcm_addr = pdcm_addr.clone();
|
||||
let sm_addr = sm_addr.clone();
|
||||
async move {
|
||||
let peer_cert_der = stream.get_ref().1.peer_certificates().unwrap().last().unwrap().clone();
|
||||
if peer_cert_der == *server_root_cert_der {
|
||||
let peer_certs = stream.get_ref().1.peer_certificates().unwrap();
|
||||
let peer_cert_bytes = peer_certs.first().unwrap().to_vec();
|
||||
let peer_root_cert_der = peer_certs.last().unwrap().clone();
|
||||
if peer_root_cert_der == *server_root_cert_der {
|
||||
info!("Server connection");
|
||||
let framed = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
framed.for_each(|item| async move {
|
||||
match item {
|
||||
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
match transport.next().await {
|
||||
None => {
|
||||
info!("Connection closed by peer");
|
||||
}
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use FromServerMessage::*;
|
||||
let msg: FromServerMessage = rmp_serde::from_slice(&buf).unwrap();
|
||||
use FromServerConnTypeMessage::*;
|
||||
let msg: FromServerConnTypeMessage = rmp_serde::from_slice(&buf).unwrap();
|
||||
info!("{:?}", msg);
|
||||
match msg {
|
||||
RequiredReply(v) => match v {
|
||||
RequiredReplyValues::Ok => {
|
||||
info!("Required Reply OK")
|
||||
SendCommand => {
|
||||
info!("SendCommand Stream");
|
||||
let reply = ToServerConnTypeReply::OkSendCommand;
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
server_command_handler(transport, peer_cert_bytes, scdb_addr).await;
|
||||
}
|
||||
RequiredReplyValues::GenericFailure { .. } => {
|
||||
info!("Required Reply Generic Failure")
|
||||
Subscribe => {
|
||||
info!("Subscribe Stream");
|
||||
let name = match scdb_addr.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
|
||||
None => {
|
||||
error!("Cert has no name assigned!");
|
||||
let reply = ToServerConnTypeReply::GenericFailure;
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
return Ok(());
|
||||
}
|
||||
Some(name) => name,
|
||||
};
|
||||
let reply = ToServerConnTypeReply::OkSubscribe;
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
server_subscribe_handler(transport, name, sm_addr).await;
|
||||
}
|
||||
OpenDataStream(conn_id) => {
|
||||
info!("OpenDataStream with {:?}", conn_id);
|
||||
let msg = RegisterStream::server(conn_id, transport);
|
||||
pdcm_addr.send(msg).await.unwrap().unwrap();
|
||||
}
|
||||
}
|
||||
ChangeName { name } => {
|
||||
info!("Requested Change Name to Name {}", name);
|
||||
}
|
||||
WhoAmI => {
|
||||
info!("Requested WhoAmI");
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
info!("Disconnection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}).await;
|
||||
info!("Disconnection!");
|
||||
} else if peer_cert_der == *guestserver_root_cert_der {
|
||||
}
|
||||
info!("Server Task terminated!");
|
||||
} else if peer_root_cert_der == *guestserver_root_cert_der {
|
||||
info!("GuestServer connection");
|
||||
let server_root_cert = Arc::clone(&server_root_cert);
|
||||
let codec = LengthDelimitedCodec::new();
|
||||
let mut transport = Framed::new(stream, codec);
|
||||
let transport = Framed::new(stream, codec);
|
||||
guestserver_handler(transport, scdb_addr, &server_root_cert).await;
|
||||
} else if peer_root_cert_der == *client_root_cert_der {
|
||||
info!("Client connection");
|
||||
let codec = LengthDelimitedCodec::new();
|
||||
let transport = Framed::new(stream, codec);
|
||||
client_handler(transport, sm_addr, pdcm_addr).await;
|
||||
} else {
|
||||
error!("Unknown Root Certificate");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
})
|
||||
}).unwrap()
|
||||
.workers(1)
|
||||
.run()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
async fn server_subscribe_handler(transport: TransportStream, name: String, sm_addr: Addr<ServerManager>) {
|
||||
match sm_addr.send(StartTransporter { name, transport }).await.unwrap() {
|
||||
Ok(_) => {
|
||||
info!("Stream sent to the manager");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error from manager: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes: Vec<u8>, server_db_addr: Addr<ServerCertDB>) {
|
||||
loop {
|
||||
match transport.next().await {
|
||||
None => {
|
||||
info!("Transport returned None");
|
||||
break;
|
||||
}
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use FromServerCommandMessage::*;
|
||||
let msg: FromServerCommandMessage = rmp_serde::from_slice(&buf).unwrap();
|
||||
info!("{:?}", msg);
|
||||
match msg {
|
||||
ChangeName { name } => {
|
||||
info!("Changing name to {}", name);
|
||||
match server_db_addr.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
|
||||
None => {
|
||||
info!("Nothing to unregister");
|
||||
}
|
||||
Some(old_name) => {
|
||||
warn!("Unregistered from old name {}", old_name);
|
||||
}
|
||||
}
|
||||
let reply = match server_db_addr.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
|
||||
Ok(_) => {
|
||||
info!("Registered!");
|
||||
ToServerCommandReply::NameChanged
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error registering: {:?}", e);
|
||||
ToServerCommandReply::GenericFailure
|
||||
}
|
||||
};
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
}
|
||||
WhoAmI => {
|
||||
info!("Asked who I am");
|
||||
let reply = match server_db_addr.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
|
||||
None => {
|
||||
info!("I'm not registered anymore!? WTF");
|
||||
ToServerCommandReply::GenericFailure
|
||||
}
|
||||
Some(name) => {
|
||||
info!("I am {}", name);
|
||||
ToServerCommandReply::YouAre { name }
|
||||
}
|
||||
};
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Disconnection: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Considera creare un context dove vengono contenute tutte le chiavi e gli address da dare a tutti gli handler
|
||||
async fn guestserver_handler(mut transport: TransportStream, server_db_addr: Addr<ServerCertDB>, server_root_cert: &Arc<Certificate>) {
|
||||
loop {
|
||||
match transport.next().await {
|
||||
None => {
|
||||
info!("Transport returned None");
|
||||
break;
|
||||
}
|
||||
Some(item) => {
|
||||
match item {
|
||||
@@ -234,7 +281,7 @@ async fn main() {
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
break; // Stop reading
|
||||
} else {
|
||||
let cert = generate_server_cert(&server_root_cert, name.as_str());
|
||||
let cert = generate_server_cert(server_root_cert, name.as_str());
|
||||
server_db_addr.send(RegisterServer {
|
||||
cert: cert.cert.clone(),
|
||||
name,
|
||||
@@ -256,17 +303,55 @@ async fn main() {
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if peer_cert_der == *client_root_cert_der {
|
||||
info!("Client connection");
|
||||
} else {
|
||||
error!("Unknown Root Certificate");
|
||||
}
|
||||
Ok(())
|
||||
|
||||
async fn client_handler(mut transport: TransportStream, sm_addr: Addr<ServerManager>, pdcm_addr: Addr<PendingDataConnManager>) {
|
||||
loop {
|
||||
match transport.next().await {
|
||||
None => {
|
||||
info!("Transport returned None");
|
||||
break;
|
||||
}
|
||||
Some(item) => {
|
||||
match item {
|
||||
Ok(buf) => {
|
||||
let msg: FromClientCommand = rmp_serde::from_slice(&buf).unwrap();
|
||||
info!("{:?}", msg);
|
||||
match msg {
|
||||
FromClientCommand::RequestServer { name } => {
|
||||
info!("REQUESTED SERVER {}", name);
|
||||
let data = sm_addr.send(RequestServer { name }).await.unwrap();
|
||||
match data {
|
||||
Ok(client_conn_id) => {
|
||||
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
info!("Sent OkRequest");
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error! {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
FromClientCommand::ServerList => {
|
||||
info!("Requested ServerList");
|
||||
let data = sm_addr.send(GetServerList {}).await.unwrap();
|
||||
let reply = ToClientResponse::OkServerList { data };
|
||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||
}
|
||||
FromClientCommand::UpgradeToDataStream(conn_id) => {
|
||||
info!("Upgrade to DataStream with conn_id {:?}", conn_id);
|
||||
let msg = RegisterStream::client(conn_id, transport);
|
||||
pdcm_addr.send(msg).await.unwrap().unwrap();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Disconnection: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}).unwrap()
|
||||
.workers(1)
|
||||
.run()
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
241
bonknet_broker/src/pendingdataconndb.rs
Normal file
241
bonknet_broker/src/pendingdataconndb.rs
Normal file
@@ -0,0 +1,241 @@
|
||||
use actix::prelude::*;
|
||||
use actix_tls::accept::rustls_0_22::TlsStream;
|
||||
use futures::SinkExt;
|
||||
use thiserror::Error;
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_util::codec::{Framed, LengthDelimitedCodec};
|
||||
use tracing::{error, info};
|
||||
use uuid::Uuid;
|
||||
use libbonknet::*;
|
||||
use crate::dataconnmanager::{DataConnManager, StartDataBridge};
|
||||
|
||||
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
|
||||
|
||||
/*
|
||||
L'idea e' che il database deve avere una riga per ogni connessione dati in nascita.
|
||||
In ogni "riga" deve essere presente:
|
||||
- Il ServerConnID che il server dovra usare (pkey)
|
||||
- Il ClientConnID che il client dovra usare (pkey)
|
||||
- Se gia connesso, il Socket del Server
|
||||
- Se gia connesso il Socket del Client
|
||||
|
||||
Quando in una riga sono presenti sia ServerSocket che ClientSocket allora poppa la riga
|
||||
e usa i socket per lanciare un thread/actor che faccia il piping dei dati
|
||||
|
||||
Ricerca riga deve avvenire sia tramite ServerConnID che ClientConnID se essi sono diversi come pianifico
|
||||
Quindi l'ideale e' non usare una collection ma andare direttamente di Vector!
|
||||
*/
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum PendingDataConnError {
|
||||
#[error("Generic Failure")]
|
||||
GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(),PendingDataConnError>")]
|
||||
pub struct NewPendingConn {
|
||||
pub server_conn_id: Uuid,
|
||||
pub client_conn_id: Uuid,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
enum RegisterKind {
|
||||
Server,
|
||||
Client
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(),PendingDataConnError>")]
|
||||
pub struct RegisterStream {
|
||||
kind: RegisterKind,
|
||||
conn_id: Uuid,
|
||||
transport: TransportStream,
|
||||
}
|
||||
|
||||
impl RegisterStream {
|
||||
pub fn server(conn_id: Uuid, transport: TransportStream) -> Self {
|
||||
RegisterStream { kind: RegisterKind::Server, conn_id, transport }
|
||||
}
|
||||
|
||||
pub fn client(conn_id: Uuid, transport: TransportStream) -> Self {
|
||||
RegisterStream { kind: RegisterKind::Client, conn_id, transport }
|
||||
}
|
||||
}
|
||||
|
||||
struct SideRecord {
|
||||
conn_id: Uuid,
|
||||
transport: Option<TransportStream>,
|
||||
}
|
||||
|
||||
struct Record {
|
||||
server: SideRecord,
|
||||
client: SideRecord,
|
||||
}
|
||||
|
||||
impl Record {
|
||||
fn new(server_conn_id: Uuid, client_conn_id: Uuid) -> Self {
|
||||
let server = SideRecord { conn_id: server_conn_id, transport: None };
|
||||
let client = SideRecord { conn_id: client_conn_id, transport: None };
|
||||
Record { server, client }
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
|
||||
pub struct PendingDataConnManager {
|
||||
db: Vec<Record>,
|
||||
dcm_addr: Addr<DataConnManager>,
|
||||
}
|
||||
|
||||
impl PendingDataConnManager {
|
||||
pub fn new(dcm_addr: Addr<DataConnManager>) -> Self {
|
||||
PendingDataConnManager { db: Vec::new(), dcm_addr }
|
||||
}
|
||||
|
||||
fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
|
||||
use RegisterKind::*;
|
||||
let record = match match kind {
|
||||
Server => self.db.iter_mut().find(|x| x.server.conn_id == *conn_id),
|
||||
Client => self.db.iter_mut().find(|x| x.client.conn_id == *conn_id),
|
||||
} {
|
||||
None => return None,
|
||||
Some(item) => item,
|
||||
};
|
||||
Some(match kind {
|
||||
Server => &mut record.server,
|
||||
Client => &mut record.client,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for PendingDataConnManager {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl Handler<NewPendingConn> for PendingDataConnManager {
|
||||
type Result = Result<(), PendingDataConnError>;
|
||||
|
||||
fn handle(&mut self, msg: NewPendingConn, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let server_conn_id = msg.server_conn_id;
|
||||
let client_conn_id = msg.client_conn_id;
|
||||
let new_record = Record::new(server_conn_id, client_conn_id);
|
||||
if self.db.iter().any(|x| {
|
||||
x.server.conn_id == new_record.server.conn_id || x.client.conn_id == new_record.client.conn_id
|
||||
}) {
|
||||
error!("Conflicting UUIDs: server {} - client {}", server_conn_id, client_conn_id);
|
||||
Err(PendingDataConnError::GenericFailure)
|
||||
} else {
|
||||
self.db.push(new_record);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Esegui tutti i test normali in Sync.
|
||||
Quando devi inviare il Accepted, notificati la cosa come AcceptStream con lo stream in suo possesso
|
||||
ma stavolta con un ResponseActFuture.
|
||||
Se c'e un fallimento, sposta il transport in un ctx::spawn che invii un FAILED.
|
||||
Se tutto OK, checka DI NUOVO tutto, e se i check sono positivi, registra lo stream nell'Actor.
|
||||
|
||||
Per gestire lo Spawn di una connection, l'unica risposta e' gestire lo spawn connection come un
|
||||
Message Handler a sua volta. Quando uno stream completa l'invio del suo Accepted, esso appare nel Record.
|
||||
Quando il secondo stream arriva e completa il suo accepted, anch'esso viene registrato nel Record, quindi
|
||||
siamo nella condizione di spawn, perche ci sono entrambi i transport nel Record.
|
||||
Quindi se alla fine di un check and register ci sono entrambi gli stream, spostali entrambi fuori, droppa
|
||||
il record e invia i due transport ad un terzo actor, su un altro Arbiter, che esegua il tokio::io::copy e
|
||||
gestisca le connessioni aperte.
|
||||
*/
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(),PendingDataConnError>")]
|
||||
struct TryStartDataStream {
|
||||
kind: RegisterKind,
|
||||
conn_id: Uuid,
|
||||
}
|
||||
|
||||
impl Handler<TryStartDataStream> for PendingDataConnManager {
|
||||
type Result = Result<(), PendingDataConnError>;
|
||||
|
||||
fn handle(&mut self, msg: TryStartDataStream, _ctx: &mut Self::Context) -> Self::Result {
|
||||
use RegisterKind::*;
|
||||
let idx = match msg.kind {
|
||||
Server => self.db.iter().enumerate().find(|(i, x)| x.server.conn_id == msg.conn_id),
|
||||
Client => self.db.iter().enumerate().find(|(i, x)| x.client.conn_id == msg.conn_id),
|
||||
};
|
||||
if let Some((idx, record)) = idx {
|
||||
if record.client.transport.is_some() && record.server.transport.is_some() {
|
||||
info!("Requesting Data Bridge for client_conn_id {}", record.client.conn_id);
|
||||
// We can drop record and use idx to remove the record itself from the vector
|
||||
// and then send it to another manager for the spawn of the real connection
|
||||
// This remove is necessary to drop the &Record and take full ownership of it.
|
||||
let real_record = self.db.remove(idx); // Safety: We are sure this idx is valid
|
||||
let msg = StartDataBridge {
|
||||
client_conn_id: real_record.client.conn_id,
|
||||
server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
|
||||
client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
|
||||
};
|
||||
self.dcm_addr.do_send(msg);
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
Err(PendingDataConnError::GenericFailure)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<RegisterStream> for PendingDataConnManager {
|
||||
type Result = ResponseActFuture<Self, Result<(), PendingDataConnError>>;
|
||||
|
||||
fn handle(&mut self, msg: RegisterStream, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let side_record = match self.retrieve_siderecord(&msg.kind, &msg.conn_id) {
|
||||
None => {
|
||||
error!("Found no connection with {:?} conn_id {:?}", msg.kind, msg.conn_id);
|
||||
return Box::pin(fut::err(PendingDataConnError::GenericFailure));
|
||||
}
|
||||
Some(item) => item,
|
||||
};
|
||||
if side_record.transport.is_some() {
|
||||
// TODO: It can be good to check if the connection is still open, if not, drop and use the new one.
|
||||
error!("Connection already with a socket!");
|
||||
Box::pin(fut::err(PendingDataConnError::GenericFailure))
|
||||
} else {
|
||||
// This Fut will send the Accepted and only then, register the transport stream
|
||||
// in the Manager. If during the registration there are all the transport in places,
|
||||
// you can start the datapiping
|
||||
Box::pin(async move {
|
||||
let mut transport = msg.transport;
|
||||
let reply = ToPeerDataStream::OkDataStreamRequestAccepted;
|
||||
let res = transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await;
|
||||
(transport, res)
|
||||
}.into_actor(self).map(move |(transport, res), a, c| {
|
||||
match res {
|
||||
Ok(_) => {
|
||||
// TODO: to not do the check twice I can put a "lock variable" inside the record
|
||||
// to prevent the put in Accept of another stream while we are waiting to send the
|
||||
// accept message
|
||||
let side_record = match a.retrieve_siderecord(&msg.kind, &msg.conn_id) {
|
||||
None => {
|
||||
error!("Found no connection with {:?} conn_id {:?}", msg.kind, msg.conn_id);
|
||||
return Err(PendingDataConnError::GenericFailure);
|
||||
}
|
||||
Some(item) => item,
|
||||
};
|
||||
if side_record.transport.is_some() {
|
||||
// TODO: It can be good to check if the connection is still open, if not, drop and use the new one.
|
||||
error!("Connection already with a socket!");
|
||||
return Err(PendingDataConnError::GenericFailure);
|
||||
}
|
||||
side_record.transport = Some(transport);
|
||||
c.notify(TryStartDataStream { kind: msg.kind, conn_id: msg.conn_id });
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error during OkDataStreamRequestAccepted sending: {:?}", e);
|
||||
Err(PendingDataConnError::GenericFailure)
|
||||
}
|
||||
}
|
||||
}))
|
||||
}
|
||||
}
|
||||
}
|
||||
93
bonknet_broker/src/servercertdb.rs
Normal file
93
bonknet_broker/src/servercertdb.rs
Normal file
@@ -0,0 +1,93 @@
|
||||
use std::collections::HashMap;
|
||||
use actix::prelude::*;
|
||||
use thiserror::Error;
|
||||
|
||||
// TODO: Probably it's better to remove the pub from inside the structs and impl a new() funct
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum DBError {
|
||||
#[error("Certificate is already registered with name {0}")]
|
||||
CertAlreadyRegistered(String),
|
||||
// #[error("Generic Failure")]
|
||||
// GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "bool")]
|
||||
pub struct IsNameRegistered {
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(), DBError>")]
|
||||
pub struct RegisterServer {
|
||||
pub cert: Vec<u8>,
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Option<String>")] // None if nothing to unregister, Some if unregistered
|
||||
pub struct UnregisterServer {
|
||||
pub cert: Vec<u8>,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Option<String>")]
|
||||
pub struct FetchName {
|
||||
pub cert: Vec<u8>,
|
||||
}
|
||||
|
||||
// TODO: Move into Sqlite DB with unique check on col1 and col2!!!! Right now name is not unique
|
||||
pub struct ServerCertDB {
|
||||
db: HashMap<Vec<u8>, String>, // Cert to Name
|
||||
}
|
||||
|
||||
impl ServerCertDB {
|
||||
pub fn new() -> Self {
|
||||
ServerCertDB { db: HashMap::new() }
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for ServerCertDB {
|
||||
type Context = Context<Self>;
|
||||
}
|
||||
|
||||
impl Handler<RegisterServer> for ServerCertDB {
|
||||
type Result = Result<(), DBError>;
|
||||
|
||||
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
|
||||
match self.db.get(&msg.cert) {
|
||||
None => {
|
||||
self.db.insert(msg.cert, msg.name);
|
||||
Ok(())
|
||||
}
|
||||
Some(name) => {
|
||||
Err(DBError::CertAlreadyRegistered(name.clone()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<IsNameRegistered> for ServerCertDB {
|
||||
type Result = bool;
|
||||
|
||||
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
|
||||
self.db.values().any(|x| *x == msg.name)
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<FetchName> for ServerCertDB {
|
||||
type Result = Option<String>;
|
||||
|
||||
fn handle(&mut self, msg: FetchName, _ctx: &mut Self::Context) -> Self::Result {
|
||||
self.db.get(&msg.cert).map(|s| s.to_owned())
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<UnregisterServer> for ServerCertDB {
|
||||
type Result = Option<String>;
|
||||
|
||||
fn handle(&mut self, msg: UnregisterServer, _ctx: &mut Self::Context) -> Self::Result {
|
||||
self.db.remove(&msg.cert)
|
||||
}
|
||||
}
|
||||
296
bonknet_broker/src/servermanager.rs
Normal file
296
bonknet_broker/src/servermanager.rs
Normal file
@@ -0,0 +1,296 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::Error;
|
||||
use std::sync::{Arc};
|
||||
use std::time::{Duration, Instant};
|
||||
use actix::prelude::*;
|
||||
use rand::random;
|
||||
use thiserror::Error;
|
||||
use futures::{SinkExt};
|
||||
use tokio::sync::{Mutex, oneshot};
|
||||
use tokio_util::bytes::{Bytes, BytesMut};
|
||||
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
|
||||
use tracing::{debug, error, info};
|
||||
use libbonknet::*;
|
||||
use crate::{TransportStream, TransportStreamRx, TransportStreamTx};
|
||||
use uuid::Uuid;
|
||||
use crate::pendingdataconndb::*;
|
||||
|
||||
#[derive(MessageResponse)]
|
||||
enum SendMsgResult {
|
||||
Accepted,
|
||||
Rejected,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum SendMsgError {
|
||||
#[error("Generic Failure")]
|
||||
GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<FromServerReplyBody, SendMsgError>")]
|
||||
struct SendMsg {
|
||||
msg: ToServerMessageBody,
|
||||
}
|
||||
|
||||
struct ServerTransporter {
|
||||
rx: Option<TransportStreamRx>,
|
||||
tx: Arc<tokio::sync::Mutex<TransportStreamTx>>,
|
||||
timeout: Option<SpawnHandle>,
|
||||
reply_channels: HashMap<u64, oneshot::Sender<FromServerReplyBody>>,
|
||||
}
|
||||
|
||||
impl ServerTransporter {
|
||||
fn new(transport: TransportStream) -> Self {
|
||||
let internal = transport.into_inner();
|
||||
let (srx, stx) = tokio::io::split(internal);
|
||||
let codec = LengthDelimitedCodec::new();
|
||||
let rx = FramedRead::new(srx, codec.clone());
|
||||
let tx = FramedWrite::new(stx, codec.clone());
|
||||
ServerTransporter {
|
||||
rx: Some(rx),
|
||||
tx: Arc::new(tokio::sync::Mutex::new(tx)),
|
||||
timeout: None,
|
||||
reply_channels: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
fn actor_close(&mut self, ctx: &mut Context<Self>) {
|
||||
ctx.stop();
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for ServerTransporter {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
// Register Read Stream
|
||||
let rx = self.rx.take().expect("Rx Stream not found");
|
||||
ctx.add_stream(rx);
|
||||
// Register Timeout task
|
||||
self.timeout = Some(ctx.run_interval(Duration::from_secs(120), |s, c| {
|
||||
s.actor_close(c)
|
||||
}));
|
||||
// Register Send Ping Task
|
||||
ctx.run_interval(Duration::from_secs(60), |s, c| {
|
||||
let msg = ToServerMessage::Ping;
|
||||
let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into();
|
||||
let arc_tx = Arc::clone(&s.tx);
|
||||
c.spawn(async move {
|
||||
arc_tx.lock().await.send(payload).await
|
||||
}.into_actor(s).map(|res, a, ctx| {
|
||||
match res {
|
||||
Ok(_) => {
|
||||
info!("Ping sent!");
|
||||
}
|
||||
Err(_) => {
|
||||
ctx.stop();
|
||||
}
|
||||
}
|
||||
}));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<SendMsg> for ServerTransporter {
|
||||
type Result = ResponseFuture<Result<FromServerReplyBody, SendMsgError>>;
|
||||
|
||||
fn handle(&mut self, msg: SendMsg, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let (reply_channel_tx, reply_channel_rx) = oneshot::channel();
|
||||
let mut reply_id: u64;
|
||||
if self.reply_channels.len() == u64::MAX as usize {
|
||||
return Box::pin(fut::ready(Err(SendMsgError::GenericFailure)));
|
||||
}
|
||||
loop {
|
||||
reply_id = random();
|
||||
if !self.reply_channels.contains_key(&reply_id) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
self.reply_channels.insert(reply_id, reply_channel_tx);
|
||||
let msg = ToServerMessage::Msg {
|
||||
reply_id,
|
||||
body: msg.msg,
|
||||
};
|
||||
let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into();
|
||||
let arc_tx = self.tx.clone();
|
||||
Box::pin(async move {
|
||||
arc_tx.lock().await.send(payload).await.map_err(|e| SendMsgError::GenericFailure)?;
|
||||
info!("msg sent");
|
||||
let r = reply_channel_rx.await.unwrap();
|
||||
info!("reply received");
|
||||
Ok(r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamHandler<Result<BytesMut, Error>> for ServerTransporter {
|
||||
fn handle(&mut self, item: Result<BytesMut, Error>, ctx: &mut Self::Context) {
|
||||
match item {
|
||||
Ok(buf) => {
|
||||
use FromServerReply::*;
|
||||
let msg: FromServerReply = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
Pong => {
|
||||
info!("Pong received");
|
||||
if let Some(spawn_handle) = self.timeout {
|
||||
ctx.cancel_future(spawn_handle);
|
||||
} else {
|
||||
error!("There were no spawn handle configured!");
|
||||
}
|
||||
self.timeout = Some(ctx.run_interval(Duration::from_secs(120), |s, c| {
|
||||
s.actor_close(c)
|
||||
}));
|
||||
}
|
||||
Msg { reply_id, body } => match self.reply_channels.remove(&reply_id) {
|
||||
None => {}
|
||||
Some(reply_tx) => {
|
||||
if let Err(_e) = reply_tx.send(body) {
|
||||
error!("Oneshot channel {} got invalidated! No reply sent.", reply_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("ERROR {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum ServerManagerError {
|
||||
#[error("Generic Failure")]
|
||||
GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<(),ServerManagerError>")]
|
||||
pub struct StartTransporter {
|
||||
pub name: String,
|
||||
pub transport: TransportStream,
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Vec<String>")]
|
||||
pub struct GetServerList {}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "Result<Uuid,ServerManagerError>")] // TODO: Return Client ID with struct to give it a name
|
||||
pub struct RequestServer {
|
||||
pub name: String
|
||||
}
|
||||
|
||||
pub struct ServerManager {
|
||||
entries: Arc<Mutex<HashMap<String, Addr<ServerTransporter>>>>,
|
||||
// Name -> Addr to Actor
|
||||
pdcdb_addr: Addr<PendingDataConnManager>,
|
||||
}
|
||||
|
||||
impl ServerManager {
|
||||
pub fn new(pdcdb_addr: Addr<PendingDataConnManager>) -> Self {
|
||||
ServerManager { entries: Arc::new(Mutex::new(HashMap::new())), pdcdb_addr }
|
||||
}
|
||||
}
|
||||
|
||||
impl Actor for ServerManager {
|
||||
type Context = Context<Self>;
|
||||
|
||||
fn started(&mut self, ctx: &mut Self::Context) {
|
||||
// Remove all the ServerTransporters that are not running
|
||||
// TODO: This is a "Pull" from entries, but we can have the entries that when killed tell
|
||||
// the Manager making it more reactive!
|
||||
ctx.run_interval(Duration::from_secs(10), |s, c| {
|
||||
let start = Instant::now();
|
||||
let entries = Arc::clone(&s.entries);
|
||||
c.spawn(async move {
|
||||
let mut entries_mg = entries.lock().await;
|
||||
let mut keys_to_delete = vec![];
|
||||
for (name, st_addr) in entries_mg.iter() {
|
||||
if !st_addr.connected() {
|
||||
keys_to_delete.push(name.clone())
|
||||
}
|
||||
}
|
||||
for name in keys_to_delete {
|
||||
entries_mg.remove(&name);
|
||||
info!("Closed ServerTransporter {} for actor death", name);
|
||||
}
|
||||
debug!("Cleaned ServerManager in {:?}", Instant::now() - start);
|
||||
}.into_actor(s));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<StartTransporter> for ServerManager {
|
||||
type Result = ResponseFuture<Result<(), ServerManagerError>>;
|
||||
|
||||
fn handle(&mut self, msg: StartTransporter, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let mut entries = Arc::clone(&self.entries);
|
||||
Box::pin(async move {
|
||||
let mut entries_mg = entries.lock().await;
|
||||
if entries_mg.contains_key(&msg.name) {
|
||||
error!("A server called {} is already connected!", msg.name);
|
||||
return Err(ServerManagerError::GenericFailure);
|
||||
}
|
||||
let st_addr = ServerTransporter::new(msg.transport).start();
|
||||
entries_mg.insert(msg.name, st_addr);
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<GetServerList> for ServerManager {
|
||||
type Result = ResponseFuture<Vec<String>>;
|
||||
|
||||
fn handle(&mut self, _msg: GetServerList, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let entries = Arc::clone(&self.entries);
|
||||
Box::pin(async move {
|
||||
let entries_mg = entries.lock().await;
|
||||
entries_mg.keys().cloned().collect()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl Handler<RequestServer> for ServerManager {
|
||||
type Result = ResponseFuture<Result<Uuid, ServerManagerError>>;
|
||||
|
||||
fn handle(&mut self, msg: RequestServer, _ctx: &mut Self::Context) -> Self::Result {
|
||||
let name = msg.name.clone();
|
||||
let entries = Arc::clone(&self.entries);
|
||||
let pdcdb_addr = self.pdcdb_addr.clone();
|
||||
Box::pin(async move {
|
||||
let lock = entries.lock().await;
|
||||
let sh_addr = match lock.get(&name) {
|
||||
None => {
|
||||
error!("Requested server {} that isn't registered", name);
|
||||
return Err(ServerManagerError::GenericFailure);
|
||||
}
|
||||
Some(item) => item,
|
||||
};
|
||||
let server_conn_id = Uuid::new_v4();
|
||||
let client_conn_id = Uuid::new_v4();
|
||||
match pdcdb_addr.send(NewPendingConn { server_conn_id, client_conn_id }).await.unwrap() {
|
||||
Ok(_) => {
|
||||
let msg = ToServerMessageBody::Request { conn_id: server_conn_id };
|
||||
match sh_addr.send(SendMsg { msg }).await.unwrap() {
|
||||
Ok(reply) => match reply {
|
||||
FromServerReplyBody::RequestAccepted => {
|
||||
Ok(client_conn_id)
|
||||
}
|
||||
FromServerReplyBody::RequestFailed => {
|
||||
error!("Request Failed!");
|
||||
Err(ServerManagerError::GenericFailure)
|
||||
}
|
||||
FromServerReplyBody::Pong => unreachable!(),
|
||||
}
|
||||
Err(e) => {
|
||||
panic!("ERROR: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_e) => Err(ServerManagerError::GenericFailure),
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -7,10 +7,13 @@ edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
libbonknet = { path = "../libbonknet" }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-rustls = "0.25.0"
|
||||
tokio = { version = "1", features = ["full", "tracing"] }
|
||||
tokio-util = { version = "0.7.10", features = ["codec"] }
|
||||
futures = "0.3"
|
||||
rcgen = "0.12.0"
|
||||
tokio-rustls = "0.25.0"
|
||||
rustls-pemfile = "2.0.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
rmp-serde = "1.1.2"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
uuid = { version = "1.7.0", features = ["serde"] }
|
||||
|
||||
@@ -1,74 +1,173 @@
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::sync::Arc;
|
||||
use futures::SinkExt;
|
||||
use std::time::Duration;
|
||||
use futures::{StreamExt, SinkExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
|
||||
use tokio_rustls::rustls::pki_types::{ServerName};
|
||||
use tokio_rustls::TlsConnector;
|
||||
use tokio_util::codec::{Framed, LengthDelimitedCodec};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use libbonknet::{load_cert, load_prkey};
|
||||
use libbonknet::*;
|
||||
use uuid::Uuid;
|
||||
use tracing::{info, error};
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
enum ClientMessage {
|
||||
Response { status_code: u32, msg: Option<String> },
|
||||
Announce { name: String },
|
||||
Required { id: String },
|
||||
NotRequired { id: String },
|
||||
|
||||
async fn datastream(tlsconfig: ClientConfig, conn_id: Uuid) -> std::io::Result<()> {
|
||||
let connector = TlsConnector::from(Arc::new(tlsconfig.clone()));
|
||||
let dnsname = ServerName::try_from("localhost").unwrap();
|
||||
let stream = TcpStream::connect("localhost:2541").await?;
|
||||
let stream = connector.connect(dnsname, stream).await?;
|
||||
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
|
||||
let msg = FromClientCommand::UpgradeToDataStream(conn_id);
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
match transport.next().await {
|
||||
None => panic!("None in the transport"),
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToPeerDataStream::*;
|
||||
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
OkDataStreamRequestAccepted => {
|
||||
info!("Data Stream Accepted. Waiting for Open...");
|
||||
}
|
||||
Refused => {
|
||||
error!("Refused");
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
|
||||
}
|
||||
other => {
|
||||
error!("Unexpected response: {:?}", other);
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error: {:?}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
match transport.next().await {
|
||||
None => panic!("None in the transport"),
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToPeerDataStream::*;
|
||||
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
OkDataStreamOpen => {
|
||||
info!("Data Stream Open!. Connecting Streams.");
|
||||
}
|
||||
Revoked => {
|
||||
error!("Data Stream Revoked!");
|
||||
return Err(Error::new(ErrorKind::ConnectionAborted, "Revoked"));
|
||||
}
|
||||
Refused => {
|
||||
error!("Refused");
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
|
||||
}
|
||||
other => {
|
||||
error!("Unexpected response: {:?}", other);
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error: {:?}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
let (mut rx, mut tx) = tokio::io::split(transport.into_inner());
|
||||
let mut stdout = tokio::io::stdout();
|
||||
let mut stdin = tokio::io::stdin();
|
||||
let stdout_task = async move {
|
||||
match tokio::io::copy(&mut rx, &mut stdout).await {
|
||||
Ok(bytes_copied) => info!("{bytes_copied}"),
|
||||
Err(e) => error!("Error during copy: {e}"),
|
||||
}
|
||||
};
|
||||
let stdin_task = async move {
|
||||
match tokio::io::copy(&mut stdin, &mut tx).await {
|
||||
Ok(bytes_copied) => info!("{bytes_copied}"),
|
||||
Err(e) => error!("Error during copy: {e}"),
|
||||
}
|
||||
};
|
||||
tokio::join!(stdout_task, stdin_task);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// TODO: This is an old examples
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
let client_name = "Polnareffland1";
|
||||
// Tracing Subscriber
|
||||
let subscriber = tracing_subscriber::FmtSubscriber::new();
|
||||
tracing::subscriber::set_global_default(subscriber).unwrap();
|
||||
// Root certs to verify the server is the right one
|
||||
let mut server_root_cert_store = RootCertStore::empty();
|
||||
let server_root_cert_der = load_cert("server_root_cert.pem").unwrap();
|
||||
server_root_cert_store.add(server_root_cert_der).unwrap();
|
||||
// Auth Cert to send the server who am I
|
||||
let root_client_cert = load_cert("client_root_cert.pem").unwrap();
|
||||
let client_cert = load_cert("client_cert.pem").unwrap();
|
||||
let client_prkey = load_prkey("client_key.pem").unwrap();
|
||||
let mut broker_root_cert_store = RootCertStore::empty();
|
||||
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap();
|
||||
broker_root_cert_store.add(broker_root_cert_der).unwrap();
|
||||
// Public CA for Clients
|
||||
let root_client_cert = load_cert("certs/client_root_cert.pem").unwrap();
|
||||
// My Client Certificate for authentication
|
||||
let client_cert = load_cert("certs/client_cert.pem").unwrap();
|
||||
let client_cert_prkey = load_prkey("certs/client_key.pem").unwrap();
|
||||
// Load TLS Config
|
||||
let tlsconfig = ClientConfig::builder()
|
||||
.with_root_certificates(server_root_cert_store)
|
||||
// .with_no_client_auth();
|
||||
.with_client_auth_cert(vec![client_cert, root_client_cert], client_prkey.into())
|
||||
.with_root_certificates(broker_root_cert_store.clone())
|
||||
.with_client_auth_cert(vec![client_cert, root_client_cert], client_cert_prkey.into())
|
||||
.unwrap();
|
||||
let connector = TlsConnector::from(Arc::new(tlsconfig));
|
||||
let connector = TlsConnector::from(Arc::new(tlsconfig.clone()));
|
||||
let dnsname = ServerName::try_from("localhost").unwrap();
|
||||
|
||||
let stream = TcpStream::connect("localhost:2541").await?;
|
||||
let stream = connector.connect(dnsname, stream).await?;
|
||||
|
||||
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
|
||||
let msg1 = ClientMessage::Announce { name: client_name.into() };
|
||||
transport.send(rmp_serde::to_vec(&msg1).unwrap().into()).await.unwrap();
|
||||
for i in 0..10 {
|
||||
let msg = ClientMessage::Response { status_code: 100+i, msg: Some(format!("yay {}", i)) };
|
||||
let msg = FromClientCommand::ServerList;
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
match transport.next().await {
|
||||
None => panic!("None in the transport"),
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToClientResponse::*;
|
||||
let msg: ToClientResponse = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
OkServerList { data } => info!("{}", data.join("\n")),
|
||||
GenericError => error!("Generic Error during remote command execution"),
|
||||
others => {
|
||||
panic!("Unexpected Message type: {:?}", others);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
let msg = FromClientCommand::RequestServer { name: "cicciopizza".into() };
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
match transport.next().await {
|
||||
None => panic!("None in the transport"),
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToClientResponse::*;
|
||||
let msg: ToClientResponse = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
OkRequest { conn_id } => {
|
||||
info!("Received Client Connection ID: {:?}", conn_id);
|
||||
datastream(tlsconfig, conn_id).await.unwrap();
|
||||
}
|
||||
GenericError => error!("Generic Error during remote command execution"),
|
||||
others => {
|
||||
panic!("Unexpected Message type: {:?}", others);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// transport.for_each(|item| async move {
|
||||
// let a: ClientMessage = rmp_serde::from_slice(&item.unwrap()).unwrap();
|
||||
// println!("{:?}", a);
|
||||
// }).await;
|
||||
|
||||
// let mut buf = vec![0;1024];
|
||||
// let (mut rd,mut tx) = split(stream);
|
||||
//
|
||||
//
|
||||
// tokio::spawn(async move {
|
||||
// let mut stdout = tokio::io::stdout();
|
||||
// tokio::io::copy(&mut rd, &mut stdout).await.unwrap();
|
||||
// });
|
||||
//
|
||||
// let mut reader = tokio::io::BufReader::new(tokio::io::stdin()).lines();
|
||||
//
|
||||
// while let Some(line) = reader.next_line().await.unwrap() {
|
||||
// tx.write_all(line.as_bytes()).await.unwrap();
|
||||
// }
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -16,3 +16,4 @@ rustls-pemfile = "2.0.0"
|
||||
rmp-serde = "1.1.2"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
uuid = { version = "1.7.0", features = ["serde"] }
|
||||
@@ -1,14 +1,91 @@
|
||||
use std::io::{Error, ErrorKind};
|
||||
use std::sync::Arc;
|
||||
use futures::{StreamExt, SinkExt};
|
||||
use tokio::net::TcpStream;
|
||||
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
|
||||
use tokio_rustls::rustls::pki_types::{ServerName, CertificateDer, PrivatePkcs8KeyDer};
|
||||
use tokio_rustls::TlsConnector;
|
||||
use tokio_util::bytes::BytesMut;
|
||||
use tokio_util::codec::{Framed, LengthDelimitedCodec};
|
||||
use libbonknet::*;
|
||||
use uuid::Uuid;
|
||||
use tracing::{info, error};
|
||||
use libbonknet::ToPeerDataStream::{OkDataStreamOpen, OkDataStreamRequestAccepted, Refused, Revoked};
|
||||
|
||||
|
||||
async fn datastream(tlsconfig: Arc<ClientConfig>, conn_id: Uuid) -> std::io::Result<()> {
|
||||
let connector = TlsConnector::from(tlsconfig);
|
||||
let dnsname = ServerName::try_from("localhost").unwrap();
|
||||
let stream = TcpStream::connect("localhost:2541").await?;
|
||||
let stream = connector.connect(dnsname, stream).await?;
|
||||
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
|
||||
let msg = FromServerConnTypeMessage::OpenDataStream(conn_id);
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
match transport.next().await {
|
||||
None => panic!("None in the transport"),
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToPeerDataStream::*;
|
||||
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
OkDataStreamRequestAccepted => {
|
||||
info!("Data Stream Accepted. Waiting for Open...");
|
||||
}
|
||||
Refused => {
|
||||
error!("Refused");
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
|
||||
}
|
||||
other => {
|
||||
error!("Unexpected response: {:?}", other);
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error: {:?}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
match transport.next().await {
|
||||
None => panic!("None in the transport"),
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToPeerDataStream::*;
|
||||
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
OkDataStreamOpen => {
|
||||
info!("Data Stream Open!. Connecting Streams.");
|
||||
}
|
||||
Revoked => {
|
||||
error!("Data Stream Revoked!");
|
||||
return Err(Error::new(ErrorKind::ConnectionAborted, "Revoked"));
|
||||
}
|
||||
Refused => {
|
||||
error!("Refused");
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
|
||||
}
|
||||
other => {
|
||||
error!("Unexpected response: {:?}", other);
|
||||
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Error: {:?}", e);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
let (mut rx, mut tx) = tokio::io::split(transport.into_inner());
|
||||
match tokio::io::copy(&mut rx, &mut tx).await {
|
||||
Ok(bytes_copied) => info!("{bytes_copied}"),
|
||||
Err(e) => error!("Error during copy: {e}"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
// Tracing Subscriber
|
||||
@@ -44,7 +121,7 @@ async fn main() -> std::io::Result<()> {
|
||||
let mut myserver_prkey: Option<PrivatePkcs8KeyDer> = None;
|
||||
match transport.next().await {
|
||||
None => {
|
||||
info!("None in the transport.next() ???");
|
||||
panic!("None in the transport");
|
||||
}
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
@@ -69,33 +146,141 @@ async fn main() -> std::io::Result<()> {
|
||||
}
|
||||
}
|
||||
}
|
||||
transport.close().await.unwrap();
|
||||
if let (Some(server_cert), Some(server_prkey)) = (myserver_cert, myserver_prkey) {
|
||||
let tlsconfig = ClientConfig::builder()
|
||||
let tlsconfig = Arc::new(ClientConfig::builder()
|
||||
.with_root_certificates(broker_root_cert_store)
|
||||
.with_client_auth_cert(vec![server_cert, root_server_cert], server_prkey.into())
|
||||
.unwrap();
|
||||
let connector = TlsConnector::from(Arc::new(tlsconfig));
|
||||
.unwrap());
|
||||
let connector = TlsConnector::from(Arc::clone(&tlsconfig));
|
||||
let dnsname = ServerName::try_from("localhost").unwrap();
|
||||
|
||||
let stream = TcpStream::connect("localhost:2541").await?;
|
||||
let stream = connector.connect(dnsname, stream).await?;
|
||||
let transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
transport.for_each(|item| async move {
|
||||
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
let msg = FromServerConnTypeMessage::SendCommand;
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
match transport.next().await {
|
||||
None => {
|
||||
panic!("None in the transport");
|
||||
}
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToServerConnTypeReply::*;
|
||||
let msg: ToServerConnTypeReply = rmp_serde::from_slice(&buf).unwrap();
|
||||
info!("{:?}", msg);
|
||||
match msg {
|
||||
OkSendCommand => {
|
||||
info!("Stream set in SendCommand mode");
|
||||
}
|
||||
GenericFailure => {
|
||||
panic!("Generic Failure during SendCommand");
|
||||
}
|
||||
others => {
|
||||
panic!("Unexpected Message type: {:?}", others);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Disconnection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Begin WhoAmI
|
||||
let msg = FromServerCommandMessage::WhoAmI;
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
match transport.next().await {
|
||||
None => {
|
||||
panic!("None in the transport");
|
||||
}
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToServerCommandReply::*;
|
||||
let msg: ToServerCommandReply = rmp_serde::from_slice(&buf).unwrap();
|
||||
info!("{:?}", msg);
|
||||
match msg {
|
||||
YouAre { name } => {
|
||||
info!("I am {}", name);
|
||||
}
|
||||
GenericFailure => {
|
||||
panic!("Generic failure during WhoAmI");
|
||||
}
|
||||
_ => {
|
||||
panic!("Unexpected reply");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Disconnection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
transport.close().await.expect("Error during transport stream close");
|
||||
// Start Subscribe Stream
|
||||
let connector = TlsConnector::from(Arc::clone(&tlsconfig));
|
||||
let dnsname = ServerName::try_from("localhost").unwrap();
|
||||
|
||||
let stream = TcpStream::connect("localhost:2541").await?;
|
||||
let stream = connector.connect(dnsname, stream).await?;
|
||||
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
let msg = FromServerConnTypeMessage::Subscribe;
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
match transport.next().await {
|
||||
None => {
|
||||
panic!("None in the transport");
|
||||
}
|
||||
Some(item) => match item {
|
||||
Ok(buf) => {
|
||||
use ToServerConnTypeReply::*;
|
||||
let msg: ToServerConnTypeReply = rmp_serde::from_slice(&buf).unwrap();
|
||||
info!("{:?}", msg);
|
||||
match msg {
|
||||
OkSubscribe => {
|
||||
info!("Stream set in Subscribe mode");
|
||||
}
|
||||
GenericFailure => {
|
||||
panic!("Generic Failure during SendCommand");
|
||||
}
|
||||
others => {
|
||||
panic!("Unexpected Message type: {:?}", others);
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
info!("Disconnection: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Subscribe consume
|
||||
loop {
|
||||
match transport.next().await {
|
||||
None => {
|
||||
info!("Empty Buffer");
|
||||
}
|
||||
Some(item) => {
|
||||
let mut out: Option<FromServerReply> = None;
|
||||
match item {
|
||||
Ok(buf) => {
|
||||
use ToServerMessage::*;
|
||||
let msg: ToServerMessage = rmp_serde::from_slice(&buf).unwrap();
|
||||
match msg {
|
||||
Required { id } => {
|
||||
info!("I'm required with Connection ID {}", id);
|
||||
Msg { reply_id, body } => {
|
||||
use ToServerMessageBody::*;
|
||||
match body {
|
||||
Request { conn_id } => {
|
||||
info!("I'm required with Connection ID {}", conn_id);
|
||||
out = Some(FromServerReply::Msg {
|
||||
reply_id,
|
||||
body: FromServerReplyBody::RequestAccepted,
|
||||
});
|
||||
// TODO: SPAWN DATASTREAM
|
||||
tokio::spawn(datastream(tlsconfig.clone(), conn_id));
|
||||
}
|
||||
YouAre(name) => match name {
|
||||
YouAreValues::Registered { name } => {
|
||||
info!("I am {}", name);
|
||||
}
|
||||
YouAreValues::NotRegistered => {
|
||||
info!("I'm not registered");
|
||||
}
|
||||
Ping => {
|
||||
info!("Ping!");
|
||||
out = Some(FromServerReply::Pong);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -103,7 +288,12 @@ async fn main() -> std::io::Result<()> {
|
||||
error!("Error: {:?}", e);
|
||||
}
|
||||
}
|
||||
}).await;
|
||||
if let Some(msg) = out {
|
||||
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -9,3 +9,4 @@ edition = "2021"
|
||||
tokio-rustls = "0.25.0"
|
||||
rustls-pemfile = "2.0.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
uuid = { version = "1.7.0", features = ["serde"] }
|
||||
|
||||
@@ -2,10 +2,11 @@ use std::io::{BufReader, Error, ErrorKind};
|
||||
use rustls_pemfile::{read_one, Item};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
|
||||
use uuid::Uuid;
|
||||
|
||||
pub fn load_cert(filename: &str) -> std::io::Result<CertificateDer> {
|
||||
let cert_file = std::fs::File::open(filename).unwrap();
|
||||
let mut buf = std::io::BufReader::new(cert_file);
|
||||
let mut buf = BufReader::new(cert_file);
|
||||
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
|
||||
Ok(cert)
|
||||
} else {
|
||||
@@ -26,28 +27,71 @@ pub fn load_prkey(filename: &str) -> std::io::Result<PrivatePkcs8KeyDer> {
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum RequiredReplyValues {
|
||||
Ok,
|
||||
GenericFailure { status_code: u32, msg: Option<String> },
|
||||
pub enum FromServerConnTypeMessage {
|
||||
SendCommand,
|
||||
Subscribe,
|
||||
OpenDataStream(Uuid),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum FromServerMessage {
|
||||
RequiredReply(RequiredReplyValues),
|
||||
pub enum ToServerConnTypeReply {
|
||||
OkSendCommand,
|
||||
OkSubscribe,
|
||||
// You are now a DataStream, wait the Open message
|
||||
OkDataStreamRequestAccepted,
|
||||
// The stream is open, you can pipe in-out the content you want!
|
||||
OkDataStreamOpen,
|
||||
GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum FromServerCommandMessage {
|
||||
ChangeName { name: String },
|
||||
WhoAmI,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum ToServerCommandReply {
|
||||
NameChanged,
|
||||
NameNotAvailable,
|
||||
YouAre { name: String },
|
||||
GenericFailure,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum YouAreValues {
|
||||
Registered { name: String },
|
||||
NotRegistered,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum ToServerMessageBody {
|
||||
Request { conn_id: Uuid },
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum ToServerMessage {
|
||||
Required { id: String },
|
||||
YouAre(YouAreValues),
|
||||
Ping,
|
||||
Msg {
|
||||
reply_id: u64,
|
||||
body: ToServerMessageBody,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum FromServerReplyBody {
|
||||
RequestAccepted,
|
||||
RequestFailed,
|
||||
Pong,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum FromServerReply {
|
||||
Pong,
|
||||
Msg {
|
||||
reply_id: u64,
|
||||
body: FromServerReplyBody
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
@@ -75,3 +119,33 @@ impl ToGuestServerMessage {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Client things
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum FromClientCommand {
|
||||
RequestServer { name: String },
|
||||
ServerList,
|
||||
UpgradeToDataStream(Uuid),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum ToClientResponse {
|
||||
OkRequest { conn_id: Uuid },
|
||||
OkServerList { data: Vec<String> },
|
||||
// You are now a DataStream, wait the Open message
|
||||
OkDataStreamRequestAccepted,
|
||||
// The stream is open, you can pipe in-out the content you want!
|
||||
OkDataStreamOpen,
|
||||
GenericError,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum ToPeerDataStream {
|
||||
// You are now a DataStream, wait the Open message
|
||||
OkDataStreamRequestAccepted,
|
||||
// The stream is open, you can pipe in-out the content you want!
|
||||
OkDataStreamOpen,
|
||||
Refused,
|
||||
Revoked,
|
||||
GenericError,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user