mod servercertdb; mod pendingdataconndb; mod servermanager; mod dataconnmanager; mod streamutils; use servercertdb::*; use pendingdataconndb::*; use servermanager::*; use dataconnmanager::*; use streamutils::*; use actix::prelude::*; use std::sync::Arc; use libbonknet::servermsg::*; use libbonknet::clientmsg::*; use libbonknet::cert::*; use rustls::{RootCertStore, ServerConfig}; use rustls::server::WebPkiClientVerifier; use actix_tls::accept::rustls_0_22::{Acceptor as RustlsAcceptor, TlsStream}; use actix_server::Server; use actix_rt::net::TcpStream; use actix_service::ServiceFactoryExt as _; use futures::{SinkExt, StreamExt}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tracing::{error, info, warn}; struct BrokerContext { broker_leaf: LeafCertPair<'static>, client_ca: CACertPair<'static>, server_ca: CACertPair<'static>, guestserver_ca: CACertPair<'static>, servercert_db: Addr, pendingdataconn_manager: Addr, server_manager: Addr, } #[actix_rt::main] async fn main() { // Tracing Subscriber let subscriber = tracing_subscriber::FmtSubscriber::new(); tracing::subscriber::set_global_default(subscriber).unwrap(); // Load Identity let broker_leaf = LeafCertPair::load_from_file("certs_pem/broker.pem").unwrap(); let client_ca = CACertPair::load_from_file("certs_pem/client_root_ca.pem").unwrap(); let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap(); let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap(); // Load Actors let servercert_db = ServerCertDB::new("certsdb.sqlite").unwrap().start(); let dataconn_manager = DataConnManager::new().start(); let pendingdataconn_manager = PendingDataConnManager::new(dataconn_manager).start(); let server_manager = ServerManager::new(pendingdataconn_manager.clone()).start(); // Create Context let ctx = Arc::new(BrokerContext { broker_leaf, client_ca, server_ca, guestserver_ca, servercert_db, pendingdataconn_manager, server_manager, }); // Pki Client Verifier let mut broker_root_store = RootCertStore::empty(); broker_root_store.add(ctx.server_ca.cert().clone()).unwrap(); broker_root_store.add(ctx.client_ca.cert().clone()).unwrap(); broker_root_store.add(ctx.guestserver_ca.cert().clone()).unwrap(); let server_verifier = WebPkiClientVerifier::builder(Arc::new(broker_root_store)).build().unwrap(); // Configure TLS let server_tlsconfig = ServerConfig::builder() .with_client_cert_verifier(server_verifier) .with_single_cert(ctx.broker_leaf.fullchain(), ctx.broker_leaf.clone_key().into()) .unwrap(); let server_acceptor = RustlsAcceptor::new(server_tlsconfig); Server::build() .bind("server-command", ("localhost", 2541), move || { let ctx = Arc::clone(&ctx); // Set up TLS service factory server_acceptor .clone() .map_err(|err| println!("Rustls error: {:?}", err)) .and_then(move |stream: TlsStream| { let ctx = Arc::clone(&ctx); async move { let peer_certs = stream.get_ref().1.peer_certificates().unwrap(); let peer_cert_bytes = peer_certs.first().unwrap().to_vec(); let peer_root_cert_der = peer_certs.last().unwrap().clone(); if &peer_root_cert_der == ctx.server_ca.cert() { info!("Server connection"); let mut transport = Framed::new(stream, LengthDelimitedCodec::new()); match transport.next().await { None => { info!("Connection closed by peer"); } Some(item) => match item { Ok(buf) => { use FromServerConnTypeMessage::*; let msg: FromServerConnTypeMessage = rmp_serde::from_slice(&buf).unwrap(); info!("{:?}", msg); match msg { SendCommand => { info!("SendCommand Stream"); let reply = ToServerConnTypeReply::OkSendCommand; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); server_command_handler(&ctx, transport, peer_cert_bytes).await; } Subscribe => { info!("Subscribe Stream"); let name = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes }).await.unwrap() { None => { error!("Cert has no name assigned!"); let reply = ToServerConnTypeReply::GenericFailure; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); return Ok(()); } Some(name) => name, }; let reply = ToServerConnTypeReply::OkSubscribe; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); server_subscribe_handler(&ctx, transport, name).await; } OpenDataStream(conn_id) => { info!("OpenDataStream with {:?}", conn_id); let msg = RegisterStream::server(conn_id, transport); ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap(); } } } Err(e) => { info!("Disconnection: {:?}", e); } } } info!("Server Task terminated!"); } else if &peer_root_cert_der == ctx.guestserver_ca.cert() { info!("GuestServer connection"); let codec = LengthDelimitedCodec::new(); let transport = Framed::new(stream, codec); guestserver_handler(&ctx, transport).await; } else if &peer_root_cert_der == ctx.client_ca.cert() { info!("Client connection"); let codec = LengthDelimitedCodec::new(); let transport = Framed::new(stream, codec); client_handler(&ctx, transport).await; } else { error!("Unknown Root Certificate"); } Ok(()) } }) }).unwrap() .workers(1) .run() .await .unwrap(); } async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) { match ctx.server_manager.send(StartTransporter { name, transport }).await.unwrap() { Ok(_) => { info!("Stream sent to the manager"); } Err(e) => { error!("Error from manager: {:?}", e); } } } async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStream, peer_cert_bytes: Vec) { loop { match transport.next().await { None => { info!("Transport returned None"); break; } Some(item) => match item { Ok(buf) => { use libbonknet::servermsg::FromServerCommandMessage::*; let msg: FromServerCommandMessage = rmp_serde::from_slice(&buf).unwrap(); info!("{:?}", msg); match msg { ChangeName { name } => { info!("Changing name to {}", name); match ctx.servercert_db.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() { None => { info!("Nothing to unregister"); } Some(old_name) => { warn!("Unregistered from old name {}", old_name); } } let reply = match ctx.servercert_db.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() { Ok(_) => { info!("Registered!"); ToServerCommandReply::NameChanged } Err(e) => { error!("Error registering: {:?}", e); ToServerCommandReply::GenericFailure } }; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); } WhoAmI => { info!("Asked who I am"); let reply = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() { None => { info!("I'm not registered anymore!? WTF"); ToServerCommandReply::GenericFailure } Some(name) => { info!("I am {}", name); ToServerCommandReply::YouAre { name } } }; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); } } } Err(e) => { info!("Disconnection: {:?}", e); break; } } } } } async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream) { loop { match transport.next().await { None => { info!("Transport returned None"); break; } Some(item) => { match item { Ok(buf) => { use libbonknet::servermsg::FromGuestServerMessage::*; let msg: FromGuestServerMessage = rmp_serde::from_slice(&buf).unwrap(); info!("{:?}", msg); match msg { Announce { name } => { info!("Announced with name {}", name); if ctx.servercert_db.send(IsNameRegistered { name: name.clone() }).await.unwrap() { info!("Name {} already registered!", name); let reply = ToGuestServerMessage::FailedNameAlreadyOccupied; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); break; // Stop reading } else { let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str())); ctx.servercert_db.send(RegisterServer { cert: cert.cert().to_vec(), name, }).await.unwrap().unwrap(); let reply = ToGuestServerMessage::make_okannounce(&cert); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); } } } } Err(e) => { info!("Disconnection: {:?}", e); break; } } } } } } async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) { loop { match transport.next().await { None => { info!("Transport returned None"); break; } Some(item) => { match item { Ok(buf) => { let msg: FromClientCommand = rmp_serde::from_slice(&buf).unwrap(); info!("{:?}", msg); match msg { FromClientCommand::RequestServer { name } => { info!("REQUESTED SERVER {}", name); let data = ctx.server_manager.send(RequestServer { name }).await.unwrap(); match data { Ok(client_conn_id) => { let reply = ToClientResponse::OkRequest { conn_id: client_conn_id }; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); info!("Sent OkRequest"); } Err(e) => { error!("Error! {:?}", e); } } } FromClientCommand::ServerList => { info!("Requested ServerList"); let data = ctx.server_manager.send(GetServerList {}).await.unwrap(); let reply = ToClientResponse::OkServerList { data }; transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); } FromClientCommand::UpgradeToDataStream(conn_id) => { info!("Upgrade to DataStream with conn_id {:?}", conn_id); let msg = RegisterStream::client(conn_id, transport); ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap(); break; } } } Err(e) => { info!("Disconnection: {:?}", e); break; } } } } } }