Compare commits

...

4 Commits

20 changed files with 1014 additions and 162 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
/target
# Experiments with certificates
certs
certs_pem
/server

View File

@@ -25,7 +25,13 @@ rmp-serde = "1.1.2"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
rand = "0.8.5"
uuid = { version = "1.7.0", features = ["v4", "serde"] }
rustls-pemfile = "2.0.0"
x509-parser = "0.16.0"
[[bin]]
name = "init_certs"
path = "src/bin/init_certs.rs"
[[bin]]
name = "init_certs_2"
path = "src/bin/init_certs_2.rs"

View File

@@ -0,0 +1,201 @@
use std::fs::File;
use std::io::Write;
use rcgen::{self, BasicConstraints, Certificate, CertificateParams, DnType};
fn server_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Server Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn server_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.server.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Server 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn guestserver_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Guest Server Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn guestserver_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.guestserver.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Guest Server 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn client_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Client Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn client_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.client.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Client 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn broker_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Broker Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn broker_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "localhost".into()]);
params.distinguished_name.push(DnType::CommonName, "localhost");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ServerAuth);
Certificate::from_params(params).unwrap()
}
fn main() -> std::io::Result<()> {
// Generate Root CA Certificates
let server_root_cert = server_root_cert();
let guestserver_root_cert = guestserver_root_cert();
let client_root_cert = client_root_cert();
let broker_root_cert = broker_root_cert();
// Generate Leafs
let server_leaf_cert = server_cert();
let guestserver_leaf_cert = guestserver_cert();
let client_leaf_cert = client_cert();
let broker_leaf_cert = broker_cert();
// Generate PEMs
// every time you generate one, a new random number is taken, so different cert hashes!
// and we need this PEMs to appear in multiple files
// We don't need this for the pvkey because we generate them only one time for each cert
// IF YOU NEED TO WRITE PVKEY IN MULTIPLE FILES, PLEASE DO IT LIKE THESE LINES FOR THE x509!!!
let server_root_cert_pem = server_root_cert.serialize_pem().unwrap();
let guestserver_root_cert_pem = guestserver_root_cert.serialize_pem().unwrap();
let client_root_cert_pem = client_root_cert.serialize_pem().unwrap();
let broker_root_cert_pem = broker_root_cert.serialize_pem().unwrap();
let server_leaf_cert_pem = server_leaf_cert.serialize_pem_with_signer(&server_root_cert).unwrap();
let guestserver_leaf_cert_pem = guestserver_leaf_cert.serialize_pem_with_signer(&guestserver_root_cert).unwrap();
let client_leaf_cert_pem = client_leaf_cert.serialize_pem_with_signer(&client_root_cert).unwrap();
let broker_leaf_cert_pem = broker_leaf_cert.serialize_pem_with_signer(&broker_root_cert).unwrap();
// Root CA PEMs
/*
1 - CA Cert
2 - CA Prkey
*/
// Generate Server Root CA PEM
{
let mut pemfile = File::create("certs_pem/server_root_ca.pem")?;
pemfile.write_all(server_root_cert_pem.as_bytes())?;
pemfile.write_all(server_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate GuestServer Root CA PEM
{
let mut pemfile = File::create("certs_pem/guestserver_root_ca.pem")?;
pemfile.write_all(guestserver_root_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Client Root CA PEM
{
let mut pemfile = File::create("certs_pem/client_root_ca.pem")?;
pemfile.write_all(client_root_cert_pem.as_bytes())?;
pemfile.write_all(client_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker Root CA PEM
{
let mut pemfile = File::create("certs_pem/broker_root_ca.pem")?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
pemfile.write_all(broker_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker CA Cert PEM for Server Authentication
{
let mut pemfile = File::create("certs_pem/broker_root_ca_cert.pem")?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
}
// Generate Server Leaf PEM
/*
1 - Server Leaf Cert
2 - Server CA Cert chain
3 - Server Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/server.pem")?;
pemfile.write_all(server_leaf_cert_pem.as_bytes())?;
pemfile.write_all(server_root_cert_pem.as_bytes())?;
pemfile.write_all(server_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate GuestServer Leaf PEM
/*
1 - GuestServer Leaf Cert
2 - GuestServer CA Cert chain
3 - GuestServer Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/guestserver.pem")?;
pemfile.write_all(guestserver_leaf_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_root_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Client Leaf PEM
/*
1 - Client Leaf Cert
2 - Client CA Cert chain
3 - Client Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/client.pem")?;
pemfile.write_all(client_leaf_cert_pem.as_bytes())?;
pemfile.write_all(client_root_cert_pem.as_bytes())?;
pemfile.write_all(client_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker Leaf PEM
/*
1 - Broker Leaf Cert
2 - Broker CA Cert
3 - Broker Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/broker.pem")?;
pemfile.write_all(broker_leaf_cert_pem.as_bytes())?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
pemfile.write_all(broker_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
println!("Certificates created");
Ok(())
}

View File

@@ -5,7 +5,7 @@ use futures::SinkExt;
use thiserror::Error;
use tracing::{info, error, warn};
use libbonknet::ToPeerDataStream;
use crate::TransportStream;
use crate::streamutils::*;
#[allow(dead_code)]
#[derive(Error, Debug)]

View File

@@ -2,16 +2,18 @@ mod servercertdb;
mod pendingdataconndb;
mod servermanager;
mod dataconnmanager;
mod streamutils;
use servercertdb::*;
use pendingdataconndb::*;
use servermanager::*;
use dataconnmanager::*;
use streamutils::*;
use actix::prelude::*;
use std::sync::Arc;
use libbonknet::*;
use libbonknet::servermsg::*;
use libbonknet::clientmsg::*;
use libbonknet::cert::*;
use rustls::{RootCertStore, ServerConfig};
use rustls::server::WebPkiClientVerifier;
use actix_tls::accept::rustls_0_22::{Acceptor as RustlsAcceptor, TlsStream};
@@ -19,32 +21,18 @@ use actix_server::Server;
use actix_rt::net::TcpStream;
use actix_service::ServiceFactoryExt as _;
use futures::{SinkExt, StreamExt};
use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{error, info, warn};
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
use tokio::io::{ReadHalf, WriteHalf};
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
type TransportStreamTx = FramedWrite<WriteHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
type TransportStreamRx = FramedRead<ReadHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
struct ServerCert {
cert: Vec<u8>,
prkey: Vec<u8>,
}
fn generate_server_cert(root_cert: &Certificate, name: &str) -> ServerCert {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), format!("bonk.server.{name}")]);
params.distinguished_name.push(DnType::CommonName, name);
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
let certificate = Certificate::from_params(params).unwrap();
let cert = certificate.serialize_der_with_signer(root_cert).unwrap();
let prkey = certificate.serialize_private_key_der();
ServerCert { cert, prkey }
struct BrokerContext {
broker_leaf: LeafCertPair<'static>,
client_ca: CACertPair<'static>,
server_ca: CACertPair<'static>,
guestserver_ca: CACertPair<'static>,
scdb_addr: Addr<ServerCertDB>,
pdcm_addr: Addr<PendingDataConnManager>,
sm_addr: Addr<ServerManager>,
}
@@ -53,72 +41,54 @@ async fn main() {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// BROKER CERTS
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap();
let broker_cert_der = load_cert("certs/broker_cert.pem").unwrap();
let broker_prkey_der = load_prkey("certs/broker_key.pem").unwrap();
// SERVER ROOT
let server_root_cert_der = load_cert("certs/server_root_cert.pem").unwrap();
let server_root_prkey_der = load_prkey("certs/server_root_key.pem").unwrap();
// GUESTSERVER ROOT
let guestserver_root_cert_der = load_cert("certs/guestserver_root_cert.pem").unwrap();
// CLIENT ROOT
let client_root_cert_der = load_cert("certs/client_root_cert.pem").unwrap();
// Client Verifier
// Load Identity
let broker_leaf = LeafCertPair::load_from_file("certs_pem/broker.pem").unwrap();
let client_ca = CACertPair::load_from_file("certs_pem/client_root_ca.pem").unwrap();
let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap();
let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap();
// Load Actors
let scdb_addr = ServerCertDB::new().start();
let dcm_addr = DataConnManager::new().start();
let pdcm_addr = PendingDataConnManager::new(dcm_addr).start();
let sm_addr = ServerManager::new(pdcm_addr.clone()).start();
// Create Context
let ctx = Arc::new(BrokerContext {
broker_leaf,
client_ca,
server_ca,
guestserver_ca,
scdb_addr,
pdcm_addr,
sm_addr,
});
// Pki Client Verifier
let mut broker_root_store = RootCertStore::empty();
broker_root_store.add(server_root_cert_der.clone()).unwrap();
broker_root_store.add(client_root_cert_der.clone()).unwrap();
broker_root_store.add(guestserver_root_cert_der.clone()).unwrap();
broker_root_store.add(ctx.server_ca.cert().clone()).unwrap();
broker_root_store.add(ctx.client_ca.cert().clone()).unwrap();
broker_root_store.add(ctx.guestserver_ca.cert().clone()).unwrap();
let server_verifier = WebPkiClientVerifier::builder(Arc::new(broker_root_store)).build().unwrap();
// Configure TLS
let server_tlsconfig = ServerConfig::builder()
// .with_no_client_auth()
.with_client_cert_verifier(server_verifier)
.with_single_cert(vec![broker_cert_der.clone(), broker_root_cert_der.clone()], broker_prkey_der.into())
.with_single_cert(ctx.broker_leaf.fullchain(), ctx.broker_leaf.clone_key().into())
.unwrap();
let server_acceptor = RustlsAcceptor::new(server_tlsconfig);
let server_root_cert_der = Arc::new(server_root_cert_der);
let server_root_prkey = KeyPair::from_der(server_root_prkey_der.secret_pkcs8_der()).unwrap();
let client_root_cert_der = Arc::new(client_root_cert_der);
let guestserver_root_cert_der = Arc::new(guestserver_root_cert_der);
let server_root_cert = Arc::new(Certificate::from_params(CertificateParams::from_ca_cert_der(
&server_root_cert_der,
server_root_prkey
).unwrap()).unwrap());
let scdb_addr = ServerCertDB::new().start();
let dcm_addr = DataConnManager::new().start();
let pdcm_addr = PendingDataConnManager::new(dcm_addr.clone()).start();
let sm_addr = ServerManager::new(pdcm_addr.clone()).start();
Server::build()
.bind("server-command", ("localhost", 2541), move || {
let server_root_cert_der = Arc::clone(&server_root_cert_der);
let client_root_cert_der = Arc::clone(&client_root_cert_der);
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
let server_root_cert = Arc::clone(&server_root_cert);
let scdb_addr = scdb_addr.clone();
let pdcm_addr = pdcm_addr.clone();
let sm_addr = sm_addr.clone();
let ctx = Arc::clone(&ctx);
// Set up TLS service factory
server_acceptor
.clone()
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| {
let server_root_cert_der = Arc::clone(&server_root_cert_der);
let client_root_cert_der = Arc::clone(&client_root_cert_der);
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
let server_root_cert = Arc::clone(&server_root_cert);
let scdb_addr = scdb_addr.clone();
let pdcm_addr = pdcm_addr.clone();
let sm_addr = sm_addr.clone();
let ctx = Arc::clone(&ctx);
async move {
let peer_certs = stream.get_ref().1.peer_certificates().unwrap();
let peer_cert_bytes = peer_certs.first().unwrap().to_vec();
let peer_root_cert_der = peer_certs.last().unwrap().clone();
if peer_root_cert_der == *server_root_cert_der {
if &peer_root_cert_der == ctx.server_ca.cert() {
info!("Server connection");
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
match transport.next().await {
@@ -127,8 +97,7 @@ async fn main() {
}
Some(item) => match item {
Ok(buf) => {
use libbonknet::servermsg::{FromServerConnTypeMessage, ToServerConnTypeReply};
use libbonknet::servermsg::FromServerConnTypeMessage::*;
use FromServerConnTypeMessage::*;
let msg: FromServerConnTypeMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
@@ -136,11 +105,11 @@ async fn main() {
info!("SendCommand Stream");
let reply = ToServerConnTypeReply::OkSendCommand;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_command_handler(transport, peer_cert_bytes, scdb_addr).await;
server_command_handler(&ctx, transport, peer_cert_bytes).await;
}
Subscribe => {
info!("Subscribe Stream");
let name = match scdb_addr.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
let name = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
None => {
error!("Cert has no name assigned!");
let reply = ToServerConnTypeReply::GenericFailure;
@@ -151,12 +120,12 @@ async fn main() {
};
let reply = ToServerConnTypeReply::OkSubscribe;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_subscribe_handler(transport, name, sm_addr).await;
server_subscribe_handler(&ctx, transport, name).await;
}
OpenDataStream(conn_id) => {
info!("OpenDataStream with {:?}", conn_id);
let msg = RegisterStream::server(conn_id, transport);
pdcm_addr.send(msg).await.unwrap().unwrap();
ctx.pdcm_addr.send(msg).await.unwrap().unwrap();
}
}
}
@@ -166,17 +135,16 @@ async fn main() {
}
}
info!("Server Task terminated!");
} else if peer_root_cert_der == *guestserver_root_cert_der {
} else if &peer_root_cert_der == ctx.guestserver_ca.cert() {
info!("GuestServer connection");
let server_root_cert = Arc::clone(&server_root_cert);
let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec);
guestserver_handler(transport, scdb_addr, &server_root_cert).await;
} else if peer_root_cert_der == *client_root_cert_der {
guestserver_handler(&ctx, transport).await;
} else if &peer_root_cert_der == ctx.client_ca.cert() {
info!("Client connection");
let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec);
client_handler(transport, sm_addr, pdcm_addr).await;
client_handler(&ctx, transport).await;
} else {
error!("Unknown Root Certificate");
}
@@ -190,8 +158,8 @@ async fn main() {
.unwrap();
}
async fn server_subscribe_handler(transport: TransportStream, name: String, sm_addr: Addr<ServerManager>) {
match sm_addr.send(StartTransporter { name, transport }).await.unwrap() {
async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) {
match ctx.sm_addr.send(StartTransporter { name, transport }).await.unwrap() {
Ok(_) => {
info!("Stream sent to the manager");
}
@@ -201,7 +169,7 @@ async fn server_subscribe_handler(transport: TransportStream, name: String, sm_a
}
}
async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes: Vec<u8>, server_db_addr: Addr<ServerCertDB>) {
async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStream, peer_cert_bytes: Vec<u8>) {
loop {
match transport.next().await {
None => {
@@ -216,7 +184,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
match msg {
ChangeName { name } => {
info!("Changing name to {}", name);
match server_db_addr.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
match ctx.scdb_addr.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => {
info!("Nothing to unregister");
}
@@ -224,7 +192,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
warn!("Unregistered from old name {}", old_name);
}
}
let reply = match server_db_addr.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
let reply = match ctx.scdb_addr.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
Ok(_) => {
info!("Registered!");
ToServerCommandReply::NameChanged
@@ -238,7 +206,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
}
WhoAmI => {
info!("Asked who I am");
let reply = match server_db_addr.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
let reply = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => {
info!("I'm not registered anymore!? WTF");
ToServerCommandReply::GenericFailure
@@ -261,8 +229,7 @@ async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes:
}
}
// TODO: Considera creare un context dove vengono contenute tutte le chiavi e gli address da dare a tutti gli handler
async fn guestserver_handler(mut transport: TransportStream, server_db_addr: Addr<ServerCertDB>, server_root_cert: &Arc<Certificate>) {
async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream) {
loop {
match transport.next().await {
None => {
@@ -278,21 +245,18 @@ async fn guestserver_handler(mut transport: TransportStream, server_db_addr: Add
match msg {
Announce { name } => {
info!("Announced with name {}", name);
if server_db_addr.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
if ctx.scdb_addr.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
info!("Name {} already registered!", name);
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
break; // Stop reading
} else {
let cert = generate_server_cert(server_root_cert, name.as_str());
server_db_addr.send(RegisterServer {
cert: cert.cert.clone(),
let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str()));
ctx.scdb_addr.send(RegisterServer {
cert: cert.cert().to_vec(),
name,
}).await.unwrap().unwrap();
let reply = ToGuestServerMessage::OkAnnounce {
server_cert: cert.cert,
server_prkey: cert.prkey
};
let reply = ToGuestServerMessage::make_okannounce(&cert);
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
}
@@ -308,7 +272,7 @@ async fn guestserver_handler(mut transport: TransportStream, server_db_addr: Add
}
}
async fn client_handler(mut transport: TransportStream, sm_addr: Addr<ServerManager>, pdcm_addr: Addr<PendingDataConnManager>) {
async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
loop {
match transport.next().await {
None => {
@@ -323,7 +287,7 @@ async fn client_handler(mut transport: TransportStream, sm_addr: Addr<ServerMana
match msg {
FromClientCommand::RequestServer { name } => {
info!("REQUESTED SERVER {}", name);
let data = sm_addr.send(RequestServer { name }).await.unwrap();
let data = ctx.sm_addr.send(RequestServer { name }).await.unwrap();
match data {
Ok(client_conn_id) => {
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
@@ -337,14 +301,14 @@ async fn client_handler(mut transport: TransportStream, sm_addr: Addr<ServerMana
}
FromClientCommand::ServerList => {
info!("Requested ServerList");
let data = sm_addr.send(GetServerList {}).await.unwrap();
let data = ctx.sm_addr.send(GetServerList {}).await.unwrap();
let reply = ToClientResponse::OkServerList { data };
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
FromClientCommand::UpgradeToDataStream(conn_id) => {
info!("Upgrade to DataStream with conn_id {:?}", conn_id);
let msg = RegisterStream::client(conn_id, transport);
pdcm_addr.send(msg).await.unwrap().unwrap();
ctx.pdcm_addr.send(msg).await.unwrap().unwrap();
break;
}
}

View File

@@ -1,15 +1,11 @@
use actix::prelude::*;
use actix_tls::accept::rustls_0_22::TlsStream;
use futures::SinkExt;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{error, info};
use uuid::Uuid;
use libbonknet::*;
use crate::dataconnmanager::{DataConnManager, StartDataBridge};
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
use crate::streamutils::*;
/*
L'idea e' che il database deve avere una riga per ogni connessione dati in nascita.

View File

@@ -10,7 +10,7 @@ use tokio::sync::{Mutex, oneshot};
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::{debug, error, info};
use crate::{TransportStream, TransportStreamRx, TransportStreamTx};
use crate::streamutils::*;
use uuid::Uuid;
use libbonknet::servermsg::*;
use crate::pendingdataconndb::*;

View File

@@ -0,0 +1,8 @@
use actix_tls::accept::rustls_0_22::TlsStream;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec};
pub type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
pub type TransportStreamTx = FramedWrite<WriteHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
pub type TransportStreamRx = FramedRead<ReadHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;

View File

@@ -2,6 +2,7 @@
name = "bonknet_client"
version = "0.1.0"
edition = "2021"
description = "A CLI Client for the Bonknet system"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -17,3 +18,5 @@ rmp-serde = "1.1.2"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1.7.0", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
thiserror = "1.0.56"

View File

@@ -2,7 +2,7 @@ use std::io::{Error, ErrorKind};
use std::sync::Arc;
use std::time::Duration;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio::net::{TcpStream, TcpListener};
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector;
@@ -78,22 +78,16 @@ async fn datastream(tlsconfig: ClientConfig, conn_id: Uuid) -> std::io::Result<(
}
}
}
let (mut rx, mut tx) = tokio::io::split(transport.into_inner());
let mut stdout = tokio::io::stdout();
let mut stdin = tokio::io::stdin();
let stdout_task = async move {
match tokio::io::copy(&mut rx, &mut stdout).await {
Ok(bytes_copied) => info!("{bytes_copied}"),
let mut outbound = transport.into_inner();
let listener = TcpListener::bind("127.0.0.1:9919").await?;
if let Ok((mut inbound, _)) = listener.accept().await {
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"),
}
};
let stdin_task = async move {
match tokio::io::copy(&mut stdin, &mut tx).await {
Ok(bytes_copied) => info!("{bytes_copied}"),
Err(e) => error!("Error during copy: {e}"),
}
};
tokio::join!(stdout_task, stdin_task);
} else {
error!("Error");
}
Ok(())
}

View File

@@ -0,0 +1,61 @@
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_rustls::{TlsConnector};
use tokio_rustls::client::TlsStream;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use futures::{SinkExt, StreamExt};
use thiserror::Error;
use tokio_rustls::rustls::ClientConfig;
use tracing::{error};
use libbonknet::clientmsg::{FromClientCommand, ToClientResponse};
pub type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
#[derive(Error, Debug)]
pub enum SendError {
#[error("Failure during serialization of the msg to send")]
MsgSerializationFailure(rmp_serde::encode::Error),
#[error("Failure during the transport send of the data")]
TransportError(std::io::Error),
#[error("Empty buffer during reply wait")]
EmptyBufferError,
#[error("Generic buffer error during response reading")]
GenericBufferError(std::io::Error),
#[error("Failure during deserialization of the reply msg")]
ReplyDeserializationFailure(rmp_serde::decode::Error),
}
pub struct BonkClient {
transport: TransportStream,
}
impl BonkClient {
pub async fn connect(tlsconfig: ClientConfig, dnsname: &str, port: u16) -> tokio::io::Result<BonkClient> {
// Load TLS Config
let connector = TlsConnector::from(Arc::new(tlsconfig));
let stream = TcpStream::connect(format!("{}:{}", dnsname, port)).await?;
let dnsservername = ServerName::try_from(dnsname).unwrap().to_owned();
let stream = connector.connect(dnsservername, stream).await?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
Ok(BonkClient { transport })
}
// TODO: make this private and make single calls for each Bonk interaction?
// The main idea is that the messages need to be inside the API and not ouside as elements of it
pub async fn send(&mut self, msg: &FromClientCommand) -> Result<ToClientResponse, SendError> {
let bmsg = rmp_serde::to_vec(msg).map_err(SendError::MsgSerializationFailure)?;
self.transport.send(bmsg.into()).await.map_err(SendError::TransportError)?;
match self.transport.next().await {
None => Err(SendError::EmptyBufferError),
Some(item) => match item {
Ok(buf) => {
let reply: ToClientResponse = rmp_serde::from_slice(&buf).map_err(SendError::ReplyDeserializationFailure)?;
Ok(reply)
}
Err(e) => Err(SendError::GenericBufferError(e))
}
}
}
}

View File

@@ -0,0 +1,64 @@
mod client;
use std::path::PathBuf;
use clap::{Parser, Subcommand};
use libbonknet::cert::*;
use tracing::{info};
use libbonknet::clientmsg::FromClientCommand;
use crate::client::BonkClient;
#[derive(Parser, Debug)]
#[command(version, about)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand, Debug)]
enum Commands {
/// connect via ssh to the given remote_id.
Ssh {
remote_id: String,
},
/// send a file from source to remote. This command uses scp syntax and must contain wildcards to work (see examples).
Scp {
remote_id: String,
source: PathBuf,
dest: PathBuf,
},
/// get a list of all the servers connected to the broker.
Serverlist {
pattern: Option<String>,
},
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// CLI parsing
let cli = Cli::parse();
// Load Identity files
let client_ident = LeafCertPair::load_from_file("certs_pem/client.pem").unwrap();
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
let tlsconfig = client_ident.to_tlsclientconfig(&broker_root);
// Execute command
match &cli.command {
Commands::Ssh { remote_id } => {
info!("Run SSH on {remote_id}");
unimplemented!()
}
Commands::Scp { remote_id, source, dest } => {
info!("Run SCP on {} moving {} to {}", remote_id, source.display(), dest.display());
unimplemented!()
}
Commands::Serverlist { pattern } => {
info!("Run Clientlist with pattern {:?}", pattern);
let mut client = BonkClient::connect(tlsconfig, "localhost", 2541).await.unwrap();
let reply = client.send(&FromClientCommand::ServerList).await.unwrap();
info!("Reply: {:?}", reply);
}
}
Ok(())
}

View File

@@ -2,6 +2,7 @@
name = "bonknet_server"
version = "0.1.0"
edition = "2021"
description = "An automated Server for the Bonknet system"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -17,3 +18,6 @@ rmp-serde = "1.1.2"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1.7.0", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
thiserror = "1.0.56"
serde = { version = "1.0" }

View File

@@ -2,12 +2,13 @@ use std::io::{Error, ErrorKind};
use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer, ServerName};
use tokio_rustls::rustls::ClientConfig;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use libbonknet::*;
use libbonknet::servermsg::*;
use libbonknet::cert::*;
use uuid::Uuid;
use tracing::{error, info};
@@ -77,9 +78,11 @@ async fn datastream(tlsconfig: Arc<ClientConfig>, conn_id: Uuid) -> std::io::Res
}
}
}
let (mut rx, mut tx) = tokio::io::split(transport.into_inner());
match tokio::io::copy(&mut rx, &mut tx).await {
Ok(bytes_copied) => info!("{bytes_copied}"),
// Initialize outbound stream
let mut inbound = transport.into_inner();
let mut outbound = TcpStream::connect("127.0.0.1:22").await?;
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"),
}
Ok(())
@@ -90,22 +93,17 @@ async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Root certs to verify the server is the right one
let mut broker_root_cert_store = RootCertStore::empty();
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap();
broker_root_cert_store.add(broker_root_cert_der).unwrap();
// Public CA that will be used to generate the Server certificate
let root_server_cert = load_cert("certs/server_root_cert.pem").unwrap();
// Guest CA
let root_guestserver_cert = load_cert("certs/guestserver_root_cert.pem").unwrap();
// Certificate used to do the first authentication
let guestserver_cert = load_cert("certs/guestserver_cert.pem").unwrap();
let guestserver_prkey = load_prkey("certs/guestserver_key.pem").unwrap();
// Server Name
let my_name = "cicciopizza";
// Load Identity files
let guestserver_ident = LeafCertPair::load_from_file("certs_pem/guestserver.pem").unwrap();
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
// Load TLS Config
let guest_cert_chain = guestserver_ident.fullchain();
let tlsconfig = ClientConfig::builder()
.with_root_certificates(broker_root_cert_store.clone())
.with_root_certificates(broker_root.to_rootcertstore())
// .with_no_client_auth();
.with_client_auth_cert(vec![guestserver_cert, root_guestserver_cert], guestserver_prkey.into())
.with_client_auth_cert(guest_cert_chain, guestserver_ident.clone_key().into())
.unwrap();
let connector = TlsConnector::from(Arc::new(tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap();
@@ -114,29 +112,26 @@ async fn main() -> std::io::Result<()> {
let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromGuestServerMessage::Announce { name: "cicciopizza".into() };
let msg = FromGuestServerMessage::Announce { name: my_name.into() };
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
let mut myserver_cert: Option<CertificateDer> = None;
let mut myserver_prkey: Option<PrivatePkcs8KeyDer> = None;
// TODO: Remove this two mutable option
let mut myserver_leaf: Option<LeafCertPair> = None;
match transport.next().await {
None => {
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use libbonknet::servermsg::{okannounce_to_cert, ToGuestServerMessage};
use libbonknet::servermsg::ToGuestServerMessage::*;
let msg: ToGuestServerMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
OkAnnounce { server_cert, server_prkey } => {
OkAnnounce(payload) => {
info!("Ok Announce");
let (server_cert, server_prkey) = okannounce_to_cert(server_cert, server_prkey);
myserver_cert = Some(server_cert);
myserver_prkey = Some(server_prkey);
myserver_leaf = Some(payload.parse());
}
FailedNameAlreadyOccupied => {
error!("Failed Announce");
error!("Failed Announce, name already occupied");
return Ok(());
}
}
@@ -147,10 +142,10 @@ async fn main() -> std::io::Result<()> {
}
}
transport.close().await.unwrap();
if let (Some(server_cert), Some(server_prkey)) = (myserver_cert, myserver_prkey) {
if let Some(server_leaf) = myserver_leaf {
let tlsconfig = Arc::new(ClientConfig::builder()
.with_root_certificates(broker_root_cert_store)
.with_client_auth_cert(vec![server_cert, root_server_cert], server_prkey.into())
.with_root_certificates(broker_root.to_rootcertstore())
.with_client_auth_cert(server_leaf.fullchain(), server_leaf.clone_key().into())
.unwrap());
let connector = TlsConnector::from(Arc::clone(&tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap();

227
bonknet_server/src/main.rs Normal file
View File

@@ -0,0 +1,227 @@
mod transportstream;
use std::path::Path;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_rustls::rustls::{ClientConfig};
use libbonknet::*;
use libbonknet::servermsg::*;
use libbonknet::cert::*;
use uuid::Uuid;
use tracing::{error, info};
use crate::transportstream::*;
#[derive(Clone)]
struct ServerContext<'a> {
identity: LeafCertPair<'a>,
broker_root: BrokerRootCerts<'a>,
my_name: String,
}
impl ServerContext<'_> {
fn tlsconfig(&self) -> ClientConfig {
self.identity.to_tlsclientconfig(&self.broker_root)
}
}
async fn subscribe(ctx: &ServerContext<'_>) -> Result<(), TransportError> {
use ToServerConnTypeReply::*;
let tlsconfig = Arc::new(ctx.tlsconfig());
let mut transport = TransportStream::new(Arc::clone(&tlsconfig)).await?;
let msg = FromServerConnTypeMessage::Subscribe;
match transport.send_and_listen::<_, ToServerConnTypeReply>(&msg).await? {
OkSubscribe => {
info!("Stream set in Subscribe mode");
}
GenericFailure => {
panic!("Generic Failure during SendCommand");
}
others => {
panic!("Unexpected Message type: {:?}", others);
}
}
loop {
use ToServerMessage::*;
let out: Option<FromServerReply>;
match transport.listen_one::<ToServerMessage>().await? {
Msg { reply_id, body } => {
use libbonknet::servermsg::ToServerMessageBody::*;
match body {
Request { conn_id } => {
info!("I'm required with Connection ID {}", conn_id);
out = Some(FromServerReply::Msg {
reply_id,
body: FromServerReplyBody::RequestAccepted,
});
// TODO: Spawn Datastream
tokio::spawn(datastream(ctx.tlsconfig(), conn_id));
}
}
}
Ping => {
info!("Ping!");
out = Some(FromServerReply::Pong);
}
}
if let Some(msg) = out {
transport.send(&msg).await?;
}
}
}
async fn datastream(tlsconfig: ClientConfig, conn_id: Uuid) -> Result<(), TransportError> {
use TransportError::StreamError;
use ToPeerDataStream::*;
let mut transport = TransportStream::new(Arc::new(tlsconfig)).await?;
let msg = FromServerConnTypeMessage::OpenDataStream(conn_id);
match transport.send_and_listen::<_, ToPeerDataStream>(&msg).await? {
OkDataStreamRequestAccepted => {
info!("Data Stream Accepted. Waiting for Open...");
}
Refused => {
panic!("Refused");
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
match transport.listen_one().await? {
OkDataStreamOpen => {
info!("Data Stream Open!. Connecting Streams.");
}
Revoked => {
panic!("Data Stream Revoked!");
}
Refused => {
panic!("Refused");
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
// Initialize outbound stream
let mut inbound = transport.into_inner().into_inner();
let mut outbound = TcpStream::connect("127.0.0.1:22").await.map_err(StreamError)?;
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"),
}
Ok(())
}
async fn announce<'a>(ctx: &ServerContext<'_>) -> Result<LeafCertPair<'a>, TransportError> {
use ToGuestServerMessage::*;
let mut transport = TransportStream::new(Arc::new(ctx.tlsconfig())).await.unwrap();
let msg = FromGuestServerMessage::Announce { name: ctx.my_name.clone() };
transport.send(&msg).await?;
for i in 0..10 {
match transport.listen_one().await? {
OkAnnounce(payload) => {
info!("Ok Announce");
transport.close().await?;
return Ok(payload.parse());
}
FailedNameAlreadyOccupied => {
let new_name = format!("ERROR_{}_{}", &ctx.my_name, i + 1);
error!("Failed Announce, name already occupied. Using {}", &new_name);
let msg = FromGuestServerMessage::Announce { name: new_name };
transport.send(&msg).await?;
}
}
}
panic!("Out of retry");
}
async fn server_name_confirmation<'a>(ctx: &ServerContext<'_>) -> Result<(), TransportError> {
use ToServerConnTypeReply::*;
use ToServerCommandReply::*;
let mut transport = TransportStream::new(Arc::new(ctx.tlsconfig())).await?;
// Declare Conn Type
let msg = FromServerConnTypeMessage::SendCommand;
match transport.send_and_listen(&msg).await? {
OkSendCommand => {}
e => {
panic!("Error during ConnType Declare: {:?}", e);
}
}
// Ask Name
let msg = FromServerCommandMessage::WhoAmI;
match transport.send_and_listen(&msg).await? {
YouAre { name } => {
if ctx.my_name == name {
return Ok(());
}
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
// If name doesn't correspond, try to ChangeName. 10 retry. If they fail, keep the actual one without panic.
let msg = FromServerCommandMessage::ChangeName { name: ctx.my_name.clone() };
transport.send(&msg).await?;
for i in 0..10 {
match transport.listen_one().await? {
NameChanged => {
return Ok(());
}
NameNotAvailable => {
let msg = FromServerCommandMessage::ChangeName { name: format!("ERROR_{}_{}", ctx.my_name, i + 1) };
transport.send(&msg).await?;
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
}
panic!("Exhausted Announce Retry");
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Server Name
// TODO: from config using std
let my_name = String::from("cicciopizza");
let serverident_path = Path::new("server/serveridentity.pem"); // "/etc/bonknet/identity.pem"
let guestserverident_path = Path::new("server/guestidentity.pem"); // "/etc/bonknet/guestidentity.pem"
let broker_root_path = Path::new("certs_pem/broker_root_ca_cert.pem"); // "/etc/bonknet/broker_root_ca_cert.pem"
// Load Broker Root file
if !(broker_root_path.try_exists().unwrap() && broker_root_path.is_file()) {
panic!("No Broker Root file");
}
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
// Load Identity Files (if needed, contact the broker for generation)
let exists_serverident = serverident_path.try_exists().unwrap() && serverident_path.is_file();
let exists_guestident = guestserverident_path.try_exists().unwrap() && guestserverident_path.is_file();
// Do Guest registration and Name confirmation
let ctx = if !exists_serverident && exists_guestident {
info!("No Server Identity. Starting Guest Announce...");
// No Server, Yes Guest -> Use Guest to retrieve Server identity
let guest_ident = LeafCertPair::load_from_file(guestserverident_path).unwrap();
let ctx = ServerContext { identity: guest_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
let server_ident = announce(&ctx).await.unwrap();
server_ident.save_into_file(serverident_path).unwrap();
ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }
} else if exists_serverident {
// Yes Server -> Use Server file as identity
let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap();
let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
server_name_confirmation(&ctx).await.unwrap();
ctx
} else {
// No identity file present
panic!("No Identity file found");
};
// Start Server Main
let ctx = Arc::new(ctx);
loop {
if let Err(e) = subscribe(&ctx).await {
error!("Subscribe Task aborted due to {}", e);
error!("Restoring Subscribe Task...");
}
}
}

View File

@@ -0,0 +1,80 @@
use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use serde::{Serialize};
use serde::de::DeserializeOwned;
use thiserror::Error;
use tokio_rustls::rustls::ClientConfig;
#[derive(Error, Debug)]
pub enum TransportError {
#[error("Stream Terminated, next() returned None")]
StreamTerminated,
#[error("Stream Error")]
StreamError(std::io::Error),
#[error("Serialization Error")]
SerializeError(rmp_serde::encode::Error),
#[error("Deserialization Error")]
DeserializeError(rmp_serde::decode::Error),
}
pub struct TransportStream {
transport: Framed<TlsStream<TcpStream>, LengthDelimitedCodec>,
}
impl TransportStream {
pub async fn new(tlsconfig: Arc<ClientConfig>) -> Result<Self, TransportError> {
use TransportError::StreamError;
let connector = TlsConnector::from(tlsconfig);
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await.map_err(StreamError)?;
let stream = connector.connect(dnsname, stream).await.map_err(StreamError)?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
Ok(TransportStream { transport })
}
pub fn into_inner(self) -> Framed<TlsStream<TcpStream>, LengthDelimitedCodec> {
self.transport
}
pub async fn send<T: Serialize>(&mut self, msg: &T) -> Result<(), TransportError> {
use TransportError::*;
self.transport.send(rmp_serde::to_vec(&msg).map_err(SerializeError)?.into()).await.map_err(StreamError)?;
Ok(())
}
pub async fn send_and_listen<T: Serialize, U: DeserializeOwned>(&mut self, msg: &T) -> Result<U, TransportError> {
self.send(msg).await?;
self.listen_one().await
}
pub async fn listen_one<T: DeserializeOwned>(&mut self) -> Result<T, TransportError> {
use TransportError::*;
match self.transport.next().await {
None => {
// Stream Terminated
Err(StreamTerminated)
}
Some(item) => match item {
Ok(buf) => {
let msg: T = rmp_serde::from_slice(&buf).map_err(DeserializeError)?;
Ok(msg)
}
Err(e) => {
// Disconnection
Err(StreamError(e))
}
}
}
}
pub async fn close(mut self) -> Result<(), TransportError> {
self.transport.close().await.map_err(TransportError::StreamError)
}
}

View File

@@ -8,5 +8,8 @@ edition = "2021"
[dependencies]
tokio-rustls = "0.25.0"
rustls-pemfile = "2.0.0"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
x509-parser = "0.16.0"
serde = { version = "1.0", features = ["derive"] }
uuid = { version = "1.7.0", features = ["serde"] }
pem = "3.0.3"

227
libbonknet/src/cert.rs Normal file
View File

@@ -0,0 +1,227 @@
use std::io::{BufReader, Error, ErrorKind, Write};
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
use rustls_pemfile::{Item, read_all, read_one};
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use x509_parser::nom::AsBytes;
use pem::{self, Pem};
pub struct RawCertPair {
pub cert: Vec<u8>,
pub prkey: Vec<u8>,
}
// TODO: Eventually consider shifting from CertificateDer to x509_parser::X509Certificate
// for the extra information it can provide runtime
pub struct LeafCertPair<'a> {
cert: CertificateDer<'a>,
ca_chain: Vec<CertificateDer<'a>>,
prkey: PrivatePkcs8KeyDer<'a>,
}
impl Clone for LeafCertPair<'_> {
fn clone(&self) -> Self {
Self {
cert: self.cert.clone(),
ca_chain: self.ca_chain.clone(),
prkey: self.prkey.clone_key(),
}
}
}
impl LeafCertPair<'_> {
pub fn parse<'a>(cert: Vec<u8>, ca_chain: Vec<Vec<u8>>, prkey: Vec<u8>) -> LeafCertPair<'a> {
let cert = CertificateDer::from(cert);
let ca_chain = ca_chain.into_iter().map(CertificateDer::from).collect();
let prkey = PrivatePkcs8KeyDer::from(prkey);
LeafCertPair {
cert,
ca_chain,
prkey,
}
}
pub fn save_into_file<P: AsRef<std::path::Path>>(&self, filename: P) -> std::io::Result<()> {
let mut file = std::fs::File::create(filename)?;
let mut pems = vec![
Pem::new("CERTIFICATE", self.cert.as_bytes())
];
for c in self.ca_chain.iter() {
pems.push(Pem::new("CERTIFICATE", c.as_bytes()));
}
pems.push(Pem::new("PRIVATE KEY", self.prkey.secret_pkcs8_der()));
file.write_all(pem::encode_many(&pems).as_bytes())?;
Ok(())
}
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<LeafCertPair<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
let parsed_cert = x509_parser::parse_x509_certificate(&cert).unwrap().1;
if parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "main cert is ca"));
}
let mut ca_chain: Vec<CertificateDer> = Vec::new();
for item in read_all(&mut buf) {
match item {
Ok(Item::X509Certificate(c)) => {
let parsed_cert = x509_parser::parse_x509_certificate(&c).unwrap().1;
if !parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "chain cert is not ca"));
}
ca_chain.push(c);
},
Ok(Item::Pkcs8Key(prkey)) => {
return Ok(LeafCertPair { cert, ca_chain, prkey });
}
_ => {
return Err(Error::new(ErrorKind::InvalidInput, "invalid format"));
}
}
}
Err(Error::new(ErrorKind::InvalidInput, "pkcs8key not found"))
} else {
Err(Error::new(ErrorKind::InvalidInput, "no main x509 cert"))
}
}
pub fn cert(&self) -> &CertificateDer {
&self.cert
}
pub fn ca_chain(&self) -> &Vec<CertificateDer> {
&self.ca_chain
}
pub fn prkey(&self) -> &PrivatePkcs8KeyDer {
&self.prkey
}
pub fn clone_key(&self) -> PrivatePkcs8KeyDer<'static> {
self.prkey.clone_key()
}
pub fn fullchain<'a>(&self) -> Vec<CertificateDer<'a>> {
let mut chain: Vec<CertificateDer> = Vec::with_capacity(self.ca_chain.len() + 1);
chain.push(self.cert.clone().into_owned());
let mut ca_chain = self.ca_chain.clone().into_iter().map(|c| c.into_owned()).collect();
chain.append(&mut ca_chain);
chain
}
pub fn to_raw(&self) -> RawCertPair {
let cert = self.cert.to_vec();
let prkey = self.prkey.secret_pkcs8_der().to_vec();
RawCertPair { cert, prkey }
}
pub fn to_tlsclientconfig(&self, broker_root_certs: &BrokerRootCerts) -> ClientConfig {
let broker_root_cert_store = broker_root_certs.to_rootcertstore();
let cert_chain = self.fullchain();
ClientConfig::builder()
.with_root_certificates(broker_root_cert_store)
.with_client_auth_cert(cert_chain, self.prkey.clone_key().into()).expect("Invalid Cert chain")
}
}
pub struct CACertPair<'a> {
cert: CertificateDer<'a>,
ca_chain: Vec<CertificateDer<'a>>,
prkey: PrivatePkcs8KeyDer<'a>,
}
impl CACertPair<'_> {
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<CACertPair<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
let parsed_cert = x509_parser::parse_x509_certificate(&cert).unwrap().1;
if !parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "cert is not ca"));
}
// TODO: Implement ca_chain reading (for now it's unused)
if let Item::Pkcs8Key(prkey) = read_one(&mut buf).unwrap().unwrap() {
Ok(CACertPair { cert, ca_chain: vec![], prkey })
} else {
Err(Error::new(ErrorKind::InvalidInput, "no ca pkcs8key"))
}
} else {
Err(Error::new(ErrorKind::InvalidInput, "no ca x509 cert"))
}
}
fn rcgen_keypair(&self) -> KeyPair {
KeyPair::from_der(self.prkey.secret_pkcs8_der()).unwrap()
}
fn rcgen_certificate(&self) -> Certificate {
Certificate::from_params(CertificateParams::from_ca_cert_der(
&self.cert,
self.rcgen_keypair()
).unwrap()).unwrap()
}
pub fn sign_new_cert(&self, params: CertificateParams) -> LeafCertPair {
let root_cert = self.rcgen_certificate();
let certificate = Certificate::from_params(params).unwrap();
let b_cert = certificate.serialize_der_with_signer(&root_cert).unwrap();
let b_prkey = certificate.serialize_private_key_der();
let cert = CertificateDer::from(b_cert);
let prkey = PrivatePkcs8KeyDer::from(b_prkey);
let mut ca_chain = Vec::with_capacity(self.ca_chain.len() + 1);
ca_chain.push(self.cert.clone());
ca_chain.extend(self.ca_chain.iter().cloned());
LeafCertPair { cert, ca_chain, prkey }
}
pub fn cert(&self) -> &CertificateDer {
&self.cert
}
pub fn clone_key(&self) -> PrivatePkcs8KeyDer {
self.prkey.clone_key()
}
pub fn to_raw(&self) -> RawCertPair {
let cert = self.cert.to_vec();
let prkey = self.prkey.secret_pkcs8_der().to_vec();
RawCertPair { cert, prkey }
}
}
pub fn server_leaf_certparams(name: &str) -> CertificateParams {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), format!("bonk.server.{name}")]);
params.distinguished_name.push(DnType::CommonName, name);
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params.extended_key_usages.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
params
}
#[derive(Clone)]
pub struct BrokerRootCerts<'a> {
root_cert: CertificateDer<'a>
}
impl BrokerRootCerts<'_> {
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<BrokerRootCerts<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(root_cert) = read_one(&mut buf).unwrap().unwrap() {
Ok(BrokerRootCerts { root_cert })
} else {
Err(Error::new(ErrorKind::InvalidInput, "no broker root x509 cert"))
}
}
pub fn to_rootcertstore(&self) -> RootCertStore {
let mut broker_root_cert_store = RootCertStore::empty();
broker_root_cert_store.add(self.root_cert.clone()).expect("Invalid Broker Root");
broker_root_cert_store
}
pub fn certs(&self) -> Vec<&CertificateDer> {
vec![&self.root_cert]
}
}

View File

@@ -1,5 +1,6 @@
pub mod servermsg;
pub mod clientmsg;
pub mod cert;
use std::io::{BufReader, Error, ErrorKind};
use rustls_pemfile::{Item, read_one};
@@ -28,6 +29,7 @@ pub fn load_prkey(filename: &str) -> std::io::Result<PrivatePkcs8KeyDer> {
}
}
// TODO: move all this inside Client and Server, for example using a DataStreamCmd(ToPeerDataStream)
#[derive(Debug, Serialize, Deserialize)]
pub enum ToPeerDataStream {
// You are now a DataStream, wait the Open message

View File

@@ -3,6 +3,7 @@ pub use crate::ToPeerDataStream;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::cert::LeafCertPair;
#[derive(Debug, Serialize, Deserialize)]
pub enum FromServerConnTypeMessage {
@@ -77,17 +78,31 @@ pub fn okannounce_to_cert<'a>(server_cert: Vec<u8>, server_prkey: Vec<u8>) -> (C
(server_cert, server_prkey)
}
#[derive(Debug, Serialize, Deserialize)]
pub struct OkAnnoucePayload {
server_cert: Vec<u8>,
ca_chain: Vec<Vec<u8>>,
server_prkey: Vec<u8>,
}
impl OkAnnoucePayload {
pub fn parse<'a>(self) -> LeafCertPair<'a> {
LeafCertPair::parse(self.server_cert, self.ca_chain, self.server_prkey)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToGuestServerMessage {
OkAnnounce { server_cert: Vec<u8>, server_prkey: Vec<u8> },
OkAnnounce(OkAnnoucePayload),
FailedNameAlreadyOccupied,
}
impl ToGuestServerMessage {
pub fn make_okannounce(server_cert: CertificateDer, server_prkey: PrivatePkcs8KeyDer) -> Self {
ToGuestServerMessage::OkAnnounce {
server_cert: server_cert.to_vec(),
server_prkey: server_prkey.secret_pkcs8_der().to_vec()
}
pub fn make_okannounce(server_leaf: &LeafCertPair) -> Self {
ToGuestServerMessage::OkAnnounce(OkAnnoucePayload {
server_cert: server_leaf.cert().to_vec(),
ca_chain: server_leaf.ca_chain().iter().map(|c| c.to_vec()).collect(),
server_prkey: server_leaf.prkey().secret_pkcs8_der().to_vec(),
})
}
}