Files
bonknet/bonknet_broker/src/main.rs

325 lines
16 KiB
Rust

mod servercertdb;
mod pendingdataconndb;
mod servermanager;
mod dataconnmanager;
mod streamutils;
use servercertdb::*;
use pendingdataconndb::*;
use servermanager::*;
use dataconnmanager::*;
use streamutils::*;
use actix::prelude::*;
use std::sync::Arc;
use libbonknet::servermsg::*;
use libbonknet::clientmsg::*;
use libbonknet::cert::*;
use rustls::{RootCertStore, ServerConfig};
use rustls::server::WebPkiClientVerifier;
use actix_tls::accept::rustls_0_22::{Acceptor as RustlsAcceptor, TlsStream};
use actix_server::Server;
use actix_rt::net::TcpStream;
use actix_service::ServiceFactoryExt as _;
use futures::{SinkExt, StreamExt};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{error, info, warn};
struct BrokerContext {
broker_leaf: LeafCertPair<'static>,
client_ca: CACertPair<'static>,
server_ca: CACertPair<'static>,
guestserver_ca: CACertPair<'static>,
servercert_db: Addr<ServerCertDB>,
pendingdataconn_manager: Addr<PendingDataConnManager>,
server_manager: Addr<ServerManager>,
}
#[actix_rt::main]
async fn main() {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Load Identity
let broker_leaf = LeafCertPair::load_from_file("certs_pem/broker.pem").unwrap();
let client_ca = CACertPair::load_from_file("certs_pem/client_root_ca.pem").unwrap();
let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap();
let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap();
// Load Actors
let servercert_db = ServerCertDB::new("certsdb.sqlite").unwrap().start();
let dataconn_manager = DataConnManager::new().start();
let pendingdataconn_manager = PendingDataConnManager::new(dataconn_manager).start();
let server_manager = ServerManager::new(pendingdataconn_manager.clone()).start();
// Create Context
let ctx = Arc::new(BrokerContext {
broker_leaf,
client_ca,
server_ca,
guestserver_ca,
servercert_db,
pendingdataconn_manager,
server_manager,
});
// Pki Client Verifier
let mut broker_root_store = RootCertStore::empty();
broker_root_store.add(ctx.server_ca.cert().clone()).unwrap();
broker_root_store.add(ctx.client_ca.cert().clone()).unwrap();
broker_root_store.add(ctx.guestserver_ca.cert().clone()).unwrap();
let server_verifier = WebPkiClientVerifier::builder(Arc::new(broker_root_store)).build().unwrap();
// Configure TLS
let server_tlsconfig = ServerConfig::builder()
.with_client_cert_verifier(server_verifier)
.with_single_cert(ctx.broker_leaf.fullchain(), ctx.broker_leaf.clone_key().into())
.unwrap();
let server_acceptor = RustlsAcceptor::new(server_tlsconfig);
Server::build()
.bind("server-command", ("localhost", 2541), move || {
let ctx = Arc::clone(&ctx);
// Set up TLS service factory
server_acceptor
.clone()
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| {
let ctx = Arc::clone(&ctx);
async move {
let peer_certs = stream.get_ref().1.peer_certificates().unwrap();
let peer_cert_bytes = peer_certs.first().unwrap().to_vec();
let peer_root_cert_der = peer_certs.last().unwrap().clone();
if &peer_root_cert_der == ctx.server_ca.cert() {
info!("Server connection");
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
match transport.next().await {
None => {
info!("Connection closed by peer");
}
Some(item) => match item {
Ok(buf) => {
use FromServerConnTypeMessage::*;
let msg: FromServerConnTypeMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
SendCommand => {
info!("SendCommand Stream");
let reply = ToServerConnTypeReply::OkSendCommand;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_command_handler(&ctx, transport, peer_cert_bytes).await;
}
Subscribe => {
info!("Subscribe Stream");
let name = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
None => {
error!("Cert has no name assigned!");
let reply = ToServerConnTypeReply::GenericFailure;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
return Ok(());
}
Some(name) => name,
};
let reply = ToServerConnTypeReply::OkSubscribe;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_subscribe_handler(&ctx, transport, name).await;
}
OpenDataStream(conn_id) => {
info!("OpenDataStream with {:?}", conn_id);
let msg = RegisterStream::server(conn_id, transport);
ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
}
}
}
info!("Server Task terminated!");
} else if &peer_root_cert_der == ctx.guestserver_ca.cert() {
info!("GuestServer connection");
let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec);
guestserver_handler(&ctx, transport).await;
} else if &peer_root_cert_der == ctx.client_ca.cert() {
info!("Client connection");
let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec);
client_handler(&ctx, transport).await;
} else {
error!("Unknown Root Certificate");
}
Ok(())
}
})
}).unwrap()
.workers(1)
.run()
.await
.unwrap();
}
async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) {
match ctx.server_manager.send(StartTransporter { name, transport }).await.unwrap() {
Ok(_) => {
info!("Stream sent to the manager");
}
Err(e) => {
error!("Error from manager: {:?}", e);
}
}
}
async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStream, peer_cert_bytes: Vec<u8>) {
loop {
match transport.next().await {
None => {
info!("Transport returned None");
break;
}
Some(item) => match item {
Ok(buf) => {
use libbonknet::servermsg::FromServerCommandMessage::*;
let msg: FromServerCommandMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
ChangeName { name } => {
info!("Changing name to {}", name);
match ctx.servercert_db.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => {
info!("Nothing to unregister");
}
Some(old_name) => {
warn!("Unregistered from old name {}", old_name);
}
}
let reply = match ctx.servercert_db.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
Ok(_) => {
info!("Registered!");
ToServerCommandReply::NameChanged
}
Err(e) => {
error!("Error registering: {:?}", e);
ToServerCommandReply::GenericFailure
}
};
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
WhoAmI => {
info!("Asked who I am");
let reply = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => {
info!("I'm not registered anymore!? WTF");
ToServerCommandReply::GenericFailure
}
Some(name) => {
info!("I am {}", name);
ToServerCommandReply::YouAre { name }
}
};
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
break;
}
}
}
}
}
async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream) {
loop {
match transport.next().await {
None => {
info!("Transport returned None");
break;
}
Some(item) => {
match item {
Ok(buf) => {
use libbonknet::servermsg::FromGuestServerMessage::*;
let msg: FromGuestServerMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
Announce { name } => {
info!("Announced with name {}", name);
if ctx.servercert_db.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
info!("Name {} already registered!", name);
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
break; // Stop reading
} else {
let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str()));
ctx.servercert_db.send(RegisterServer {
cert: cert.cert().to_vec(),
name,
}).await.unwrap().unwrap();
let reply = ToGuestServerMessage::make_okannounce(&cert);
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
break;
}
}
}
}
}
}
async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
loop {
match transport.next().await {
None => {
info!("Transport returned None");
break;
}
Some(item) => {
match item {
Ok(buf) => {
let msg: FromClientCommand = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
FromClientCommand::RequestServer { name } => {
info!("REQUESTED SERVER {}", name);
let data = ctx.server_manager.send(RequestServer { name }).await.unwrap();
match data {
Ok(client_conn_id) => {
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
info!("Sent OkRequest");
}
Err(e) => {
error!("Error! {:?}", e);
}
}
}
FromClientCommand::ServerList => {
info!("Requested ServerList");
let data = ctx.server_manager.send(GetServerList {}).await.unwrap();
let reply = ToClientResponse::OkServerList { data };
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
FromClientCommand::UpgradeToDataStream(conn_id) => {
info!("Upgrade to DataStream with conn_id {:?}", conn_id);
let msg = RegisterStream::client(conn_id, transport);
ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
break;
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
break;
}
}
}
}
}
}