Compare commits

7 Commits

21 changed files with 1195 additions and 207 deletions

4
.gitignore vendored
View File

@@ -1,3 +1,5 @@
/target /target
# Experiments with certificates # Experiments with certificates
certs certs
certs_pem
/server

View File

@@ -25,7 +25,14 @@ rmp-serde = "1.1.2"
rcgen = { version = "0.12.1", features = ["x509-parser"] } rcgen = { version = "0.12.1", features = ["x509-parser"] }
rand = "0.8.5" rand = "0.8.5"
uuid = { version = "1.7.0", features = ["v4", "serde"] } uuid = { version = "1.7.0", features = ["v4", "serde"] }
rustls-pemfile = "2.0.0"
x509-parser = "0.16.0"
rusqlite = { version = "0.31.0", features = ["bundled"] }
[[bin]] [[bin]]
name = "init_certs" name = "init_certs"
path = "src/bin/init_certs.rs" path = "src/bin/init_certs.rs"
[[bin]]
name = "init_certs_2"
path = "src/bin/init_certs_2.rs"

View File

@@ -0,0 +1,201 @@
use std::fs::File;
use std::io::Write;
use rcgen::{self, BasicConstraints, Certificate, CertificateParams, DnType};
fn server_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Server Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn server_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.server.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Server 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn guestserver_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Guest Server Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn guestserver_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.guestserver.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Guest Server 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn client_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Client Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn client_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.client.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Client 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn broker_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Broker Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn broker_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "localhost".into()]);
params.distinguished_name.push(DnType::CommonName, "localhost");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ServerAuth);
Certificate::from_params(params).unwrap()
}
fn main() -> std::io::Result<()> {
// Generate Root CA Certificates
let server_root_cert = server_root_cert();
let guestserver_root_cert = guestserver_root_cert();
let client_root_cert = client_root_cert();
let broker_root_cert = broker_root_cert();
// Generate Leafs
let server_leaf_cert = server_cert();
let guestserver_leaf_cert = guestserver_cert();
let client_leaf_cert = client_cert();
let broker_leaf_cert = broker_cert();
// Generate PEMs
// every time you generate one, a new random number is taken, so different cert hashes!
// and we need this PEMs to appear in multiple files
// We don't need this for the pvkey because we generate them only one time for each cert
// IF YOU NEED TO WRITE PVKEY IN MULTIPLE FILES, PLEASE DO IT LIKE THESE LINES FOR THE x509!!!
let server_root_cert_pem = server_root_cert.serialize_pem().unwrap();
let guestserver_root_cert_pem = guestserver_root_cert.serialize_pem().unwrap();
let client_root_cert_pem = client_root_cert.serialize_pem().unwrap();
let broker_root_cert_pem = broker_root_cert.serialize_pem().unwrap();
let server_leaf_cert_pem = server_leaf_cert.serialize_pem_with_signer(&server_root_cert).unwrap();
let guestserver_leaf_cert_pem = guestserver_leaf_cert.serialize_pem_with_signer(&guestserver_root_cert).unwrap();
let client_leaf_cert_pem = client_leaf_cert.serialize_pem_with_signer(&client_root_cert).unwrap();
let broker_leaf_cert_pem = broker_leaf_cert.serialize_pem_with_signer(&broker_root_cert).unwrap();
// Root CA PEMs
/*
1 - CA Cert
2 - CA Prkey
*/
// Generate Server Root CA PEM
{
let mut pemfile = File::create("certs_pem/server_root_ca.pem")?;
pemfile.write_all(server_root_cert_pem.as_bytes())?;
pemfile.write_all(server_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate GuestServer Root CA PEM
{
let mut pemfile = File::create("certs_pem/guestserver_root_ca.pem")?;
pemfile.write_all(guestserver_root_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Client Root CA PEM
{
let mut pemfile = File::create("certs_pem/client_root_ca.pem")?;
pemfile.write_all(client_root_cert_pem.as_bytes())?;
pemfile.write_all(client_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker Root CA PEM
{
let mut pemfile = File::create("certs_pem/broker_root_ca.pem")?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
pemfile.write_all(broker_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker CA Cert PEM for Server Authentication
{
let mut pemfile = File::create("certs_pem/broker_root_ca_cert.pem")?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
}
// Generate Server Leaf PEM
/*
1 - Server Leaf Cert
2 - Server CA Cert chain
3 - Server Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/server.pem")?;
pemfile.write_all(server_leaf_cert_pem.as_bytes())?;
pemfile.write_all(server_root_cert_pem.as_bytes())?;
pemfile.write_all(server_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate GuestServer Leaf PEM
/*
1 - GuestServer Leaf Cert
2 - GuestServer CA Cert chain
3 - GuestServer Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/guestserver.pem")?;
pemfile.write_all(guestserver_leaf_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_root_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Client Leaf PEM
/*
1 - Client Leaf Cert
2 - Client CA Cert chain
3 - Client Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/client.pem")?;
pemfile.write_all(client_leaf_cert_pem.as_bytes())?;
pemfile.write_all(client_root_cert_pem.as_bytes())?;
pemfile.write_all(client_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker Leaf PEM
/*
1 - Broker Leaf Cert
2 - Broker CA Cert
3 - Broker Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/broker.pem")?;
pemfile.write_all(broker_leaf_cert_pem.as_bytes())?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
pemfile.write_all(broker_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
println!("Certificates created");
Ok(())
}

View File

@@ -5,7 +5,7 @@ use futures::SinkExt;
use thiserror::Error; use thiserror::Error;
use tracing::{info, error, warn}; use tracing::{info, error, warn};
use libbonknet::ToPeerDataStream; use libbonknet::ToPeerDataStream;
use crate::TransportStream; use crate::streamutils::*;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Error, Debug)] #[derive(Error, Debug)]
@@ -14,6 +14,9 @@ pub enum DataConnManagerError {
GenericFailure, GenericFailure,
} }
/// Ask the DataConnManager to start a new Data Connection with
/// these two TransportStream as ends of the bridge.
/// As internal ID the Client Connection ID is used.
#[derive(Message)] #[derive(Message)]
#[rtype(result = "Result<(),DataConnManagerError>")] #[rtype(result = "Result<(),DataConnManagerError>")]
pub struct StartDataBridge { pub struct StartDataBridge {
@@ -22,6 +25,7 @@ pub struct StartDataBridge {
pub client_transport: TransportStream, pub client_transport: TransportStream,
} }
/// Stop the data bridge using the Client Connection ID as reference to the bridge.
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
pub struct StopDataBridge { pub struct StopDataBridge {
@@ -31,23 +35,27 @@ pub struct StopDataBridge {
type ClientConnId = Uuid; type ClientConnId = Uuid;
struct Connection { struct Connection {
proxyhandler: SpawnHandle, proxyhandler: SpawnHandle, // SpawnHandle is an actix Future handler (basically a Task)
} }
/// This is the actor that manages the Data Connections.
/// When a new Data Bridge is created, that consist of a Task running indefinitely that
/// redirect information from a Client to a Server and viceversa.
/// Basically the DataBridge is where the SSH information are sent in.
pub struct DataConnManager { pub struct DataConnManager {
conns: HashMap<ClientConnId, Connection> conns: HashMap<ClientConnId, Connection>
} }
impl Actor for DataConnManager {
type Context = Context<Self>;
}
impl DataConnManager { impl DataConnManager {
pub fn new() -> Self { pub fn new() -> Self {
Self { conns: HashMap::new() } Self { conns: HashMap::new() }
} }
} }
impl Actor for DataConnManager {
type Context = Context<Self>;
}
impl Handler<StopDataBridge> for DataConnManager { impl Handler<StopDataBridge> for DataConnManager {
type Result = (); type Result = ();

View File

@@ -2,16 +2,18 @@ mod servercertdb;
mod pendingdataconndb; mod pendingdataconndb;
mod servermanager; mod servermanager;
mod dataconnmanager; mod dataconnmanager;
mod streamutils;
use servercertdb::*; use servercertdb::*;
use pendingdataconndb::*; use pendingdataconndb::*;
use servermanager::*; use servermanager::*;
use dataconnmanager::*; use dataconnmanager::*;
use streamutils::*;
use actix::prelude::*; use actix::prelude::*;
use std::sync::Arc; use std::sync::Arc;
use libbonknet::*;
use libbonknet::servermsg::*; use libbonknet::servermsg::*;
use libbonknet::clientmsg::*; use libbonknet::clientmsg::*;
use libbonknet::cert::*;
use rustls::{RootCertStore, ServerConfig}; use rustls::{RootCertStore, ServerConfig};
use rustls::server::WebPkiClientVerifier; use rustls::server::WebPkiClientVerifier;
use actix_tls::accept::rustls_0_22::{Acceptor as RustlsAcceptor, TlsStream}; use actix_tls::accept::rustls_0_22::{Acceptor as RustlsAcceptor, TlsStream};
@@ -19,32 +21,18 @@ use actix_server::Server;
use actix_rt::net::TcpStream; use actix_rt::net::TcpStream;
use actix_service::ServiceFactoryExt as _; use actix_service::ServiceFactoryExt as _;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec}; use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
use tokio::io::{ReadHalf, WriteHalf};
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
type TransportStreamTx = FramedWrite<WriteHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
type TransportStreamRx = FramedRead<ReadHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
struct ServerCert { struct BrokerContext {
cert: Vec<u8>, broker_leaf: LeafCertPair<'static>,
prkey: Vec<u8>, client_ca: CACertPair<'static>,
} server_ca: CACertPair<'static>,
guestserver_ca: CACertPair<'static>,
fn generate_server_cert(root_cert: &Certificate, name: &str) -> ServerCert { servercert_db: Addr<ServerCertDB>,
let mut params = CertificateParams::new(vec!["entity.other.host".into(), format!("bonk.server.{name}")]); pendingdataconn_manager: Addr<PendingDataConnManager>,
params.distinguished_name.push(DnType::CommonName, name); server_manager: Addr<ServerManager>,
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
let certificate = Certificate::from_params(params).unwrap();
let cert = certificate.serialize_der_with_signer(root_cert).unwrap();
let prkey = certificate.serialize_private_key_der();
ServerCert { cert, prkey }
} }
@@ -53,72 +41,54 @@ async fn main() {
// Tracing Subscriber // Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new(); let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap(); tracing::subscriber::set_global_default(subscriber).unwrap();
// BROKER CERTS // Load Identity
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap(); let broker_leaf = LeafCertPair::load_from_file("certs_pem/broker.pem").unwrap();
let broker_cert_der = load_cert("certs/broker_cert.pem").unwrap(); let client_ca = CACertPair::load_from_file("certs_pem/client_root_ca.pem").unwrap();
let broker_prkey_der = load_prkey("certs/broker_key.pem").unwrap(); let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap();
// SERVER ROOT let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap();
let server_root_cert_der = load_cert("certs/server_root_cert.pem").unwrap(); // Load Actors
let server_root_prkey_der = load_prkey("certs/server_root_key.pem").unwrap(); let servercert_db = ServerCertDB::new("certsdb.sqlite").unwrap().start();
// GUESTSERVER ROOT let dataconn_manager = DataConnManager::new().start();
let guestserver_root_cert_der = load_cert("certs/guestserver_root_cert.pem").unwrap(); let pendingdataconn_manager = PendingDataConnManager::new(dataconn_manager).start();
// CLIENT ROOT let server_manager = ServerManager::new(pendingdataconn_manager.clone()).start();
let client_root_cert_der = load_cert("certs/client_root_cert.pem").unwrap(); // Create Context
// Client Verifier let ctx = Arc::new(BrokerContext {
broker_leaf,
client_ca,
server_ca,
guestserver_ca,
servercert_db,
pendingdataconn_manager,
server_manager,
});
// Pki Client Verifier
let mut broker_root_store = RootCertStore::empty(); let mut broker_root_store = RootCertStore::empty();
broker_root_store.add(server_root_cert_der.clone()).unwrap(); broker_root_store.add(ctx.server_ca.cert().clone()).unwrap();
broker_root_store.add(client_root_cert_der.clone()).unwrap(); broker_root_store.add(ctx.client_ca.cert().clone()).unwrap();
broker_root_store.add(guestserver_root_cert_der.clone()).unwrap(); broker_root_store.add(ctx.guestserver_ca.cert().clone()).unwrap();
let server_verifier = WebPkiClientVerifier::builder(Arc::new(broker_root_store)).build().unwrap(); let server_verifier = WebPkiClientVerifier::builder(Arc::new(broker_root_store)).build().unwrap();
// Configure TLS // Configure TLS
let server_tlsconfig = ServerConfig::builder() let server_tlsconfig = ServerConfig::builder()
// .with_no_client_auth()
.with_client_cert_verifier(server_verifier) .with_client_cert_verifier(server_verifier)
.with_single_cert(vec![broker_cert_der.clone(), broker_root_cert_der.clone()], broker_prkey_der.into()) .with_single_cert(ctx.broker_leaf.fullchain(), ctx.broker_leaf.clone_key().into())
.unwrap(); .unwrap();
let server_acceptor = RustlsAcceptor::new(server_tlsconfig); let server_acceptor = RustlsAcceptor::new(server_tlsconfig);
let server_root_cert_der = Arc::new(server_root_cert_der);
let server_root_prkey = KeyPair::from_der(server_root_prkey_der.secret_pkcs8_der()).unwrap();
let client_root_cert_der = Arc::new(client_root_cert_der);
let guestserver_root_cert_der = Arc::new(guestserver_root_cert_der);
let server_root_cert = Arc::new(Certificate::from_params(CertificateParams::from_ca_cert_der(
&server_root_cert_der,
server_root_prkey
).unwrap()).unwrap());
let scdb_addr = ServerCertDB::new().start();
let dcm_addr = DataConnManager::new().start();
let pdcm_addr = PendingDataConnManager::new(dcm_addr.clone()).start();
let sm_addr = ServerManager::new(pdcm_addr.clone()).start();
Server::build() Server::build()
.bind("server-command", ("localhost", 2541), move || { .bind("server-command", ("localhost", 2541), move || {
let server_root_cert_der = Arc::clone(&server_root_cert_der); let ctx = Arc::clone(&ctx);
let client_root_cert_der = Arc::clone(&client_root_cert_der);
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
let server_root_cert = Arc::clone(&server_root_cert);
let scdb_addr = scdb_addr.clone();
let pdcm_addr = pdcm_addr.clone();
let sm_addr = sm_addr.clone();
// Set up TLS service factory // Set up TLS service factory
server_acceptor server_acceptor
.clone() .clone()
.map_err(|err| println!("Rustls error: {:?}", err)) .map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| { .and_then(move |stream: TlsStream<TcpStream>| {
let server_root_cert_der = Arc::clone(&server_root_cert_der); let ctx = Arc::clone(&ctx);
let client_root_cert_der = Arc::clone(&client_root_cert_der);
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
let server_root_cert = Arc::clone(&server_root_cert);
let scdb_addr = scdb_addr.clone();
let pdcm_addr = pdcm_addr.clone();
let sm_addr = sm_addr.clone();
async move { async move {
let peer_certs = stream.get_ref().1.peer_certificates().unwrap(); let peer_certs = stream.get_ref().1.peer_certificates().unwrap();
let peer_cert_bytes = peer_certs.first().unwrap().to_vec(); let peer_cert_bytes = peer_certs.first().unwrap().to_vec();
let peer_root_cert_der = peer_certs.last().unwrap().clone(); let peer_root_cert_der = peer_certs.last().unwrap().clone();
if peer_root_cert_der == *server_root_cert_der { if &peer_root_cert_der == ctx.server_ca.cert() {
info!("Server connection"); info!("Server connection");
let mut transport = Framed::new(stream, LengthDelimitedCodec::new()); let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
match transport.next().await { match transport.next().await {
@@ -127,8 +97,7 @@ async fn main() {
} }
Some(item) => match item { Some(item) => match item {
Ok(buf) => { Ok(buf) => {
use libbonknet::servermsg::{FromServerConnTypeMessage, ToServerConnTypeReply}; use FromServerConnTypeMessage::*;
use libbonknet::servermsg::FromServerConnTypeMessage::*;
let msg: FromServerConnTypeMessage = rmp_serde::from_slice(&buf).unwrap(); let msg: FromServerConnTypeMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg); info!("{:?}", msg);
match msg { match msg {
@@ -136,11 +105,11 @@ async fn main() {
info!("SendCommand Stream"); info!("SendCommand Stream");
let reply = ToServerConnTypeReply::OkSendCommand; let reply = ToServerConnTypeReply::OkSendCommand;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_command_handler(transport, peer_cert_bytes, scdb_addr).await; server_command_handler(&ctx, transport, peer_cert_bytes).await;
} }
Subscribe => { Subscribe => {
info!("Subscribe Stream"); info!("Subscribe Stream");
let name = match scdb_addr.send(FetchName { cert: peer_cert_bytes }).await.unwrap() { let name = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
None => { None => {
error!("Cert has no name assigned!"); error!("Cert has no name assigned!");
let reply = ToServerConnTypeReply::GenericFailure; let reply = ToServerConnTypeReply::GenericFailure;
@@ -151,12 +120,12 @@ async fn main() {
}; };
let reply = ToServerConnTypeReply::OkSubscribe; let reply = ToServerConnTypeReply::OkSubscribe;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_subscribe_handler(transport, name, sm_addr).await; server_subscribe_handler(&ctx, transport, name).await;
} }
OpenDataStream(conn_id) => { OpenDataStream(conn_id) => {
info!("OpenDataStream with {:?}", conn_id); info!("OpenDataStream with {:?}", conn_id);
let msg = RegisterStream::server(conn_id, transport); let msg = RegisterStream::server(conn_id, transport);
pdcm_addr.send(msg).await.unwrap().unwrap(); ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
} }
} }
} }
@@ -166,17 +135,16 @@ async fn main() {
} }
} }
info!("Server Task terminated!"); info!("Server Task terminated!");
} else if peer_root_cert_der == *guestserver_root_cert_der { } else if &peer_root_cert_der == ctx.guestserver_ca.cert() {
info!("GuestServer connection"); info!("GuestServer connection");
let server_root_cert = Arc::clone(&server_root_cert);
let codec = LengthDelimitedCodec::new(); let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec); let transport = Framed::new(stream, codec);
guestserver_handler(transport, scdb_addr, &server_root_cert).await; guestserver_handler(&ctx, transport).await;
} else if peer_root_cert_der == *client_root_cert_der { } else if &peer_root_cert_der == ctx.client_ca.cert() {
info!("Client connection"); info!("Client connection");
let codec = LengthDelimitedCodec::new(); let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec); let transport = Framed::new(stream, codec);
client_handler(transport, sm_addr, pdcm_addr).await; client_handler(&ctx, transport).await;
} else { } else {
error!("Unknown Root Certificate"); error!("Unknown Root Certificate");
} }
@@ -190,8 +158,8 @@ async fn main() {
.unwrap(); .unwrap();
} }
async fn server_subscribe_handler(transport: TransportStream, name: String, sm_addr: Addr<ServerManager>) { async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) {
match sm_addr.send(StartTransporter { name, transport }).await.unwrap() { match ctx.server_manager.send(StartTransporter { name, transport }).await.unwrap() {
Ok(_) => { Ok(_) => {
info!("Stream sent to the manager"); info!("Stream sent to the manager");
} }
@@ -201,7 +169,7 @@ async fn server_subscribe_handler(transport: TransportStream, name: String, sm_a
} }
} }
async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes: Vec<u8>, server_db_addr: Addr<ServerCertDB>) { async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStream, peer_cert_bytes: Vec<u8>) {
loop { loop {
match transport.next().await { match transport.next().await {
None => { None => {
@@ -216,7 +184,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
match msg { match msg {
ChangeName { name } => { ChangeName { name } => {
info!("Changing name to {}", name); info!("Changing name to {}", name);
match server_db_addr.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() { match ctx.servercert_db.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => { None => {
info!("Nothing to unregister"); info!("Nothing to unregister");
} }
@@ -224,7 +192,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
warn!("Unregistered from old name {}", old_name); warn!("Unregistered from old name {}", old_name);
} }
} }
let reply = match server_db_addr.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() { let reply = match ctx.servercert_db.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
Ok(_) => { Ok(_) => {
info!("Registered!"); info!("Registered!");
ToServerCommandReply::NameChanged ToServerCommandReply::NameChanged
@@ -238,7 +206,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
} }
WhoAmI => { WhoAmI => {
info!("Asked who I am"); info!("Asked who I am");
let reply = match server_db_addr.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() { let reply = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => { None => {
info!("I'm not registered anymore!? WTF"); info!("I'm not registered anymore!? WTF");
ToServerCommandReply::GenericFailure ToServerCommandReply::GenericFailure
@@ -261,8 +229,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
} }
} }
// TODO: Considera creare un context dove vengono contenute tutte le chiavi e gli address da dare a tutti gli handler async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream) {
async fn guestserver_handler(mut transport: TransportStream, server_db_addr: Addr<ServerCertDB>, server_root_cert: &Arc<Certificate>) {
loop { loop {
match transport.next().await { match transport.next().await {
None => { None => {
@@ -278,21 +245,18 @@ async fn guestserver_handler(mut transport: TransportStream, server_db_addr: Add
match msg { match msg {
Announce { name } => { Announce { name } => {
info!("Announced with name {}", name); info!("Announced with name {}", name);
if server_db_addr.send(IsNameRegistered { name: name.clone() }).await.unwrap() { if ctx.servercert_db.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
info!("Name {} already registered!", name); info!("Name {} already registered!", name);
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied; let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
break; // Stop reading break; // Stop reading
} else { } else {
let cert = generate_server_cert(server_root_cert, name.as_str()); let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str()));
server_db_addr.send(RegisterServer { ctx.servercert_db.send(RegisterServer {
cert: cert.cert.clone(), cert: cert.cert().to_vec(),
name, name,
}).await.unwrap().unwrap(); }).await.unwrap().unwrap();
let reply = ToGuestServerMessage::OkAnnounce { let reply = ToGuestServerMessage::make_okannounce(&cert);
server_cert: cert.cert,
server_prkey: cert.prkey
};
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
} }
} }
@@ -308,7 +272,7 @@ async fn guestserver_handler(mut transport: TransportStream, server_db_addr: Add
} }
} }
async fn client_handler(mut transport: TransportStream, sm_addr: Addr<ServerManager>, pdcm_addr: Addr<PendingDataConnManager>) { async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
loop { loop {
match transport.next().await { match transport.next().await {
None => { None => {
@@ -323,7 +287,7 @@ async fn client_handler(mut transport: TransportStream, sm_addr: Addr<ServerMana
match msg { match msg {
FromClientCommand::RequestServer { name } => { FromClientCommand::RequestServer { name } => {
info!("REQUESTED SERVER {}", name); info!("REQUESTED SERVER {}", name);
let data = sm_addr.send(RequestServer { name }).await.unwrap(); let data = ctx.server_manager.send(RequestServer { name }).await.unwrap();
match data { match data {
Ok(client_conn_id) => { Ok(client_conn_id) => {
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id }; let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
@@ -337,14 +301,14 @@ async fn client_handler(mut transport: TransportStream, sm_addr: Addr<ServerMana
} }
FromClientCommand::ServerList => { FromClientCommand::ServerList => {
info!("Requested ServerList"); info!("Requested ServerList");
let data = sm_addr.send(GetServerList {}).await.unwrap(); let data = ctx.server_manager.send(GetServerList {}).await.unwrap();
let reply = ToClientResponse::OkServerList { data }; let reply = ToClientResponse::OkServerList { data };
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
} }
FromClientCommand::UpgradeToDataStream(conn_id) => { FromClientCommand::UpgradeToDataStream(conn_id) => {
info!("Upgrade to DataStream with conn_id {:?}", conn_id); info!("Upgrade to DataStream with conn_id {:?}", conn_id);
let msg = RegisterStream::client(conn_id, transport); let msg = RegisterStream::client(conn_id, transport);
pdcm_addr.send(msg).await.unwrap().unwrap(); ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
break; break;
} }
} }

View File

@@ -1,30 +1,21 @@
//! Pending Data Connect Database is where we store the Data Connection in creation.
//! Every row contains:
//! - The ServerConnID that the server will use (uuid)
//! - The ClientConnID that the client will use (uuid)
//! - If the Server is already connected and ready, the Server Socket
//! - If the Client is already connected and ready, the Client Socket
//!
//! When a row contains a valid ServerSocket and ClientSocket, then the row is removed and the
//! sockets moved to the Data Connection Manager
use actix::prelude::*; use actix::prelude::*;
use actix_tls::accept::rustls_0_22::TlsStream;
use futures::SinkExt; use futures::SinkExt;
use thiserror::Error; use thiserror::Error;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{error, info}; use tracing::{error, info};
use uuid::Uuid; use uuid::Uuid;
use libbonknet::*; use libbonknet::*;
use crate::dataconnmanager::{DataConnManager, StartDataBridge}; use crate::dataconnmanager::{DataConnManager, StartDataBridge};
use crate::streamutils::*;
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
/*
L'idea e' che il database deve avere una riga per ogni connessione dati in nascita.
In ogni "riga" deve essere presente:
- Il ServerConnID che il server dovra usare (pkey)
- Il ClientConnID che il client dovra usare (pkey)
- Se gia connesso, il Socket del Server
- Se gia connesso il Socket del Client
Quando in una riga sono presenti sia ServerSocket che ClientSocket allora poppa la riga
e usa i socket per lanciare un thread/actor che faccia il piping dei dati
Ricerca riga deve avvenire sia tramite ServerConnID che ClientConnID se essi sono diversi come pianifico
Quindi l'ideale e' non usare una collection ma andare direttamente di Vector!
*/
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum PendingDataConnError { pub enum PendingDataConnError {
@@ -63,11 +54,13 @@ impl RegisterStream {
} }
} }
/// Record with connection ID and TransportStream that can represent a client or a server
struct SideRecord { struct SideRecord {
conn_id: Uuid, conn_id: Uuid,
transport: Option<TransportStream>, transport: Option<TransportStream>,
} }
/// A DB Record, with a Server and a Client record containind IDs and Transports
struct Record { struct Record {
server: SideRecord, server: SideRecord,
client: SideRecord, client: SideRecord,
@@ -82,16 +75,29 @@ impl Record {
} }
// TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them // TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
/// This contains the database of records of Data Connections in the phase of creation.
/// The approach is that when DataConnection is needed, a new Pending is created together with
/// two IDs to send to the client and server and that they will use to create a new TransportStream
/// that will be then registered to the right Record.
/// When a Record has the two TransportStreams needed, the Record is completed and can be consumed
/// and used to create a DataConnection sending the ownership of the two streams to the
/// Data Connection Manager.
pub struct PendingDataConnManager { pub struct PendingDataConnManager {
db: Vec<Record>, db: Vec<Record>,
dcm_addr: Addr<DataConnManager>, dataconn_manager: Addr<DataConnManager>,
}
impl Actor for PendingDataConnManager {
type Context = Context<Self>;
} }
impl PendingDataConnManager { impl PendingDataConnManager {
pub fn new(dcm_addr: Addr<DataConnManager>) -> Self { pub fn new(dataconn_manager_addr: Addr<DataConnManager>) -> Self {
PendingDataConnManager { db: Vec::new(), dcm_addr } PendingDataConnManager { db: Vec::new(), dataconn_manager: dataconn_manager_addr }
} }
/// The Row retrieve need to use ServerConnID AND ClientConnID as keys, so the best approach is to
/// use a single Vec and check the two IDs while iterating.
fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> { fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
use RegisterKind::*; use RegisterKind::*;
let record = match match kind { let record = match match kind {
@@ -108,10 +114,6 @@ impl PendingDataConnManager {
} }
} }
impl Actor for PendingDataConnManager {
type Context = Context<Self>;
}
impl Handler<NewPendingConn> for PendingDataConnManager { impl Handler<NewPendingConn> for PendingDataConnManager {
type Result = Result<(), PendingDataConnError>; type Result = Result<(), PendingDataConnError>;
@@ -175,7 +177,7 @@ impl Handler<TryStartDataStream> for PendingDataConnManager {
server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
}; };
self.dcm_addr.do_send(msg); self.dataconn_manager.do_send(msg);
} }
Ok(()) Ok(())
} else { } else {

View File

@@ -1,15 +1,14 @@
use std::collections::HashMap; use std::path::Path;
use actix::prelude::*; use actix::prelude::*;
use rusqlite::{Connection};
use thiserror::Error; use thiserror::Error;
// TODO: Probably it's better to remove the pub from inside the structs and impl a new() funct
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum DBError { pub enum DBError {
#[error("Certificate is already registered with name {0}")] #[error("Certificate is already registered with name {0}")]
CertAlreadyRegistered(String), CertAlreadyRegistered(String),
// #[error("Generic Failure")] #[error("Generic Failure")]
// GenericFailure, GenericFailure,
} }
#[derive(Message)] #[derive(Message)]
@@ -37,14 +36,29 @@ pub struct FetchName {
pub cert: Vec<u8>, pub cert: Vec<u8>,
} }
// TODO: Move into Sqlite DB with unique check on col1 and col2!!!! Right now name is not unique fn init_db(conn: Connection) -> rusqlite::Result<Connection> {
conn.execute(
"CREATE TABLE IF NOT EXISTS servercert (
cert BLOB PRIMARY KEY,
name TEXT NOT NULL
)",
(), // empty list of parameters.
)?;
Ok(conn)
}
// TODO: Right now name is not unique. Consider making it unique and checking it for duplication
pub struct ServerCertDB { pub struct ServerCertDB {
db: HashMap<Vec<u8>, String>, // Cert to Name conn: Connection,
} }
impl ServerCertDB { impl ServerCertDB {
pub fn new() -> Self { fn new_in_memory() -> rusqlite::Result<Self> {
ServerCertDB { db: HashMap::new() } Ok(ServerCertDB { conn: init_db(Connection::open_in_memory()?)? })
}
pub fn new<P: AsRef<Path>>(path: P) -> rusqlite::Result<Self> {
Ok(ServerCertDB { conn: init_db(Connection::open(path)?)? })
} }
} }
@@ -56,13 +70,23 @@ impl Handler<RegisterServer> for ServerCertDB {
type Result = Result<(), DBError>; type Result = Result<(), DBError>;
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
match self.db.get(&msg.cert) { match self.conn.query_row(
None => { "SELECT name FROM servercert WHERE cert = ?1",
self.db.insert(msg.cert, msg.name); (&msg.cert,),
|row| row.get::<_, String>(0)
) {
Ok(name) => {
Err(DBError::CertAlreadyRegistered(name))
}
Err(rusqlite::Error::QueryReturnedNoRows) => {
self.conn.execute(
"INSERT INTO servercert (cert, name) VALUES (?1, ?2)",
(&msg.cert, &msg.name)
).map_err(|_| DBError::GenericFailure)?;
Ok(()) Ok(())
} }
Some(name) => { Err(_) => {
Err(DBError::CertAlreadyRegistered(name.clone())) Err(DBError::GenericFailure)
} }
} }
} }
@@ -72,7 +96,12 @@ impl Handler<IsNameRegistered> for ServerCertDB {
type Result = bool; type Result = bool;
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
self.db.values().any(|x| *x == msg.name) let count: u64 = self.conn.query_row(
"SELECT COUNT(*) FROM servercert WHERE name = ?1",
(&msg.name,),
|row| row.get(0)
).unwrap();
count > 0
} }
} }
@@ -80,7 +109,21 @@ impl Handler<FetchName> for ServerCertDB {
type Result = Option<String>; type Result = Option<String>;
fn handle(&mut self, msg: FetchName, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: FetchName, _ctx: &mut Self::Context) -> Self::Result {
self.db.get(&msg.cert).map(|s| s.to_owned()) match self.conn.query_row(
"SELECT name FROM servercert WHERE cert = ?1",
(&msg.cert,),
|row| row.get(0)
) {
Ok(name) => {
Some(name)
}
Err(rusqlite::Error::QueryReturnedNoRows) => {
None
}
Err(e) => {
panic!("Error during FetchName: {}", e);
}
}
} }
} }
@@ -88,6 +131,83 @@ impl Handler<UnregisterServer> for ServerCertDB {
type Result = Option<String>; type Result = Option<String>;
fn handle(&mut self, msg: UnregisterServer, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: UnregisterServer, _ctx: &mut Self::Context) -> Self::Result {
self.db.remove(&msg.cert) match self.conn.query_row(
"SELECT name FROM servercert WHERE cert = ?1",
(&msg.cert,),
|row| row.get::<_, String>(0)
) {
Ok(name) => {
self.conn.execute(
"DELETE FROM servercert WHERE cert = ?1",
(&msg.cert,)
).unwrap();
Some(name)
}
Err(rusqlite::Error::QueryReturnedNoRows) => {
None
}
Err(e) => {
panic!("Error during UnregisterServer: {}", e);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[actix::test]
async fn emptydb_isnameregistered() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
assert!(!servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
}
#[actix::test]
async fn emptyvec() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
assert!(servercert_db.send(RegisterServer { cert: vec![], name: "test".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: vec![] }).await.unwrap().unwrap(), "test");
assert_eq!(servercert_db.send(UnregisterServer { cert: vec![] }).await.unwrap().unwrap(), "test");
assert!(!servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
assert!(servercert_db.send(FetchName { cert: vec![] }).await.unwrap().is_none());
assert!(servercert_db.send(UnregisterServer { cert: vec![] }).await.unwrap().is_none());
}
#[actix::test]
async fn normalcert() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
let cert = vec![112, 111, 114, 99, 111, 100, 105, 111];
assert!(servercert_db.send(RegisterServer { cert: cert.clone(), name: "test2".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test2".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert.clone() }).await.unwrap().unwrap(), "test2");
assert_eq!(servercert_db.send(UnregisterServer { cert: cert.clone() }).await.unwrap().unwrap(), "test2");
assert!(!servercert_db.send(IsNameRegistered { name: "test2".into() }).await.unwrap());
assert!(servercert_db.send(FetchName { cert: cert.clone() }).await.unwrap().is_none());
assert!(servercert_db.send(UnregisterServer { cert: cert.clone() }).await.unwrap().is_none());
}
#[actix::test]
async fn cert2_remains_after_delete_cert1() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
let cert1 = vec![112, 111, 114, 99, 111, 100, 105, 111];
let cert2 = vec![67, 65, 78, 68, 69, 68, 73, 79];
assert!(servercert_db.send(RegisterServer { cert: cert1.clone(), name: "test3".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test3".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert1.clone() }).await.unwrap().unwrap(), "test3");
assert!(servercert_db.send(RegisterServer { cert: cert2.clone(), name: "test4".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test4".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert2.clone() }).await.unwrap().unwrap(), "test4");
assert_eq!(servercert_db.send(UnregisterServer { cert: cert1.clone() }).await.unwrap().unwrap(), "test3");
assert!(!servercert_db.send(IsNameRegistered { name: "test3".into() }).await.unwrap());
assert!(servercert_db.send(FetchName { cert: cert1.clone() }).await.unwrap().is_none());
assert!(servercert_db.send(UnregisterServer { cert: cert1.clone() }).await.unwrap().is_none());
assert!(servercert_db.send(RegisterServer { cert: cert2.clone(), name: "test4".into() }).await.unwrap().is_err());
assert!(servercert_db.send(IsNameRegistered { name: "test4".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert2.clone() }).await.unwrap().unwrap(), "test4");
} }
} }

View File

@@ -10,7 +10,7 @@ use tokio::sync::{Mutex, oneshot};
use tokio_util::bytes::{Bytes, BytesMut}; use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec}; use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use crate::{TransportStream, TransportStreamRx, TransportStreamTx}; use crate::streamutils::*;
use uuid::Uuid; use uuid::Uuid;
use libbonknet::servermsg::*; use libbonknet::servermsg::*;
use crate::pendingdataconndb::*; use crate::pendingdataconndb::*;

View File

@@ -0,0 +1,8 @@
use actix_tls::accept::rustls_0_22::TlsStream;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec};
pub type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
pub type TransportStreamTx = FramedWrite<WriteHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
pub type TransportStreamRx = FramedRead<ReadHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;

View File

@@ -2,6 +2,7 @@
name = "bonknet_client" name = "bonknet_client"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
description = "A CLI Client for the Bonknet system"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -17,3 +18,5 @@ rmp-serde = "1.1.2"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
uuid = { version = "1.7.0", features = ["serde"] } uuid = { version = "1.7.0", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
thiserror = "1.0.56"

View File

@@ -2,7 +2,7 @@ use std::io::{Error, ErrorKind};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream; use tokio::net::{TcpStream, TcpListener};
use tokio_rustls::rustls::{ClientConfig, RootCertStore}; use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::ServerName; use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
@@ -78,22 +78,16 @@ async fn datastream(tlsconfig: ClientConfig, conn_id: Uuid) -> std::io::Result<(
} }
} }
} }
let (mut rx, mut tx) = tokio::io::split(transport.into_inner()); let mut outbound = transport.into_inner();
let mut stdout = tokio::io::stdout(); let listener = TcpListener::bind("127.0.0.1:9919").await?;
let mut stdin = tokio::io::stdin(); if let Ok((mut inbound, _)) = listener.accept().await {
let stdout_task = async move { match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
match tokio::io::copy(&mut rx, &mut stdout).await { Ok(bytes_copied) => info!("{bytes_copied:?}"),
Ok(bytes_copied) => info!("{bytes_copied}"),
Err(e) => error!("Error during copy: {e}"), Err(e) => error!("Error during copy: {e}"),
} }
}; } else {
let stdin_task = async move { error!("Error");
match tokio::io::copy(&mut stdin, &mut tx).await { }
Ok(bytes_copied) => info!("{bytes_copied}"),
Err(e) => error!("Error during copy: {e}"),
}
};
tokio::join!(stdout_task, stdin_task);
Ok(()) Ok(())
} }

View File

@@ -0,0 +1,61 @@
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_rustls::{TlsConnector};
use tokio_rustls::client::TlsStream;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use futures::{SinkExt, StreamExt};
use thiserror::Error;
use tokio_rustls::rustls::ClientConfig;
use tracing::{error};
use libbonknet::clientmsg::{FromClientCommand, ToClientResponse};
pub type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
#[derive(Error, Debug)]
pub enum SendError {
#[error("Failure during serialization of the msg to send")]
MsgSerializationFailure(rmp_serde::encode::Error),
#[error("Failure during the transport send of the data")]
TransportError(std::io::Error),
#[error("Empty buffer during reply wait")]
EmptyBufferError,
#[error("Generic buffer error during response reading")]
GenericBufferError(std::io::Error),
#[error("Failure during deserialization of the reply msg")]
ReplyDeserializationFailure(rmp_serde::decode::Error),
}
pub struct BonkClient {
transport: TransportStream,
}
impl BonkClient {
pub async fn connect(tlsconfig: ClientConfig, dnsname: &str, port: u16) -> tokio::io::Result<BonkClient> {
// Load TLS Config
let connector = TlsConnector::from(Arc::new(tlsconfig));
let stream = TcpStream::connect(format!("{}:{}", dnsname, port)).await?;
let dnsservername = ServerName::try_from(dnsname).unwrap().to_owned();
let stream = connector.connect(dnsservername, stream).await?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
Ok(BonkClient { transport })
}
// TODO: make this private and make single calls for each Bonk interaction?
// The main idea is that the messages need to be inside the API and not ouside as elements of it
pub async fn send(&mut self, msg: &FromClientCommand) -> Result<ToClientResponse, SendError> {
let bmsg = rmp_serde::to_vec(msg).map_err(SendError::MsgSerializationFailure)?;
self.transport.send(bmsg.into()).await.map_err(SendError::TransportError)?;
match self.transport.next().await {
None => Err(SendError::EmptyBufferError),
Some(item) => match item {
Ok(buf) => {
let reply: ToClientResponse = rmp_serde::from_slice(&buf).map_err(SendError::ReplyDeserializationFailure)?;
Ok(reply)
}
Err(e) => Err(SendError::GenericBufferError(e))
}
}
}
}

View File

@@ -0,0 +1,64 @@
mod client;
use std::path::PathBuf;
use clap::{Parser, Subcommand};
use libbonknet::cert::*;
use tracing::{info};
use libbonknet::clientmsg::FromClientCommand;
use crate::client::BonkClient;
#[derive(Parser, Debug)]
#[command(version, about)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand, Debug)]
enum Commands {
/// connect via ssh to the given remote_id.
Ssh {
remote_id: String,
},
/// send a file from source to remote. This command uses scp syntax and must contain wildcards to work (see examples).
Scp {
remote_id: String,
source: PathBuf,
dest: PathBuf,
},
/// get a list of all the servers connected to the broker.
Serverlist {
pattern: Option<String>,
},
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// CLI parsing
let cli = Cli::parse();
// Load Identity files
let client_ident = LeafCertPair::load_from_file("certs_pem/client.pem").unwrap();
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
let tlsconfig = client_ident.to_tlsclientconfig(&broker_root);
// Execute command
match &cli.command {
Commands::Ssh { remote_id } => {
info!("Run SSH on {remote_id}");
unimplemented!()
}
Commands::Scp { remote_id, source, dest } => {
info!("Run SCP on {} moving {} to {}", remote_id, source.display(), dest.display());
unimplemented!()
}
Commands::Serverlist { pattern } => {
info!("Run Clientlist with pattern {:?}", pattern);
let mut client = BonkClient::connect(tlsconfig, "localhost", 2541).await.unwrap();
let reply = client.send(&FromClientCommand::ServerList).await.unwrap();
info!("Reply: {:?}", reply);
}
}
Ok(())
}

View File

@@ -2,6 +2,7 @@
name = "bonknet_server" name = "bonknet_server"
version = "0.1.0" version = "0.1.0"
edition = "2021" edition = "2021"
description = "An automated Server for the Bonknet system"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -16,4 +17,7 @@ rustls-pemfile = "2.0.0"
rmp-serde = "1.1.2" rmp-serde = "1.1.2"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
uuid = { version = "1.7.0", features = ["serde"] } uuid = { version = "1.7.0", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
thiserror = "1.0.56"
serde = { version = "1.0" }

View File

@@ -2,12 +2,13 @@ use std::io::{Error, ErrorKind};
use std::sync::Arc; use std::sync::Arc;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_rustls::rustls::{ClientConfig, RootCertStore}; use tokio_rustls::rustls::ClientConfig;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName}; use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
use tokio_util::codec::{Framed, LengthDelimitedCodec}; use tokio_util::codec::{Framed, LengthDelimitedCodec};
use libbonknet::*; use libbonknet::*;
use libbonknet::servermsg::*; use libbonknet::servermsg::*;
use libbonknet::cert::*;
use uuid::Uuid; use uuid::Uuid;
use tracing::{error, info}; use tracing::{error, info};
@@ -77,9 +78,11 @@ async fn datastream(tlsconfig: Arc<ClientConfig>, conn_id: Uuid) -> std::io::Res
} }
} }
} }
let (mut rx, mut tx) = tokio::io::split(transport.into_inner()); // Initialize outbound stream
match tokio::io::copy(&mut rx, &mut tx).await { let mut inbound = transport.into_inner();
Ok(bytes_copied) => info!("{bytes_copied}"), let mut outbound = TcpStream::connect("127.0.0.1:22").await?;
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"), Err(e) => error!("Error during copy: {e}"),
} }
Ok(()) Ok(())
@@ -90,22 +93,17 @@ async fn main() -> std::io::Result<()> {
// Tracing Subscriber // Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new(); let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap(); tracing::subscriber::set_global_default(subscriber).unwrap();
// Root certs to verify the server is the right one // Server Name
let mut broker_root_cert_store = RootCertStore::empty(); let my_name = "cicciopizza";
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap(); // Load Identity files
broker_root_cert_store.add(broker_root_cert_der).unwrap(); let guestserver_ident = LeafCertPair::load_from_file("certs_pem/guestserver.pem").unwrap();
// Public CA that will be used to generate the Server certificate let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
let root_server_cert = load_cert("certs/server_root_cert.pem").unwrap();
// Guest CA
let root_guestserver_cert = load_cert("certs/guestserver_root_cert.pem").unwrap();
// Certificate used to do the first authentication
let guestserver_cert = load_cert("certs/guestserver_cert.pem").unwrap();
let guestserver_prkey = load_prkey("certs/guestserver_key.pem").unwrap();
// Load TLS Config // Load TLS Config
let guest_cert_chain = guestserver_ident.fullchain();
let tlsconfig = ClientConfig::builder() let tlsconfig = ClientConfig::builder()
.with_root_certificates(broker_root_cert_store.clone()) .with_root_certificates(broker_root.to_rootcertstore())
// .with_no_client_auth(); // .with_no_client_auth();
.with_client_auth_cert(vec![guestserver_cert, root_guestserver_cert], guestserver_prkey.into()) .with_client_auth_cert(guest_cert_chain, guestserver_ident.clone_key().into())
.unwrap(); .unwrap();
let connector = TlsConnector::from(Arc::new(tlsconfig)); let connector = TlsConnector::from(Arc::new(tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap(); let dnsname = ServerName::try_from("localhost").unwrap();
@@ -114,29 +112,26 @@ async fn main() -> std::io::Result<()> {
let stream = connector.connect(dnsname, stream).await?; let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new()); let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromGuestServerMessage::Announce { name: "cicciopizza".into() }; let msg = FromGuestServerMessage::Announce { name: my_name.into() };
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap(); transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
let mut myserver_cert: Option<CertificateDer> = None; // TODO: Remove this two mutable option
let mut myserver_prkey: Option<PrivatePkcs8KeyDer> = None; let mut myserver_leaf: Option<LeafCertPair> = None;
match transport.next().await { match transport.next().await {
None => { None => {
panic!("None in the transport"); panic!("None in the transport");
} }
Some(item) => match item { Some(item) => match item {
Ok(buf) => { Ok(buf) => {
use libbonknet::servermsg::{okannounce_to_cert, ToGuestServerMessage};
use libbonknet::servermsg::ToGuestServerMessage::*; use libbonknet::servermsg::ToGuestServerMessage::*;
let msg: ToGuestServerMessage = rmp_serde::from_slice(&buf).unwrap(); let msg: ToGuestServerMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg); info!("{:?}", msg);
match msg { match msg {
OkAnnounce { server_cert, server_prkey } => { OkAnnounce(payload) => {
info!("Ok Announce"); info!("Ok Announce");
let (server_cert, server_prkey) = okannounce_to_cert(server_cert, server_prkey); myserver_leaf = Some(payload.parse());
myserver_cert = Some(server_cert);
myserver_prkey = Some(server_prkey);
} }
FailedNameAlreadyOccupied => { FailedNameAlreadyOccupied => {
error!("Failed Announce"); error!("Failed Announce, name already occupied");
return Ok(()); return Ok(());
} }
} }
@@ -147,10 +142,10 @@ async fn main() -> std::io::Result<()> {
} }
} }
transport.close().await.unwrap(); transport.close().await.unwrap();
if let (Some(server_cert), Some(server_prkey)) = (myserver_cert, myserver_prkey) { if let Some(server_leaf) = myserver_leaf {
let tlsconfig = Arc::new(ClientConfig::builder() let tlsconfig = Arc::new(ClientConfig::builder()
.with_root_certificates(broker_root_cert_store) .with_root_certificates(broker_root.to_rootcertstore())
.with_client_auth_cert(vec![server_cert, root_server_cert], server_prkey.into()) .with_client_auth_cert(server_leaf.fullchain(), server_leaf.clone_key().into())
.unwrap()); .unwrap());
let connector = TlsConnector::from(Arc::clone(&tlsconfig)); let connector = TlsConnector::from(Arc::clone(&tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap(); let dnsname = ServerName::try_from("localhost").unwrap();

228
bonknet_server/src/main.rs Normal file
View File

@@ -0,0 +1,228 @@
mod transportstream;
use std::path::Path;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_rustls::rustls::{ClientConfig};
use libbonknet::*;
use libbonknet::servermsg::*;
use libbonknet::cert::*;
use uuid::Uuid;
use tracing::{error, info};
use crate::transportstream::*;
#[derive(Clone)]
struct ServerContext<'a> {
identity: LeafCertPair<'a>,
broker_root: BrokerRootCerts<'a>,
my_name: String,
}
impl ServerContext<'_> {
fn tlsconfig(&self) -> ClientConfig {
self.identity.to_tlsclientconfig(&self.broker_root)
}
}
async fn subscribe(ctx: &ServerContext<'_>) -> Result<(), TransportError> {
use ToServerConnTypeReply::*;
let tlsconfig = Arc::new(ctx.tlsconfig());
let mut transport = TransportStream::new(Arc::clone(&tlsconfig)).await?;
let msg = FromServerConnTypeMessage::Subscribe;
match transport.send_and_listen::<_, ToServerConnTypeReply>(&msg).await? {
OkSubscribe => {
info!("Stream set in Subscribe mode");
}
GenericFailure => {
panic!("Generic Failure during SendCommand");
}
others => {
panic!("Unexpected Message type: {:?}", others);
}
}
loop {
use ToServerMessage::*;
let out: Option<FromServerReply>;
match transport.listen_one::<ToServerMessage>().await? {
Msg { reply_id, body } => {
use libbonknet::servermsg::ToServerMessageBody::*;
match body {
Request { conn_id } => {
info!("I'm required with Connection ID {}", conn_id);
out = Some(FromServerReply::Msg {
reply_id,
body: FromServerReplyBody::RequestAccepted,
});
// TODO: Spawn Datastream
tokio::spawn(datastream(ctx.tlsconfig(), conn_id));
}
}
}
Ping => {
info!("Ping!");
out = Some(FromServerReply::Pong);
}
}
if let Some(msg) = out {
transport.send(&msg).await?;
}
}
}
async fn datastream(tlsconfig: ClientConfig, conn_id: Uuid) -> Result<(), TransportError> {
use TransportError::StreamError;
use ToPeerDataStream::*;
let mut transport = TransportStream::new(Arc::new(tlsconfig)).await?;
let msg = FromServerConnTypeMessage::OpenDataStream(conn_id);
match transport.send_and_listen::<_, ToPeerDataStream>(&msg).await? {
OkDataStreamRequestAccepted => {
info!("Data Stream Accepted. Waiting for Open...");
}
Refused => {
panic!("Refused");
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
match transport.listen_one().await? {
OkDataStreamOpen => {
info!("Data Stream Open!. Connecting Streams.");
}
Revoked => {
panic!("Data Stream Revoked!");
}
Refused => {
panic!("Refused");
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
// Initialize outbound stream
let mut inbound = transport.into_inner().into_inner();
let mut outbound = TcpStream::connect("127.0.0.1:22").await.map_err(StreamError)?;
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"),
}
Ok(())
}
async fn announce<'a>(ctx: &ServerContext<'_>) -> Result<LeafCertPair<'a>, TransportError> {
use ToGuestServerMessage::*;
let mut transport = TransportStream::new(Arc::new(ctx.tlsconfig())).await.unwrap();
let msg = FromGuestServerMessage::Announce { name: ctx.my_name.clone() };
transport.send(&msg).await?;
for i in 0..10 {
match transport.listen_one().await? {
OkAnnounce(payload) => {
info!("Ok Announce");
transport.close().await?;
return Ok(payload.parse());
}
FailedNameAlreadyOccupied => {
let new_name = format!("ERROR_{}_{}", &ctx.my_name, i + 1);
error!("Failed Announce, name already occupied. Using {}", &new_name);
let msg = FromGuestServerMessage::Announce { name: new_name };
transport.send(&msg).await?;
}
}
}
panic!("Out of retry");
}
async fn server_name_confirmation<'a>(ctx: &ServerContext<'_>) -> Result<(), TransportError> {
use ToServerConnTypeReply::*;
use ToServerCommandReply::*;
let mut transport = TransportStream::new(Arc::new(ctx.tlsconfig())).await?;
// Declare Conn Type
let msg = FromServerConnTypeMessage::SendCommand;
match transport.send_and_listen(&msg).await? {
OkSendCommand => {}
e => {
panic!("Error during ConnType Declare: {:?}", e);
}
}
// Ask Name
let msg = FromServerCommandMessage::WhoAmI;
match transport.send_and_listen(&msg).await? {
YouAre { name } => {
if ctx.my_name == name {
return Ok(());
}
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
// If name doesn't correspond, try to ChangeName. 10 retry. If they fail, keep the actual one without panic.
let msg = FromServerCommandMessage::ChangeName { name: ctx.my_name.clone() };
transport.send(&msg).await?;
for i in 0..10 {
match transport.listen_one().await? {
NameChanged => {
return Ok(());
}
NameNotAvailable => {
let msg = FromServerCommandMessage::ChangeName { name: format!("ERROR_{}_{}", ctx.my_name, i + 1) };
transport.send(&msg).await?;
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
}
panic!("Exhausted Announce Retry");
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Server Name
// TODO: from config using std
let my_name = String::from("cicciopizza");
let serverident_path = Path::new("server/serveridentity.pem"); // "/etc/bonknet/identity.pem"
let guestserverident_path = Path::new("server/guestidentity.pem"); // "/etc/bonknet/guestidentity.pem"
let broker_root_path = Path::new("certs_pem/broker_root_ca_cert.pem"); // "/etc/bonknet/broker_root_ca_cert.pem"
// Load Broker Root file
if !(broker_root_path.try_exists().unwrap() && broker_root_path.is_file()) {
panic!("No Broker Root file");
}
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
// Load Identity Files (if needed, contact the broker for generation)
let exists_serverident = serverident_path.try_exists().unwrap() && serverident_path.is_file();
let exists_guestident = guestserverident_path.try_exists().unwrap() && guestserverident_path.is_file();
// Do Guest registration and Name confirmation
let ctx = if !exists_serverident && exists_guestident {
info!("No Server Identity. Starting Guest Announce...");
// No Server, Yes Guest -> Use Guest to retrieve Server identity
let guest_ident = LeafCertPair::load_from_file(guestserverident_path).unwrap();
let ctx = ServerContext { identity: guest_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
let server_ident = announce(&ctx).await.unwrap();
server_ident.save_into_file(serverident_path).unwrap();
ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }
} else if exists_serverident {
// Yes Server -> Use Server file as identity
info!("Server Identity found. Confirming...");
let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap();
let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
server_name_confirmation(&ctx).await.unwrap();
ctx
} else {
// No identity file present
panic!("No Identity file found");
};
// Start Server Main
let ctx = Arc::new(ctx);
loop {
if let Err(e) = subscribe(&ctx).await {
error!("Subscribe Task aborted due to {}", e);
error!("Restoring Subscribe Task...");
}
}
}

View File

@@ -0,0 +1,80 @@
use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use serde::{Serialize};
use serde::de::DeserializeOwned;
use thiserror::Error;
use tokio_rustls::rustls::ClientConfig;
#[derive(Error, Debug)]
pub enum TransportError {
#[error("Stream Terminated, next() returned None")]
StreamTerminated,
#[error("Stream Error")]
StreamError(std::io::Error),
#[error("Serialization Error")]
SerializeError(rmp_serde::encode::Error),
#[error("Deserialization Error")]
DeserializeError(rmp_serde::decode::Error),
}
pub struct TransportStream {
transport: Framed<TlsStream<TcpStream>, LengthDelimitedCodec>,
}
impl TransportStream {
pub async fn new(tlsconfig: Arc<ClientConfig>) -> Result<Self, TransportError> {
use TransportError::StreamError;
let connector = TlsConnector::from(tlsconfig);
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await.map_err(StreamError)?;
let stream = connector.connect(dnsname, stream).await.map_err(StreamError)?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
Ok(TransportStream { transport })
}
pub fn into_inner(self) -> Framed<TlsStream<TcpStream>, LengthDelimitedCodec> {
self.transport
}
pub async fn send<T: Serialize>(&mut self, msg: &T) -> Result<(), TransportError> {
use TransportError::*;
self.transport.send(rmp_serde::to_vec(&msg).map_err(SerializeError)?.into()).await.map_err(StreamError)?;
Ok(())
}
pub async fn send_and_listen<T: Serialize, U: DeserializeOwned>(&mut self, msg: &T) -> Result<U, TransportError> {
self.send(msg).await?;
self.listen_one().await
}
pub async fn listen_one<T: DeserializeOwned>(&mut self) -> Result<T, TransportError> {
use TransportError::*;
match self.transport.next().await {
None => {
// Stream Terminated
Err(StreamTerminated)
}
Some(item) => match item {
Ok(buf) => {
let msg: T = rmp_serde::from_slice(&buf).map_err(DeserializeError)?;
Ok(msg)
}
Err(e) => {
// Disconnection
Err(StreamError(e))
}
}
}
}
pub async fn close(mut self) -> Result<(), TransportError> {
self.transport.close().await.map_err(TransportError::StreamError)
}
}

View File

@@ -8,5 +8,8 @@ edition = "2021"
[dependencies] [dependencies]
tokio-rustls = "0.25.0" tokio-rustls = "0.25.0"
rustls-pemfile = "2.0.0" rustls-pemfile = "2.0.0"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
x509-parser = "0.16.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
uuid = { version = "1.7.0", features = ["serde"] } uuid = { version = "1.7.0", features = ["serde"] }
pem = "3.0.3"

227
libbonknet/src/cert.rs Normal file
View File

@@ -0,0 +1,227 @@
use std::io::{BufReader, Error, ErrorKind, Write};
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
use rustls_pemfile::{Item, read_all, read_one};
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use x509_parser::nom::AsBytes;
use pem::{self, Pem};
pub struct RawCertPair {
pub cert: Vec<u8>,
pub prkey: Vec<u8>,
}
// TODO: Eventually consider shifting from CertificateDer to x509_parser::X509Certificate
// for the extra information it can provide runtime
pub struct LeafCertPair<'a> {
cert: CertificateDer<'a>,
ca_chain: Vec<CertificateDer<'a>>,
prkey: PrivatePkcs8KeyDer<'a>,
}
impl Clone for LeafCertPair<'_> {
fn clone(&self) -> Self {
Self {
cert: self.cert.clone(),
ca_chain: self.ca_chain.clone(),
prkey: self.prkey.clone_key(),
}
}
}
impl LeafCertPair<'_> {
pub fn parse<'a>(cert: Vec<u8>, ca_chain: Vec<Vec<u8>>, prkey: Vec<u8>) -> LeafCertPair<'a> {
let cert = CertificateDer::from(cert);
let ca_chain = ca_chain.into_iter().map(CertificateDer::from).collect();
let prkey = PrivatePkcs8KeyDer::from(prkey);
LeafCertPair {
cert,
ca_chain,
prkey,
}
}
pub fn save_into_file<P: AsRef<std::path::Path>>(&self, filename: P) -> std::io::Result<()> {
let mut file = std::fs::File::create(filename)?;
let mut pems = vec![
Pem::new("CERTIFICATE", self.cert.as_bytes())
];
for c in self.ca_chain.iter() {
pems.push(Pem::new("CERTIFICATE", c.as_bytes()));
}
pems.push(Pem::new("PRIVATE KEY", self.prkey.secret_pkcs8_der()));
file.write_all(pem::encode_many(&pems).as_bytes())?;
Ok(())
}
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<LeafCertPair<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
let parsed_cert = x509_parser::parse_x509_certificate(&cert).unwrap().1;
if parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "main cert is ca"));
}
let mut ca_chain: Vec<CertificateDer> = Vec::new();
for item in read_all(&mut buf) {
match item {
Ok(Item::X509Certificate(c)) => {
let parsed_cert = x509_parser::parse_x509_certificate(&c).unwrap().1;
if !parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "chain cert is not ca"));
}
ca_chain.push(c);
},
Ok(Item::Pkcs8Key(prkey)) => {
return Ok(LeafCertPair { cert, ca_chain, prkey });
}
_ => {
return Err(Error::new(ErrorKind::InvalidInput, "invalid format"));
}
}
}
Err(Error::new(ErrorKind::InvalidInput, "pkcs8key not found"))
} else {
Err(Error::new(ErrorKind::InvalidInput, "no main x509 cert"))
}
}
pub fn cert(&self) -> &CertificateDer {
&self.cert
}
pub fn ca_chain(&self) -> &Vec<CertificateDer> {
&self.ca_chain
}
pub fn prkey(&self) -> &PrivatePkcs8KeyDer {
&self.prkey
}
pub fn clone_key(&self) -> PrivatePkcs8KeyDer<'static> {
self.prkey.clone_key()
}
pub fn fullchain<'a>(&self) -> Vec<CertificateDer<'a>> {
let mut chain: Vec<CertificateDer> = Vec::with_capacity(self.ca_chain.len() + 1);
chain.push(self.cert.clone().into_owned());
let mut ca_chain = self.ca_chain.clone().into_iter().map(|c| c.into_owned()).collect();
chain.append(&mut ca_chain);
chain
}
pub fn to_raw(&self) -> RawCertPair {
let cert = self.cert.to_vec();
let prkey = self.prkey.secret_pkcs8_der().to_vec();
RawCertPair { cert, prkey }
}
pub fn to_tlsclientconfig(&self, broker_root_certs: &BrokerRootCerts) -> ClientConfig {
let broker_root_cert_store = broker_root_certs.to_rootcertstore();
let cert_chain = self.fullchain();
ClientConfig::builder()
.with_root_certificates(broker_root_cert_store)
.with_client_auth_cert(cert_chain, self.prkey.clone_key().into()).expect("Invalid Cert chain")
}
}
pub struct CACertPair<'a> {
cert: CertificateDer<'a>,
ca_chain: Vec<CertificateDer<'a>>,
prkey: PrivatePkcs8KeyDer<'a>,
}
impl CACertPair<'_> {
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<CACertPair<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
let parsed_cert = x509_parser::parse_x509_certificate(&cert).unwrap().1;
if !parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "cert is not ca"));
}
// TODO: Implement ca_chain reading (for now it's unused)
if let Item::Pkcs8Key(prkey) = read_one(&mut buf).unwrap().unwrap() {
Ok(CACertPair { cert, ca_chain: vec![], prkey })
} else {
Err(Error::new(ErrorKind::InvalidInput, "no ca pkcs8key"))
}
} else {
Err(Error::new(ErrorKind::InvalidInput, "no ca x509 cert"))
}
}
fn rcgen_keypair(&self) -> KeyPair {
KeyPair::from_der(self.prkey.secret_pkcs8_der()).unwrap()
}
fn rcgen_certificate(&self) -> Certificate {
Certificate::from_params(CertificateParams::from_ca_cert_der(
&self.cert,
self.rcgen_keypair()
).unwrap()).unwrap()
}
pub fn sign_new_cert(&self, params: CertificateParams) -> LeafCertPair {
let root_cert = self.rcgen_certificate();
let certificate = Certificate::from_params(params).unwrap();
let b_cert = certificate.serialize_der_with_signer(&root_cert).unwrap();
let b_prkey = certificate.serialize_private_key_der();
let cert = CertificateDer::from(b_cert);
let prkey = PrivatePkcs8KeyDer::from(b_prkey);
let mut ca_chain = Vec::with_capacity(self.ca_chain.len() + 1);
ca_chain.push(self.cert.clone());
ca_chain.extend(self.ca_chain.iter().cloned());
LeafCertPair { cert, ca_chain, prkey }
}
pub fn cert(&self) -> &CertificateDer {
&self.cert
}
pub fn clone_key(&self) -> PrivatePkcs8KeyDer {
self.prkey.clone_key()
}
pub fn to_raw(&self) -> RawCertPair {
let cert = self.cert.to_vec();
let prkey = self.prkey.secret_pkcs8_der().to_vec();
RawCertPair { cert, prkey }
}
}
pub fn server_leaf_certparams(name: &str) -> CertificateParams {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), format!("bonk.server.{name}")]);
params.distinguished_name.push(DnType::CommonName, name);
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params.extended_key_usages.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
params
}
#[derive(Clone)]
pub struct BrokerRootCerts<'a> {
root_cert: CertificateDer<'a>
}
impl BrokerRootCerts<'_> {
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<BrokerRootCerts<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(root_cert) = read_one(&mut buf).unwrap().unwrap() {
Ok(BrokerRootCerts { root_cert })
} else {
Err(Error::new(ErrorKind::InvalidInput, "no broker root x509 cert"))
}
}
pub fn to_rootcertstore(&self) -> RootCertStore {
let mut broker_root_cert_store = RootCertStore::empty();
broker_root_cert_store.add(self.root_cert.clone()).expect("Invalid Broker Root");
broker_root_cert_store
}
pub fn certs(&self) -> Vec<&CertificateDer> {
vec![&self.root_cert]
}
}

View File

@@ -1,5 +1,6 @@
pub mod servermsg; pub mod servermsg;
pub mod clientmsg; pub mod clientmsg;
pub mod cert;
use std::io::{BufReader, Error, ErrorKind}; use std::io::{BufReader, Error, ErrorKind};
use rustls_pemfile::{Item, read_one}; use rustls_pemfile::{Item, read_one};
@@ -28,6 +29,7 @@ pub fn load_prkey(filename: &str) -> std::io::Result<PrivatePkcs8KeyDer> {
} }
} }
// TODO: move all this inside Client and Server, for example using a DataStreamCmd(ToPeerDataStream)
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum ToPeerDataStream { pub enum ToPeerDataStream {
// You are now a DataStream, wait the Open message // You are now a DataStream, wait the Open message

View File

@@ -3,6 +3,7 @@ pub use crate::ToPeerDataStream;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer}; use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use uuid::Uuid; use uuid::Uuid;
use crate::cert::LeafCertPair;
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum FromServerConnTypeMessage { pub enum FromServerConnTypeMessage {
@@ -77,17 +78,31 @@ pub fn okannounce_to_cert<'a>(server_cert: Vec<u8>, server_prkey: Vec<u8>) -> (C
(server_cert, server_prkey) (server_cert, server_prkey)
} }
#[derive(Debug, Serialize, Deserialize)]
pub struct OkAnnoucePayload {
server_cert: Vec<u8>,
ca_chain: Vec<Vec<u8>>,
server_prkey: Vec<u8>,
}
impl OkAnnoucePayload {
pub fn parse<'a>(self) -> LeafCertPair<'a> {
LeafCertPair::parse(self.server_cert, self.ca_chain, self.server_prkey)
}
}
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum ToGuestServerMessage { pub enum ToGuestServerMessage {
OkAnnounce { server_cert: Vec<u8>, server_prkey: Vec<u8> }, OkAnnounce(OkAnnoucePayload),
FailedNameAlreadyOccupied, FailedNameAlreadyOccupied,
} }
impl ToGuestServerMessage { impl ToGuestServerMessage {
pub fn make_okannounce(server_cert: CertificateDer, server_prkey: PrivatePkcs8KeyDer) -> Self { pub fn make_okannounce(server_leaf: &LeafCertPair) -> Self {
ToGuestServerMessage::OkAnnounce { ToGuestServerMessage::OkAnnounce(OkAnnoucePayload {
server_cert: server_cert.to_vec(), server_cert: server_leaf.cert().to_vec(),
server_prkey: server_prkey.secret_pkcs8_der().to_vec() ca_chain: server_leaf.ca_chain().iter().map(|c| c.to_vec()).collect(),
} server_prkey: server_leaf.prkey().secret_pkcs8_der().to_vec(),
})
} }
} }