Compare commits

..

15 Commits

Author SHA1 Message Date
8ecaf9c993 Use SQLITE3 as Server Certificate Database backend 2024-07-14 17:03:53 +02:00
4212e23680 Add tests to servercertdb 2024-07-14 15:52:58 +02:00
7c1e5122de New docs in bonknet_broker 2024-07-14 14:54:32 +02:00
1e4e4bdb53 Refactor TransportStream in Server 2024-03-22 22:05:33 +01:00
a1b4865b3f Refactor Certificate management 2024-03-18 13:40:34 +01:00
177d472d59 Add first real SSH bridge 2024-03-13 13:04:29 +01:00
4604beed36 Start refactoring for a better code inside the broker 2024-02-26 23:56:13 +01:00
88aeb25fdf Split server and client msgs in libbonknet 2024-02-23 13:40:32 +01:00
0d39ea71c7 Remove some unused code and imports 2024-02-23 13:24:37 +01:00
467e2b6d21 Complete first version of DataStream 2024-02-21 17:54:59 +01:00
83c7a95414 Implement opening of the DataStream. Just the broker copy task/manager is missing 2024-02-21 16:40:49 +01:00
69a37ae89a Change name from AsyncSendMsg to SendMsg and remove old version 2024-02-19 16:54:37 +01:00
37cc133d7f Implement the skeleton for the ServerManager and the spawn of the connection_ids 2024-02-19 14:24:26 +01:00
f8feb9db81 Implement the skeleton for the Server Session handling 2024-02-15 18:01:47 +01:00
37c76aba22 Solved conundrum about Server separation of Subscribe and SendCommand streams 2024-02-14 15:28:08 +01:00
23 changed files with 2558 additions and 1734 deletions

2
.gitignore vendored
View File

@@ -1,3 +1,5 @@
/target
# Experiments with certificates
certs
certs_pem
/server

1365
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,15 +13,26 @@ actix-rt = "2.9.0"
actix-server = "2.3.0"
actix-service = "2.0.2"
actix-tls = { version = "3.3.0", features = ["rustls-0_22"] }
tokio = { version = "1", features = ["io-util", "sync", "time", "macros"] }
rustls = "0.22.2"
tracing = "0.1"
tracing-subscriber = "0.3"
futures = "0.3"
thiserror = "1.0.56"
tokio-util = { version = "0.7.10", features = ["codec"] }
serde = "1"
rmp-serde = "1.1.2"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
rand = "0.8.5"
uuid = { version = "1.7.0", features = ["v4", "serde"] }
rustls-pemfile = "2.0.0"
x509-parser = "0.16.0"
rusqlite = { version = "0.31.0", features = ["bundled"] }
[[bin]]
name = "init_certs"
path = "src/bin/init_certs.rs"
[[bin]]
name = "init_certs_2"
path = "src/bin/init_certs_2.rs"

View File

@@ -0,0 +1,201 @@
use std::fs::File;
use std::io::Write;
use rcgen::{self, BasicConstraints, Certificate, CertificateParams, DnType};
fn server_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Server Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn server_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.server.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Server 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn guestserver_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Guest Server Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn guestserver_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.guestserver.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Guest Server 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn client_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Client Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn client_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "bonk.client.1".into()]);
params.distinguished_name.push(DnType::CommonName, "Client 1");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
Certificate::from_params(params).unwrap()
}
fn broker_root_cert() -> Certificate {
let subject_alt_names = vec!["hello.world.example".into()];
let mut certparams = CertificateParams::new(subject_alt_names);
certparams.is_ca = rcgen::IsCa::Ca(BasicConstraints::Unconstrained);
let mut distname = rcgen::DistinguishedName::new();
distname.push(DnType::OrganizationName, "Eister Corporation");
distname.push(DnType::CommonName, "Bonknet Broker Root Cert CA");
certparams.distinguished_name = distname;
Certificate::from_params(certparams).unwrap()
}
fn broker_cert() -> Certificate {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), "localhost".into()]);
params.distinguished_name.push(DnType::CommonName, "localhost");
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ServerAuth);
Certificate::from_params(params).unwrap()
}
fn main() -> std::io::Result<()> {
// Generate Root CA Certificates
let server_root_cert = server_root_cert();
let guestserver_root_cert = guestserver_root_cert();
let client_root_cert = client_root_cert();
let broker_root_cert = broker_root_cert();
// Generate Leafs
let server_leaf_cert = server_cert();
let guestserver_leaf_cert = guestserver_cert();
let client_leaf_cert = client_cert();
let broker_leaf_cert = broker_cert();
// Generate PEMs
// every time you generate one, a new random number is taken, so different cert hashes!
// and we need this PEMs to appear in multiple files
// We don't need this for the pvkey because we generate them only one time for each cert
// IF YOU NEED TO WRITE PVKEY IN MULTIPLE FILES, PLEASE DO IT LIKE THESE LINES FOR THE x509!!!
let server_root_cert_pem = server_root_cert.serialize_pem().unwrap();
let guestserver_root_cert_pem = guestserver_root_cert.serialize_pem().unwrap();
let client_root_cert_pem = client_root_cert.serialize_pem().unwrap();
let broker_root_cert_pem = broker_root_cert.serialize_pem().unwrap();
let server_leaf_cert_pem = server_leaf_cert.serialize_pem_with_signer(&server_root_cert).unwrap();
let guestserver_leaf_cert_pem = guestserver_leaf_cert.serialize_pem_with_signer(&guestserver_root_cert).unwrap();
let client_leaf_cert_pem = client_leaf_cert.serialize_pem_with_signer(&client_root_cert).unwrap();
let broker_leaf_cert_pem = broker_leaf_cert.serialize_pem_with_signer(&broker_root_cert).unwrap();
// Root CA PEMs
/*
1 - CA Cert
2 - CA Prkey
*/
// Generate Server Root CA PEM
{
let mut pemfile = File::create("certs_pem/server_root_ca.pem")?;
pemfile.write_all(server_root_cert_pem.as_bytes())?;
pemfile.write_all(server_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate GuestServer Root CA PEM
{
let mut pemfile = File::create("certs_pem/guestserver_root_ca.pem")?;
pemfile.write_all(guestserver_root_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Client Root CA PEM
{
let mut pemfile = File::create("certs_pem/client_root_ca.pem")?;
pemfile.write_all(client_root_cert_pem.as_bytes())?;
pemfile.write_all(client_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker Root CA PEM
{
let mut pemfile = File::create("certs_pem/broker_root_ca.pem")?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
pemfile.write_all(broker_root_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker CA Cert PEM for Server Authentication
{
let mut pemfile = File::create("certs_pem/broker_root_ca_cert.pem")?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
}
// Generate Server Leaf PEM
/*
1 - Server Leaf Cert
2 - Server CA Cert chain
3 - Server Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/server.pem")?;
pemfile.write_all(server_leaf_cert_pem.as_bytes())?;
pemfile.write_all(server_root_cert_pem.as_bytes())?;
pemfile.write_all(server_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate GuestServer Leaf PEM
/*
1 - GuestServer Leaf Cert
2 - GuestServer CA Cert chain
3 - GuestServer Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/guestserver.pem")?;
pemfile.write_all(guestserver_leaf_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_root_cert_pem.as_bytes())?;
pemfile.write_all(guestserver_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Client Leaf PEM
/*
1 - Client Leaf Cert
2 - Client CA Cert chain
3 - Client Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/client.pem")?;
pemfile.write_all(client_leaf_cert_pem.as_bytes())?;
pemfile.write_all(client_root_cert_pem.as_bytes())?;
pemfile.write_all(client_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
// Generate Broker Leaf PEM
/*
1 - Broker Leaf Cert
2 - Broker CA Cert
3 - Broker Leaf Prkey
*/
{
let mut pemfile = File::create("certs_pem/broker.pem")?;
pemfile.write_all(broker_leaf_cert_pem.as_bytes())?;
pemfile.write_all(broker_root_cert_pem.as_bytes())?;
pemfile.write_all(broker_leaf_cert.serialize_private_key_pem().as_bytes())?;
}
println!("Certificates created");
Ok(())
}

View File

@@ -0,0 +1,112 @@
use actix::prelude::*;
use uuid::Uuid;
use std::collections::HashMap;
use futures::SinkExt;
use thiserror::Error;
use tracing::{info, error, warn};
use libbonknet::ToPeerDataStream;
use crate::streamutils::*;
#[allow(dead_code)]
#[derive(Error, Debug)]
pub enum DataConnManagerError {
#[error("Generic Failure")]
GenericFailure,
}
/// Ask the DataConnManager to start a new Data Connection with
/// these two TransportStream as ends of the bridge.
/// As internal ID the Client Connection ID is used.
#[derive(Message)]
#[rtype(result = "Result<(),DataConnManagerError>")]
pub struct StartDataBridge {
pub client_conn_id: Uuid,
pub server_transport: TransportStream,
pub client_transport: TransportStream,
}
/// Stop the data bridge using the Client Connection ID as reference to the bridge.
#[derive(Message)]
#[rtype(result = "()")]
pub struct StopDataBridge {
pub client_conn_id: Uuid,
}
type ClientConnId = Uuid;
struct Connection {
proxyhandler: SpawnHandle, // SpawnHandle is an actix Future handler (basically a Task)
}
/// This is the actor that manages the Data Connections.
/// When a new Data Bridge is created, that consist of a Task running indefinitely that
/// redirect information from a Client to a Server and viceversa.
/// Basically the DataBridge is where the SSH information are sent in.
pub struct DataConnManager {
conns: HashMap<ClientConnId, Connection>
}
impl Actor for DataConnManager {
type Context = Context<Self>;
}
impl DataConnManager {
pub fn new() -> Self {
Self { conns: HashMap::new() }
}
}
impl Handler<StopDataBridge> for DataConnManager {
type Result = ();
fn handle(&mut self, msg: StopDataBridge, ctx: &mut Self::Context) -> Self::Result {
match self.conns.remove(&msg.client_conn_id) {
None => warn!("Stopped Data Bridge {} was not in memory", msg.client_conn_id),
Some(conn) => {
if ctx.cancel_future(conn.proxyhandler) {
info!("Stopped Data Bridge {}", msg.client_conn_id);
} else {
info!("Stopped Data Bridge {} was with dead task", msg.client_conn_id);
}
},
}
}
}
impl Handler<StartDataBridge> for DataConnManager {
type Result = Result<(), DataConnManagerError>;
fn handle(&mut self, mut msg: StartDataBridge, ctx: &mut Self::Context) -> Self::Result {
let client_conn_id = msg.client_conn_id;
let handler = ctx.spawn(async move {
// Send to the streams the OK DATA OPEN message
let okmsg = ToPeerDataStream::OkDataStreamOpen;
if let Err(e) = tokio::try_join!(
msg.client_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
msg.server_transport.send(rmp_serde::to_vec(&okmsg).unwrap().into()),
) {
error!("Error during OkDataStreamOpen send: {:?}", e);
// TODO: potrei voler trasformare questa funzione in ResponseActFuture cosi che
// in caso ci sia fallimento su questo send l'errore possa venir riportato direttamente
// al PendingDataConnDb senza bisogno di gestione manuale?
// Da studiare perche non per forza c'e bisogno che il Pending sappia che c'e stato
// fallimento in questa fase.
} else {
let mut client_stream = msg.client_transport.into_inner();
let mut server_stream = msg.server_transport.into_inner();
match tokio::io::copy_bidirectional(&mut client_stream, &mut server_stream).await {
Ok((to_server, to_client)) => info!("DataConn closed with {to_server}B to server and {to_client}B to client"),
Err(e) => error!("Error during DataConn: {e:?}"),
}
}
msg.client_conn_id
}.into_actor(self).map(|res, _a, c| {
c.notify(StopDataBridge { client_conn_id: res });
}));
if let Some(other_conn) = self.conns.insert(client_conn_id, Connection { proxyhandler: handler }) {
ctx.cancel_future(other_conn.proxyhandler);
warn!("During init of Conn {client_conn_id} another connection has been found and is now closed.")
}
Ok(())
}
}

View File

@@ -1,263 +1,150 @@
mod servercertdb;
mod pendingdataconndb;
mod servermanager;
mod dataconnmanager;
mod streamutils;
use servercertdb::*;
use pendingdataconndb::*;
use servermanager::*;
use dataconnmanager::*;
use streamutils::*;
use actix::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use libbonknet::{load_cert, load_prkey, FromServerMessage, RequiredReplyValues, FromGuestServerMessage, ToGuestServerMessage};
use libbonknet::servermsg::*;
use libbonknet::clientmsg::*;
use libbonknet::cert::*;
use rustls::{RootCertStore, ServerConfig};
use rustls::server::WebPkiClientVerifier;
use actix_tls::accept::rustls_0_22::{Acceptor as RustlsAcceptor, TlsStream};
use actix_server::Server;
use actix_rt::net::TcpStream;
use actix_service::{ServiceFactoryExt as _};
use futures::{StreamExt, SinkExt};
use thiserror::Error;
use actix_service::ServiceFactoryExt as _;
use futures::{SinkExt, StreamExt};
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use tracing::{info, error};
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
use tracing::{error, info, warn};
struct ServerCert {
cert: Vec<u8>,
prkey: Vec<u8>,
struct BrokerContext {
broker_leaf: LeafCertPair<'static>,
client_ca: CACertPair<'static>,
server_ca: CACertPair<'static>,
guestserver_ca: CACertPair<'static>,
servercert_db: Addr<ServerCertDB>,
pendingdataconn_manager: Addr<PendingDataConnManager>,
server_manager: Addr<ServerManager>,
}
fn generate_server_cert(root_cert: &Certificate, name: &str) -> ServerCert {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), format!("bonk.server.{name}")]);
params.distinguished_name.push(DnType::CommonName, format!("{name}"));
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params
.extended_key_usages
.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
let certificate = Certificate::from_params(params).unwrap();
let cert = certificate.serialize_der_with_signer(root_cert).unwrap();
let prkey = certificate.serialize_private_key_der();
ServerCert { cert, prkey }
}
#[derive(Error, Debug)]
enum DBError {
#[error("Certificate is already registered with name {0}")]
CertAlreadyRegistered(String),
// #[error("Generic Failure")]
// GenericFailure,
}
#[derive(Message)]
#[rtype(result = "bool")]
struct IsNameRegistered {
name: String,
}
#[derive(Message)]
#[rtype(result = "Result<(), DBError>")]
struct RegisterServer {
cert: Vec<u8>,
name: String,
}
// TODO: Move into Sqlite DB with unique check on col1 and col2!!!! Right now name is not unique
struct ServerCertDB {
db: HashMap<Vec<u8>, String>, // Cert to Name
}
impl Actor for ServerCertDB {
type Context = Context<Self>;
}
impl Handler<RegisterServer> for ServerCertDB {
type Result = Result<(), DBError>;
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
match self.db.get(&msg.cert) {
None => {
self.db.insert(msg.cert, msg.name);
Ok(())
}
Some(name) => {
Err(DBError::CertAlreadyRegistered(name.clone()))
}
}
}
}
impl Handler<IsNameRegistered> for ServerCertDB {
type Result = bool;
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
self.db.values().any(|x| *x == msg.name)
}
}
struct GuestServerConnection {
stream: TlsStream<TcpStream>,
}
impl Actor for GuestServerConnection {
type Context = Context<Self>;
}
struct ServerConnection<T: 'static> {
stream: Framed<TlsStream<TcpStream>, T>,
name: String
}
impl<T> ServerConnection<T> {
fn new(stream: TlsStream<TcpStream>, codec: T) -> Self {
let stream = Framed::new(stream, codec);
ServerConnection {
stream,
name: "Polnareffland1".into(),
}
}
}
impl<T> Actor for ServerConnection<T> {
type Context = Context<Self>;
}
#[actix_rt::main]
async fn main() {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// BROKER CERTS
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap();
let broker_cert_der = load_cert("certs/broker_cert.pem").unwrap();
let broker_prkey_der = load_prkey("certs/broker_key.pem").unwrap();
// SERVER ROOT
let server_root_cert_der = load_cert("certs/server_root_cert.pem").unwrap();
let server_root_prkey_der = load_prkey("certs/server_root_key.pem").unwrap();
// GUESTSERVER ROOT
let guestserver_root_cert_der = load_cert("certs/guestserver_root_cert.pem").unwrap();
// CLIENT ROOT
let client_root_cert_der = load_cert("certs/client_root_cert.pem").unwrap();
// Client Verifier
// Load Identity
let broker_leaf = LeafCertPair::load_from_file("certs_pem/broker.pem").unwrap();
let client_ca = CACertPair::load_from_file("certs_pem/client_root_ca.pem").unwrap();
let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap();
let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap();
// Load Actors
let servercert_db = ServerCertDB::new("certsdb.sqlite").unwrap().start();
let dataconn_manager = DataConnManager::new().start();
let pendingdataconn_manager = PendingDataConnManager::new(dataconn_manager).start();
let server_manager = ServerManager::new(pendingdataconn_manager.clone()).start();
// Create Context
let ctx = Arc::new(BrokerContext {
broker_leaf,
client_ca,
server_ca,
guestserver_ca,
servercert_db,
pendingdataconn_manager,
server_manager,
});
// Pki Client Verifier
let mut broker_root_store = RootCertStore::empty();
broker_root_store.add(server_root_cert_der.clone()).unwrap();
broker_root_store.add(client_root_cert_der.clone()).unwrap();
broker_root_store.add(guestserver_root_cert_der.clone()).unwrap();
broker_root_store.add(ctx.server_ca.cert().clone()).unwrap();
broker_root_store.add(ctx.client_ca.cert().clone()).unwrap();
broker_root_store.add(ctx.guestserver_ca.cert().clone()).unwrap();
let server_verifier = WebPkiClientVerifier::builder(Arc::new(broker_root_store)).build().unwrap();
// Configure TLS
let server_tlsconfig = ServerConfig::builder()
// .with_no_client_auth()
.with_client_cert_verifier(server_verifier)
.with_single_cert(vec![broker_cert_der.clone(), broker_root_cert_der.clone()], broker_prkey_der.into())
.with_single_cert(ctx.broker_leaf.fullchain(), ctx.broker_leaf.clone_key().into())
.unwrap();
let server_acceptor = RustlsAcceptor::new(server_tlsconfig);
let server_root_cert_der = Arc::new(server_root_cert_der);
let server_root_prkey = KeyPair::from_der(server_root_prkey_der.secret_pkcs8_der()).unwrap();
let client_root_cert_der = Arc::new(client_root_cert_der);
let guestserver_root_cert_der = Arc::new(guestserver_root_cert_der);
let server_root_cert = Arc::new(Certificate::from_params(CertificateParams::from_ca_cert_der(
&*server_root_cert_der,
server_root_prkey
).unwrap()).unwrap());
let server_db_addr = ServerCertDB {
db: HashMap::new(),
}.start();
Server::build()
.bind("server-command", ("localhost", 2541), move || {
let server_root_cert_der = Arc::clone(&server_root_cert_der);
let client_root_cert_der = Arc::clone(&client_root_cert_der);
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
let server_root_cert = Arc::clone(&server_root_cert);
let server_db_addr = server_db_addr.clone();
let ctx = Arc::clone(&ctx);
// Set up TLS service factory
server_acceptor
.clone()
.map_err(|err| println!("Rustls error: {:?}", err))
.and_then(move |stream: TlsStream<TcpStream>| {
let server_root_cert_der = Arc::clone(&server_root_cert_der);
let client_root_cert_der = Arc::clone(&client_root_cert_der);
let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der);
let server_root_cert = Arc::clone(&server_root_cert);
let server_db_addr = server_db_addr.clone();
let ctx = Arc::clone(&ctx);
async move {
let peer_cert_der = stream.get_ref().1.peer_certificates().unwrap().last().unwrap().clone();
if peer_cert_der == *server_root_cert_der {
let peer_certs = stream.get_ref().1.peer_certificates().unwrap();
let peer_cert_bytes = peer_certs.first().unwrap().to_vec();
let peer_root_cert_der = peer_certs.last().unwrap().clone();
if &peer_root_cert_der == ctx.server_ca.cert() {
info!("Server connection");
let framed = Framed::new(stream, LengthDelimitedCodec::new());
framed.for_each(|item| async move {
match item {
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
match transport.next().await {
None => {
info!("Connection closed by peer");
}
Some(item) => match item {
Ok(buf) => {
use FromServerMessage::*;
let msg: FromServerMessage = rmp_serde::from_slice(&buf).unwrap();
use FromServerConnTypeMessage::*;
let msg: FromServerConnTypeMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
RequiredReply(v) => match v {
RequiredReplyValues::Ok => {
info!("Required Reply OK")
}
RequiredReplyValues::GenericFailure { .. } => {
info!("Required Reply Generic Failure")
}
SendCommand => {
info!("SendCommand Stream");
let reply = ToServerConnTypeReply::OkSendCommand;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_command_handler(&ctx, transport, peer_cert_bytes).await;
}
ChangeName { name } => {
info!("Requested Change Name to Name {}", name);
Subscribe => {
info!("Subscribe Stream");
let name = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
None => {
error!("Cert has no name assigned!");
let reply = ToServerConnTypeReply::GenericFailure;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
return Ok(());
}
Some(name) => name,
};
let reply = ToServerConnTypeReply::OkSubscribe;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
server_subscribe_handler(&ctx, transport, name).await;
}
WhoAmI => {
info!("Requested WhoAmI");
OpenDataStream(conn_id) => {
info!("OpenDataStream with {:?}", conn_id);
let msg = RegisterStream::server(conn_id, transport);
ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
}
}
},
}
Err(e) => {
info!("Disconnection: {:?}", e);
}
}
}).await;
info!("Disconnection!");
} else if peer_cert_der == *guestserver_root_cert_der {
info!("GuestServer connection");
let server_root_cert = Arc::clone(&server_root_cert);
let codec = LengthDelimitedCodec::new();
let mut transport = Framed::new(stream, codec);
loop {
match transport.next().await {
None => {
info!("Transport returned None");
}
Some(item) => {
match item {
Ok(buf) => {
use FromGuestServerMessage::*;
let msg: FromGuestServerMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
Announce { name } => {
info!("Announced with name {}", name);
if server_db_addr.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
info!("Name {} already registered!", name);
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
break; // Stop reading
} else {
let cert = generate_server_cert(&server_root_cert, name.as_str());
server_db_addr.send(RegisterServer {
cert: cert.cert.clone(),
name,
}).await.unwrap().unwrap();
let reply = ToGuestServerMessage::OkAnnounce {
server_cert: cert.cert,
server_prkey: cert.prkey
};
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
break;
}
}
}
}
}
} else if peer_cert_der == *client_root_cert_der {
info!("Server Task terminated!");
} else if &peer_root_cert_der == ctx.guestserver_ca.cert() {
info!("GuestServer connection");
let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec);
guestserver_handler(&ctx, transport).await;
} else if &peer_root_cert_der == ctx.client_ca.cert() {
info!("Client connection");
let codec = LengthDelimitedCodec::new();
let transport = Framed::new(stream, codec);
client_handler(&ctx, transport).await;
} else {
error!("Unknown Root Certificate");
}
@@ -270,3 +157,168 @@ async fn main() {
.await
.unwrap();
}
async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) {
match ctx.server_manager.send(StartTransporter { name, transport }).await.unwrap() {
Ok(_) => {
info!("Stream sent to the manager");
}
Err(e) => {
error!("Error from manager: {:?}", e);
}
}
}
async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStream, peer_cert_bytes: Vec<u8>) {
loop {
match transport.next().await {
None => {
info!("Transport returned None");
break;
}
Some(item) => match item {
Ok(buf) => {
use libbonknet::servermsg::FromServerCommandMessage::*;
let msg: FromServerCommandMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
ChangeName { name } => {
info!("Changing name to {}", name);
match ctx.servercert_db.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => {
info!("Nothing to unregister");
}
Some(old_name) => {
warn!("Unregistered from old name {}", old_name);
}
}
let reply = match ctx.servercert_db.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
Ok(_) => {
info!("Registered!");
ToServerCommandReply::NameChanged
}
Err(e) => {
error!("Error registering: {:?}", e);
ToServerCommandReply::GenericFailure
}
};
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
WhoAmI => {
info!("Asked who I am");
let reply = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
None => {
info!("I'm not registered anymore!? WTF");
ToServerCommandReply::GenericFailure
}
Some(name) => {
info!("I am {}", name);
ToServerCommandReply::YouAre { name }
}
};
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
break;
}
}
}
}
}
async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream) {
loop {
match transport.next().await {
None => {
info!("Transport returned None");
break;
}
Some(item) => {
match item {
Ok(buf) => {
use libbonknet::servermsg::FromGuestServerMessage::*;
let msg: FromGuestServerMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
Announce { name } => {
info!("Announced with name {}", name);
if ctx.servercert_db.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
info!("Name {} already registered!", name);
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
break; // Stop reading
} else {
let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str()));
ctx.servercert_db.send(RegisterServer {
cert: cert.cert().to_vec(),
name,
}).await.unwrap().unwrap();
let reply = ToGuestServerMessage::make_okannounce(&cert);
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
break;
}
}
}
}
}
}
async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
loop {
match transport.next().await {
None => {
info!("Transport returned None");
break;
}
Some(item) => {
match item {
Ok(buf) => {
let msg: FromClientCommand = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
FromClientCommand::RequestServer { name } => {
info!("REQUESTED SERVER {}", name);
let data = ctx.server_manager.send(RequestServer { name }).await.unwrap();
match data {
Ok(client_conn_id) => {
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
info!("Sent OkRequest");
}
Err(e) => {
error!("Error! {:?}", e);
}
}
}
FromClientCommand::ServerList => {
info!("Requested ServerList");
let data = ctx.server_manager.send(GetServerList {}).await.unwrap();
let reply = ToClientResponse::OkServerList { data };
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
}
FromClientCommand::UpgradeToDataStream(conn_id) => {
info!("Upgrade to DataStream with conn_id {:?}", conn_id);
let msg = RegisterStream::client(conn_id, transport);
ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
break;
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
break;
}
}
}
}
}
}

View File

@@ -0,0 +1,243 @@
//! Pending Data Connect Database is where we store the Data Connection in creation.
//! Every row contains:
//! - The ServerConnID that the server will use (uuid)
//! - The ClientConnID that the client will use (uuid)
//! - If the Server is already connected and ready, the Server Socket
//! - If the Client is already connected and ready, the Client Socket
//!
//! When a row contains a valid ServerSocket and ClientSocket, then the row is removed and the
//! sockets moved to the Data Connection Manager
use actix::prelude::*;
use futures::SinkExt;
use thiserror::Error;
use tracing::{error, info};
use uuid::Uuid;
use libbonknet::*;
use crate::dataconnmanager::{DataConnManager, StartDataBridge};
use crate::streamutils::*;
#[derive(Error, Debug)]
pub enum PendingDataConnError {
#[error("Generic Failure")]
GenericFailure,
}
#[derive(Message)]
#[rtype(result = "Result<(),PendingDataConnError>")]
pub struct NewPendingConn {
pub server_conn_id: Uuid,
pub client_conn_id: Uuid,
}
#[derive(Debug)]
enum RegisterKind {
Server,
Client
}
#[derive(Message)]
#[rtype(result = "Result<(),PendingDataConnError>")]
pub struct RegisterStream {
kind: RegisterKind,
conn_id: Uuid,
transport: TransportStream,
}
impl RegisterStream {
pub fn server(conn_id: Uuid, transport: TransportStream) -> Self {
RegisterStream { kind: RegisterKind::Server, conn_id, transport }
}
pub fn client(conn_id: Uuid, transport: TransportStream) -> Self {
RegisterStream { kind: RegisterKind::Client, conn_id, transport }
}
}
/// Record with connection ID and TransportStream that can represent a client or a server
struct SideRecord {
conn_id: Uuid,
transport: Option<TransportStream>,
}
/// A DB Record, with a Server and a Client record containind IDs and Transports
struct Record {
server: SideRecord,
client: SideRecord,
}
impl Record {
fn new(server_conn_id: Uuid, client_conn_id: Uuid) -> Self {
let server = SideRecord { conn_id: server_conn_id, transport: None };
let client = SideRecord { conn_id: client_conn_id, transport: None };
Record { server, client }
}
}
// TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
/// This contains the database of records of Data Connections in the phase of creation.
/// The approach is that when DataConnection is needed, a new Pending is created together with
/// two IDs to send to the client and server and that they will use to create a new TransportStream
/// that will be then registered to the right Record.
/// When a Record has the two TransportStreams needed, the Record is completed and can be consumed
/// and used to create a DataConnection sending the ownership of the two streams to the
/// Data Connection Manager.
pub struct PendingDataConnManager {
db: Vec<Record>,
dataconn_manager: Addr<DataConnManager>,
}
impl Actor for PendingDataConnManager {
type Context = Context<Self>;
}
impl PendingDataConnManager {
pub fn new(dataconn_manager_addr: Addr<DataConnManager>) -> Self {
PendingDataConnManager { db: Vec::new(), dataconn_manager: dataconn_manager_addr }
}
/// The Row retrieve need to use ServerConnID AND ClientConnID as keys, so the best approach is to
/// use a single Vec and check the two IDs while iterating.
fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
use RegisterKind::*;
let record = match match kind {
Server => self.db.iter_mut().find(|x| x.server.conn_id == *conn_id),
Client => self.db.iter_mut().find(|x| x.client.conn_id == *conn_id),
} {
None => return None,
Some(item) => item,
};
Some(match kind {
Server => &mut record.server,
Client => &mut record.client,
})
}
}
impl Handler<NewPendingConn> for PendingDataConnManager {
type Result = Result<(), PendingDataConnError>;
fn handle(&mut self, msg: NewPendingConn, _ctx: &mut Self::Context) -> Self::Result {
let server_conn_id = msg.server_conn_id;
let client_conn_id = msg.client_conn_id;
let new_record = Record::new(server_conn_id, client_conn_id);
if self.db.iter().any(|x| {
x.server.conn_id == new_record.server.conn_id || x.client.conn_id == new_record.client.conn_id
}) {
error!("Conflicting UUIDs: server {} - client {}", server_conn_id, client_conn_id);
Err(PendingDataConnError::GenericFailure)
} else {
self.db.push(new_record);
Ok(())
}
}
}
/*
Esegui tutti i test normali in Sync.
Quando devi inviare il Accepted, notificati la cosa come AcceptStream con lo stream in suo possesso
ma stavolta con un ResponseActFuture.
Se c'e un fallimento, sposta il transport in un ctx::spawn che invii un FAILED.
Se tutto OK, checka DI NUOVO tutto, e se i check sono positivi, registra lo stream nell'Actor.
Per gestire lo Spawn di una connection, l'unica risposta e' gestire lo spawn connection come un
Message Handler a sua volta. Quando uno stream completa l'invio del suo Accepted, esso appare nel Record.
Quando il secondo stream arriva e completa il suo accepted, anch'esso viene registrato nel Record, quindi
siamo nella condizione di spawn, perche ci sono entrambi i transport nel Record.
Quindi se alla fine di un check and register ci sono entrambi gli stream, spostali entrambi fuori, droppa
il record e invia i due transport ad un terzo actor, su un altro Arbiter, che esegua il tokio::io::copy e
gestisca le connessioni aperte.
*/
#[derive(Message)]
#[rtype(result = "Result<(),PendingDataConnError>")]
struct TryStartDataStream {
kind: RegisterKind,
conn_id: Uuid,
}
impl Handler<TryStartDataStream> for PendingDataConnManager {
type Result = Result<(), PendingDataConnError>;
fn handle(&mut self, msg: TryStartDataStream, _ctx: &mut Self::Context) -> Self::Result {
use RegisterKind::*;
let idx = match msg.kind {
Server => self.db.iter().enumerate().find(|(_i, x)| x.server.conn_id == msg.conn_id),
Client => self.db.iter().enumerate().find(|(_i, x)| x.client.conn_id == msg.conn_id),
};
if let Some((idx, record)) = idx {
if record.client.transport.is_some() && record.server.transport.is_some() {
info!("Requesting Data Bridge for client_conn_id {}", record.client.conn_id);
// We can drop record and use idx to remove the record itself from the vector
// and then send it to another manager for the spawn of the real connection
// This remove is necessary to drop the &Record and take full ownership of it.
let real_record = self.db.remove(idx); // Safety: We are sure this idx is valid
let msg = StartDataBridge {
client_conn_id: real_record.client.conn_id,
server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
};
self.dataconn_manager.do_send(msg);
}
Ok(())
} else {
Err(PendingDataConnError::GenericFailure)
}
}
}
impl Handler<RegisterStream> for PendingDataConnManager {
type Result = ResponseActFuture<Self, Result<(), PendingDataConnError>>;
fn handle(&mut self, msg: RegisterStream, _ctx: &mut Self::Context) -> Self::Result {
let side_record = match self.retrieve_siderecord(&msg.kind, &msg.conn_id) {
None => {
error!("Found no connection with {:?} conn_id {:?}", msg.kind, msg.conn_id);
return Box::pin(fut::err(PendingDataConnError::GenericFailure));
}
Some(item) => item,
};
if side_record.transport.is_some() {
// TODO: It can be good to check if the connection is still open, if not, drop and use the new one.
error!("Connection already with a socket!");
Box::pin(fut::err(PendingDataConnError::GenericFailure))
} else {
// This Fut will send the Accepted and only then, register the transport stream
// in the Manager. If during the registration there are all the transport in places,
// you can start the datapiping
Box::pin(async move {
let mut transport = msg.transport;
let reply = ToPeerDataStream::OkDataStreamRequestAccepted;
let res = transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await;
(transport, res)
}.into_actor(self).map(move |(transport, res), a, c| {
match res {
Ok(_) => {
// TODO: to not do the check twice I can put a "lock variable" inside the record
// to prevent the put in Accept of another stream while we are waiting to send the
// accept message
let side_record = match a.retrieve_siderecord(&msg.kind, &msg.conn_id) {
None => {
error!("Found no connection with {:?} conn_id {:?}", msg.kind, msg.conn_id);
return Err(PendingDataConnError::GenericFailure);
}
Some(item) => item,
};
if side_record.transport.is_some() {
// TODO: It can be good to check if the connection is still open, if not, drop and use the new one.
error!("Connection already with a socket!");
return Err(PendingDataConnError::GenericFailure);
}
side_record.transport = Some(transport);
c.notify(TryStartDataStream { kind: msg.kind, conn_id: msg.conn_id });
Ok(())
}
Err(e) => {
error!("Error during OkDataStreamRequestAccepted sending: {:?}", e);
Err(PendingDataConnError::GenericFailure)
}
}
}))
}
}
}

View File

@@ -0,0 +1,213 @@
use std::path::Path;
use actix::prelude::*;
use rusqlite::{Connection};
use thiserror::Error;
#[derive(Error, Debug)]
pub enum DBError {
#[error("Certificate is already registered with name {0}")]
CertAlreadyRegistered(String),
#[error("Generic Failure")]
GenericFailure,
}
#[derive(Message)]
#[rtype(result = "bool")]
pub struct IsNameRegistered {
pub name: String,
}
#[derive(Message)]
#[rtype(result = "Result<(), DBError>")]
pub struct RegisterServer {
pub cert: Vec<u8>,
pub name: String,
}
#[derive(Message)]
#[rtype(result = "Option<String>")] // None if nothing to unregister, Some if unregistered
pub struct UnregisterServer {
pub cert: Vec<u8>,
}
#[derive(Message)]
#[rtype(result = "Option<String>")]
pub struct FetchName {
pub cert: Vec<u8>,
}
fn init_db(conn: Connection) -> rusqlite::Result<Connection> {
conn.execute(
"CREATE TABLE IF NOT EXISTS servercert (
cert BLOB PRIMARY KEY,
name TEXT NOT NULL
)",
(), // empty list of parameters.
)?;
Ok(conn)
}
// TODO: Right now name is not unique. Consider making it unique and checking it for duplication
pub struct ServerCertDB {
conn: Connection,
}
impl ServerCertDB {
fn new_in_memory() -> rusqlite::Result<Self> {
Ok(ServerCertDB { conn: init_db(Connection::open_in_memory()?)? })
}
pub fn new<P: AsRef<Path>>(path: P) -> rusqlite::Result<Self> {
Ok(ServerCertDB { conn: init_db(Connection::open(path)?)? })
}
}
impl Actor for ServerCertDB {
type Context = Context<Self>;
}
impl Handler<RegisterServer> for ServerCertDB {
type Result = Result<(), DBError>;
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
match self.conn.query_row(
"SELECT name FROM servercert WHERE cert = ?1",
(&msg.cert,),
|row| row.get::<_, String>(0)
) {
Ok(name) => {
Err(DBError::CertAlreadyRegistered(name))
}
Err(rusqlite::Error::QueryReturnedNoRows) => {
self.conn.execute(
"INSERT INTO servercert (cert, name) VALUES (?1, ?2)",
(&msg.cert, &msg.name)
).map_err(|_| DBError::GenericFailure)?;
Ok(())
}
Err(_) => {
Err(DBError::GenericFailure)
}
}
}
}
impl Handler<IsNameRegistered> for ServerCertDB {
type Result = bool;
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
let count: u64 = self.conn.query_row(
"SELECT COUNT(*) FROM servercert WHERE name = ?1",
(&msg.name,),
|row| row.get(0)
).unwrap();
count > 0
}
}
impl Handler<FetchName> for ServerCertDB {
type Result = Option<String>;
fn handle(&mut self, msg: FetchName, _ctx: &mut Self::Context) -> Self::Result {
match self.conn.query_row(
"SELECT name FROM servercert WHERE cert = ?1",
(&msg.cert,),
|row| row.get(0)
) {
Ok(name) => {
Some(name)
}
Err(rusqlite::Error::QueryReturnedNoRows) => {
None
}
Err(e) => {
panic!("Error during FetchName: {}", e);
}
}
}
}
impl Handler<UnregisterServer> for ServerCertDB {
type Result = Option<String>;
fn handle(&mut self, msg: UnregisterServer, _ctx: &mut Self::Context) -> Self::Result {
match self.conn.query_row(
"SELECT name FROM servercert WHERE cert = ?1",
(&msg.cert,),
|row| row.get::<_, String>(0)
) {
Ok(name) => {
self.conn.execute(
"DELETE FROM servercert WHERE cert = ?1",
(&msg.cert,)
).unwrap();
Some(name)
}
Err(rusqlite::Error::QueryReturnedNoRows) => {
None
}
Err(e) => {
panic!("Error during UnregisterServer: {}", e);
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[actix::test]
async fn emptydb_isnameregistered() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
assert!(!servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
}
#[actix::test]
async fn emptyvec() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
assert!(servercert_db.send(RegisterServer { cert: vec![], name: "test".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: vec![] }).await.unwrap().unwrap(), "test");
assert_eq!(servercert_db.send(UnregisterServer { cert: vec![] }).await.unwrap().unwrap(), "test");
assert!(!servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
assert!(servercert_db.send(FetchName { cert: vec![] }).await.unwrap().is_none());
assert!(servercert_db.send(UnregisterServer { cert: vec![] }).await.unwrap().is_none());
}
#[actix::test]
async fn normalcert() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
let cert = vec![112, 111, 114, 99, 111, 100, 105, 111];
assert!(servercert_db.send(RegisterServer { cert: cert.clone(), name: "test2".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test2".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert.clone() }).await.unwrap().unwrap(), "test2");
assert_eq!(servercert_db.send(UnregisterServer { cert: cert.clone() }).await.unwrap().unwrap(), "test2");
assert!(!servercert_db.send(IsNameRegistered { name: "test2".into() }).await.unwrap());
assert!(servercert_db.send(FetchName { cert: cert.clone() }).await.unwrap().is_none());
assert!(servercert_db.send(UnregisterServer { cert: cert.clone() }).await.unwrap().is_none());
}
#[actix::test]
async fn cert2_remains_after_delete_cert1() {
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
let cert1 = vec![112, 111, 114, 99, 111, 100, 105, 111];
let cert2 = vec![67, 65, 78, 68, 69, 68, 73, 79];
assert!(servercert_db.send(RegisterServer { cert: cert1.clone(), name: "test3".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test3".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert1.clone() }).await.unwrap().unwrap(), "test3");
assert!(servercert_db.send(RegisterServer { cert: cert2.clone(), name: "test4".into() }).await.unwrap().is_ok());
assert!(servercert_db.send(IsNameRegistered { name: "test4".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert2.clone() }).await.unwrap().unwrap(), "test4");
assert_eq!(servercert_db.send(UnregisterServer { cert: cert1.clone() }).await.unwrap().unwrap(), "test3");
assert!(!servercert_db.send(IsNameRegistered { name: "test3".into() }).await.unwrap());
assert!(servercert_db.send(FetchName { cert: cert1.clone() }).await.unwrap().is_none());
assert!(servercert_db.send(UnregisterServer { cert: cert1.clone() }).await.unwrap().is_none());
assert!(servercert_db.send(RegisterServer { cert: cert2.clone(), name: "test4".into() }).await.unwrap().is_err());
assert!(servercert_db.send(IsNameRegistered { name: "test4".into() }).await.unwrap());
assert_eq!(servercert_db.send(FetchName { cert: cert2.clone() }).await.unwrap().unwrap(), "test4");
}
}

View File

@@ -0,0 +1,290 @@
use std::collections::HashMap;
use std::io::Error;
use std::sync::Arc;
use std::time::{Duration, Instant};
use actix::prelude::*;
use rand::random;
use thiserror::Error;
use futures::SinkExt;
use tokio::sync::{Mutex, oneshot};
use tokio_util::bytes::{Bytes, BytesMut};
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::{debug, error, info};
use crate::streamutils::*;
use uuid::Uuid;
use libbonknet::servermsg::*;
use crate::pendingdataconndb::*;
#[derive(Error, Debug)]
pub enum SendMsgError {
#[error("Generic Failure")]
GenericFailure,
}
#[derive(Message)]
#[rtype(result = "Result<FromServerReplyBody, SendMsgError>")]
struct SendMsg {
msg: ToServerMessageBody,
}
struct ServerTransporter {
rx: Option<TransportStreamRx>,
tx: Arc<Mutex<TransportStreamTx>>,
timeout: Option<SpawnHandle>,
reply_channels: HashMap<u64, oneshot::Sender<FromServerReplyBody>>,
}
impl ServerTransporter {
fn new(transport: TransportStream) -> Self {
let internal = transport.into_inner();
let (srx, stx) = tokio::io::split(internal);
let codec = LengthDelimitedCodec::new();
let rx = FramedRead::new(srx, codec.clone());
let tx = FramedWrite::new(stx, codec.clone());
ServerTransporter {
rx: Some(rx),
tx: Arc::new(Mutex::new(tx)),
timeout: None,
reply_channels: HashMap::new(),
}
}
fn actor_close(&mut self, ctx: &mut Context<Self>) {
ctx.stop();
}
}
impl Actor for ServerTransporter {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
// Register Read Stream
let rx = self.rx.take().expect("Rx Stream not found");
ctx.add_stream(rx);
// Register Timeout task
self.timeout = Some(ctx.run_interval(Duration::from_secs(120), |s, c| {
s.actor_close(c)
}));
// Register Send Ping Task
ctx.run_interval(Duration::from_secs(60), |s, c| {
let msg = ToServerMessage::Ping;
let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into();
let arc_tx = Arc::clone(&s.tx);
c.spawn(async move {
arc_tx.lock().await.send(payload).await
}.into_actor(s).map(|res, _a, ctx| {
match res {
Ok(_) => {
info!("Ping sent!");
}
Err(_) => {
ctx.stop();
}
}
}));
});
}
}
impl Handler<SendMsg> for ServerTransporter {
type Result = ResponseFuture<Result<FromServerReplyBody, SendMsgError>>;
fn handle(&mut self, msg: SendMsg, _ctx: &mut Self::Context) -> Self::Result {
let (reply_channel_tx, reply_channel_rx) = oneshot::channel();
let mut reply_id: u64;
if self.reply_channels.len() == u64::MAX as usize {
return Box::pin(fut::ready(Err(SendMsgError::GenericFailure)));
}
loop {
reply_id = random();
if !self.reply_channels.contains_key(&reply_id) {
break;
}
}
self.reply_channels.insert(reply_id, reply_channel_tx);
let msg = ToServerMessage::Msg {
reply_id,
body: msg.msg,
};
let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into();
let arc_tx = self.tx.clone();
Box::pin(async move {
arc_tx.lock().await.send(payload).await.map_err(|_e| SendMsgError::GenericFailure)?;
info!("msg sent");
let r = reply_channel_rx.await.unwrap();
info!("reply received");
Ok(r)
})
}
}
impl StreamHandler<Result<BytesMut, Error>> for ServerTransporter {
fn handle(&mut self, item: Result<BytesMut, Error>, ctx: &mut Self::Context) {
match item {
Ok(buf) => {
use libbonknet::servermsg::FromServerReply::*;
let msg: FromServerReply = rmp_serde::from_slice(&buf).unwrap();
match msg {
Pong => {
info!("Pong received");
if let Some(spawn_handle) = self.timeout {
ctx.cancel_future(spawn_handle);
} else {
error!("There were no spawn handle configured!");
}
self.timeout = Some(ctx.run_interval(Duration::from_secs(120), |s, c| {
s.actor_close(c)
}));
}
Msg { reply_id, body } => match self.reply_channels.remove(&reply_id) {
None => {}
Some(reply_tx) => {
if let Err(_e) = reply_tx.send(body) {
error!("Oneshot channel {} got invalidated! No reply sent.", reply_id);
}
}
}
}
}
Err(e) => {
error!("ERROR {:?}", e);
}
}
}
}
#[derive(Error, Debug)]
pub enum ServerManagerError {
#[error("Generic Failure")]
GenericFailure,
}
#[derive(Message)]
#[rtype(result = "Result<(),ServerManagerError>")]
pub struct StartTransporter {
pub name: String,
pub transport: TransportStream,
}
#[derive(Message)]
#[rtype(result = "Vec<String>")]
pub struct GetServerList {}
#[derive(Message)]
#[rtype(result = "Result<Uuid,ServerManagerError>")] // TODO: Return Client ID with struct to give it a name
pub struct RequestServer {
pub name: String
}
pub struct ServerManager {
entries: Arc<Mutex<HashMap<String, Addr<ServerTransporter>>>>,
// Name -> Addr to Actor
pdcdb_addr: Addr<PendingDataConnManager>,
}
impl ServerManager {
pub fn new(pdcdb_addr: Addr<PendingDataConnManager>) -> Self {
ServerManager { entries: Arc::new(Mutex::new(HashMap::new())), pdcdb_addr }
}
}
impl Actor for ServerManager {
type Context = Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
// Remove all the ServerTransporters that are not running
// TODO: This is a "Pull" from entries, but we can have the entries that when killed tell
// the Manager making it more reactive!
ctx.run_interval(Duration::from_secs(10), |s, c| {
let start = Instant::now();
let entries = Arc::clone(&s.entries);
c.spawn(async move {
let mut entries_mg = entries.lock().await;
let mut keys_to_delete = vec![];
for (name, st_addr) in entries_mg.iter() {
if !st_addr.connected() {
keys_to_delete.push(name.clone())
}
}
for name in keys_to_delete {
entries_mg.remove(&name);
info!("Closed ServerTransporter {} for actor death", name);
}
debug!("Cleaned ServerManager in {:?}", Instant::now() - start);
}.into_actor(s));
});
}
}
impl Handler<StartTransporter> for ServerManager {
type Result = ResponseFuture<Result<(), ServerManagerError>>;
fn handle(&mut self, msg: StartTransporter, _ctx: &mut Self::Context) -> Self::Result {
let entries = Arc::clone(&self.entries);
Box::pin(async move {
let mut entries_mg = entries.lock().await;
if entries_mg.contains_key(&msg.name) {
error!("A server called {} is already connected!", msg.name);
return Err(ServerManagerError::GenericFailure);
}
let st_addr = ServerTransporter::new(msg.transport).start();
entries_mg.insert(msg.name, st_addr);
Ok(())
})
}
}
impl Handler<GetServerList> for ServerManager {
type Result = ResponseFuture<Vec<String>>;
fn handle(&mut self, _msg: GetServerList, _ctx: &mut Self::Context) -> Self::Result {
let entries = Arc::clone(&self.entries);
Box::pin(async move {
let entries_mg = entries.lock().await;
entries_mg.keys().cloned().collect()
})
}
}
impl Handler<RequestServer> for ServerManager {
type Result = ResponseFuture<Result<Uuid, ServerManagerError>>;
fn handle(&mut self, msg: RequestServer, _ctx: &mut Self::Context) -> Self::Result {
let name = msg.name.clone();
let entries = Arc::clone(&self.entries);
let pdcdb_addr = self.pdcdb_addr.clone();
Box::pin(async move {
let lock = entries.lock().await;
let sh_addr = match lock.get(&name) {
None => {
error!("Requested server {} that isn't registered", name);
return Err(ServerManagerError::GenericFailure);
}
Some(item) => item,
};
let server_conn_id = Uuid::new_v4();
let client_conn_id = Uuid::new_v4();
match pdcdb_addr.send(NewPendingConn { server_conn_id, client_conn_id }).await.unwrap() {
Ok(_) => {
let msg = ToServerMessageBody::Request { conn_id: server_conn_id };
match sh_addr.send(SendMsg { msg }).await.unwrap() {
Ok(reply) => match reply {
FromServerReplyBody::RequestAccepted => {
Ok(client_conn_id)
}
FromServerReplyBody::RequestFailed => {
error!("Request Failed!");
Err(ServerManagerError::GenericFailure)
}
FromServerReplyBody::Pong => unreachable!(),
}
Err(e) => {
panic!("ERROR: {:?}", e);
}
}
}
Err(_e) => Err(ServerManagerError::GenericFailure),
}
})
}
}

View File

@@ -0,0 +1,8 @@
use actix_tls::accept::rustls_0_22::TlsStream;
use tokio::io::{ReadHalf, WriteHalf};
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec};
pub type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
pub type TransportStreamTx = FramedWrite<WriteHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
pub type TransportStreamRx = FramedRead<ReadHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;

View File

@@ -2,15 +2,21 @@
name = "bonknet_client"
version = "0.1.0"
edition = "2021"
description = "A CLI Client for the Bonknet system"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libbonknet = { path = "../libbonknet" }
tokio = { version = "1", features = ["full"] }
tokio-rustls = "0.25.0"
tokio = { version = "1", features = ["full", "tracing"] }
tokio-util = { version = "0.7.10", features = ["codec"] }
futures = "0.3"
rcgen = "0.12.0"
tokio-rustls = "0.25.0"
rustls-pemfile = "2.0.0"
serde = { version = "1.0", features = ["derive"] }
rmp-serde = "1.1.2"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1.7.0", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
thiserror = "1.0.56"

View File

@@ -1,74 +1,170 @@
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use futures::SinkExt;
use tokio::net::TcpStream;
use std::time::Duration;
use futures::{SinkExt, StreamExt};
use tokio::net::{TcpStream, TcpListener};
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::{ServerName};
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use serde::{Serialize, Deserialize};
use libbonknet::{load_cert, load_prkey};
use libbonknet::*;
use libbonknet::clientmsg::*;
use uuid::Uuid;
use tracing::{error, info};
#[derive(Debug, Serialize, Deserialize)]
enum ClientMessage {
Response { status_code: u32, msg: Option<String> },
Announce { name: String },
Required { id: String },
NotRequired { id: String },
async fn datastream(tlsconfig: ClientConfig, conn_id: Uuid) -> std::io::Result<()> {
let connector = TlsConnector::from(Arc::new(tlsconfig.clone()));
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await?;
let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromClientCommand::UpgradeToDataStream(conn_id);
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => panic!("None in the transport"),
Some(item) => match item {
Ok(buf) => {
use ToPeerDataStream::*;
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
match msg {
OkDataStreamRequestAccepted => {
info!("Data Stream Accepted. Waiting for Open...");
}
Refused => {
error!("Refused");
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
}
other => {
error!("Unexpected response: {:?}", other);
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
}
}
}
Err(e) => {
error!("Error: {:?}", e);
return Err(e);
}
}
}
match transport.next().await {
None => panic!("None in the transport"),
Some(item) => match item {
Ok(buf) => {
use ToPeerDataStream::*;
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
match msg {
OkDataStreamOpen => {
info!("Data Stream Open!. Connecting Streams.");
}
Revoked => {
error!("Data Stream Revoked!");
return Err(Error::new(ErrorKind::ConnectionAborted, "Revoked"));
}
Refused => {
error!("Refused");
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
}
other => {
error!("Unexpected response: {:?}", other);
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
}
}
}
Err(e) => {
error!("Error: {:?}", e);
return Err(e);
}
}
}
let mut outbound = transport.into_inner();
let listener = TcpListener::bind("127.0.0.1:9919").await?;
if let Ok((mut inbound, _)) = listener.accept().await {
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"),
}
} else {
error!("Error");
}
Ok(())
}
// TODO: This is an old examples
#[tokio::main]
async fn main() -> std::io::Result<()> {
let client_name = "Polnareffland1";
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Root certs to verify the server is the right one
let mut server_root_cert_store = RootCertStore::empty();
let server_root_cert_der = load_cert("server_root_cert.pem").unwrap();
server_root_cert_store.add(server_root_cert_der).unwrap();
// Auth Cert to send the server who am I
let root_client_cert = load_cert("client_root_cert.pem").unwrap();
let client_cert = load_cert("client_cert.pem").unwrap();
let client_prkey = load_prkey("client_key.pem").unwrap();
let mut broker_root_cert_store = RootCertStore::empty();
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap();
broker_root_cert_store.add(broker_root_cert_der).unwrap();
// Public CA for Clients
let root_client_cert = load_cert("certs/client_root_cert.pem").unwrap();
// My Client Certificate for authentication
let client_cert = load_cert("certs/client_cert.pem").unwrap();
let client_cert_prkey = load_prkey("certs/client_key.pem").unwrap();
// Load TLS Config
let tlsconfig = ClientConfig::builder()
.with_root_certificates(server_root_cert_store)
// .with_no_client_auth();
.with_client_auth_cert(vec![client_cert, root_client_cert], client_prkey.into())
.with_root_certificates(broker_root_cert_store.clone())
.with_client_auth_cert(vec![client_cert, root_client_cert], client_cert_prkey.into())
.unwrap();
let connector = TlsConnector::from(Arc::new(tlsconfig));
let connector = TlsConnector::from(Arc::new(tlsconfig.clone()));
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await?;
let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg1 = ClientMessage::Announce { name: client_name.into() };
transport.send(rmp_serde::to_vec(&msg1).unwrap().into()).await.unwrap();
for i in 0..10 {
let msg = ClientMessage::Response { status_code: 100+i, msg: Some(format!("yay {}", i)) };
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let msg = FromClientCommand::ServerList;
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => panic!("None in the transport"),
Some(item) => match item {
Ok(buf) => {
use libbonknet::clientmsg::ToClientResponse;
use libbonknet::clientmsg::ToClientResponse::*;
let msg: ToClientResponse = rmp_serde::from_slice(&buf).unwrap();
match msg {
OkServerList { data } => info!("{}", data.join("\n")),
GenericError => error!("Generic Error during remote command execution"),
others => {
panic!("Unexpected Message type: {:?}", others);
}
}
}
Err(e) => {
error!("Error: {:?}", e);
}
}
}
tokio::time::sleep(Duration::from_secs(5)).await;
let msg = FromClientCommand::RequestServer { name: "cicciopizza".into() };
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => panic!("None in the transport"),
Some(item) => match item {
Ok(buf) => {
use libbonknet::clientmsg::ToClientResponse;
use libbonknet::clientmsg::ToClientResponse::*;
let msg: ToClientResponse = rmp_serde::from_slice(&buf).unwrap();
match msg {
OkRequest { conn_id } => {
info!("Received Client Connection ID: {:?}", conn_id);
datastream(tlsconfig, conn_id).await.unwrap();
}
GenericError => error!("Generic Error during remote command execution"),
others => {
panic!("Unexpected Message type: {:?}", others);
}
}
}
Err(e) => {
error!("Error: {:?}", e);
}
}
}
// transport.for_each(|item| async move {
// let a: ClientMessage = rmp_serde::from_slice(&item.unwrap()).unwrap();
// println!("{:?}", a);
// }).await;
// let mut buf = vec![0;1024];
// let (mut rd,mut tx) = split(stream);
//
//
// tokio::spawn(async move {
// let mut stdout = tokio::io::stdout();
// tokio::io::copy(&mut rd, &mut stdout).await.unwrap();
// });
//
// let mut reader = tokio::io::BufReader::new(tokio::io::stdin()).lines();
//
// while let Some(line) = reader.next_line().await.unwrap() {
// tx.write_all(line.as_bytes()).await.unwrap();
// }
Ok(())
}

View File

@@ -0,0 +1,61 @@
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_rustls::{TlsConnector};
use tokio_rustls::client::TlsStream;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use futures::{SinkExt, StreamExt};
use thiserror::Error;
use tokio_rustls::rustls::ClientConfig;
use tracing::{error};
use libbonknet::clientmsg::{FromClientCommand, ToClientResponse};
pub type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
#[derive(Error, Debug)]
pub enum SendError {
#[error("Failure during serialization of the msg to send")]
MsgSerializationFailure(rmp_serde::encode::Error),
#[error("Failure during the transport send of the data")]
TransportError(std::io::Error),
#[error("Empty buffer during reply wait")]
EmptyBufferError,
#[error("Generic buffer error during response reading")]
GenericBufferError(std::io::Error),
#[error("Failure during deserialization of the reply msg")]
ReplyDeserializationFailure(rmp_serde::decode::Error),
}
pub struct BonkClient {
transport: TransportStream,
}
impl BonkClient {
pub async fn connect(tlsconfig: ClientConfig, dnsname: &str, port: u16) -> tokio::io::Result<BonkClient> {
// Load TLS Config
let connector = TlsConnector::from(Arc::new(tlsconfig));
let stream = TcpStream::connect(format!("{}:{}", dnsname, port)).await?;
let dnsservername = ServerName::try_from(dnsname).unwrap().to_owned();
let stream = connector.connect(dnsservername, stream).await?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
Ok(BonkClient { transport })
}
// TODO: make this private and make single calls for each Bonk interaction?
// The main idea is that the messages need to be inside the API and not ouside as elements of it
pub async fn send(&mut self, msg: &FromClientCommand) -> Result<ToClientResponse, SendError> {
let bmsg = rmp_serde::to_vec(msg).map_err(SendError::MsgSerializationFailure)?;
self.transport.send(bmsg.into()).await.map_err(SendError::TransportError)?;
match self.transport.next().await {
None => Err(SendError::EmptyBufferError),
Some(item) => match item {
Ok(buf) => {
let reply: ToClientResponse = rmp_serde::from_slice(&buf).map_err(SendError::ReplyDeserializationFailure)?;
Ok(reply)
}
Err(e) => Err(SendError::GenericBufferError(e))
}
}
}
}

View File

@@ -0,0 +1,64 @@
mod client;
use std::path::PathBuf;
use clap::{Parser, Subcommand};
use libbonknet::cert::*;
use tracing::{info};
use libbonknet::clientmsg::FromClientCommand;
use crate::client::BonkClient;
#[derive(Parser, Debug)]
#[command(version, about)]
struct Cli {
#[command(subcommand)]
command: Commands,
}
#[derive(Subcommand, Debug)]
enum Commands {
/// connect via ssh to the given remote_id.
Ssh {
remote_id: String,
},
/// send a file from source to remote. This command uses scp syntax and must contain wildcards to work (see examples).
Scp {
remote_id: String,
source: PathBuf,
dest: PathBuf,
},
/// get a list of all the servers connected to the broker.
Serverlist {
pattern: Option<String>,
},
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// CLI parsing
let cli = Cli::parse();
// Load Identity files
let client_ident = LeafCertPair::load_from_file("certs_pem/client.pem").unwrap();
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
let tlsconfig = client_ident.to_tlsclientconfig(&broker_root);
// Execute command
match &cli.command {
Commands::Ssh { remote_id } => {
info!("Run SSH on {remote_id}");
unimplemented!()
}
Commands::Scp { remote_id, source, dest } => {
info!("Run SCP on {} moving {} to {}", remote_id, source.display(), dest.display());
unimplemented!()
}
Commands::Serverlist { pattern } => {
info!("Run Clientlist with pattern {:?}", pattern);
let mut client = BonkClient::connect(tlsconfig, "localhost", 2541).await.unwrap();
let reply = client.send(&FromClientCommand::ServerList).await.unwrap();
info!("Reply: {:?}", reply);
}
}
Ok(())
}

View File

@@ -2,6 +2,7 @@
name = "bonknet_server"
version = "0.1.0"
edition = "2021"
description = "An automated Server for the Bonknet system"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
@@ -16,3 +17,7 @@ rustls-pemfile = "2.0.0"
rmp-serde = "1.1.2"
tracing = "0.1"
tracing-subscriber = "0.3"
uuid = { version = "1.7.0", features = ["serde"] }
clap = { version = "4.5.2", features = ["derive"] }
thiserror = "1.0.56"
serde = { version = "1.0" }

View File

@@ -1,35 +1,109 @@
use std::io::{Error, ErrorKind};
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::{ServerName, CertificateDer, PrivatePkcs8KeyDer};
use tokio_rustls::rustls::ClientConfig;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use libbonknet::*;
use tracing::{info, error};
use libbonknet::servermsg::*;
use libbonknet::cert::*;
use uuid::Uuid;
use tracing::{error, info};
async fn datastream(tlsconfig: Arc<ClientConfig>, conn_id: Uuid) -> std::io::Result<()> {
let connector = TlsConnector::from(tlsconfig);
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await?;
let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromServerConnTypeMessage::OpenDataStream(conn_id);
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => panic!("None in the transport"),
Some(item) => match item {
Ok(buf) => {
use ToPeerDataStream::*;
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
match msg {
OkDataStreamRequestAccepted => {
info!("Data Stream Accepted. Waiting for Open...");
}
Refused => {
error!("Refused");
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
}
other => {
error!("Unexpected response: {:?}", other);
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
}
}
}
Err(e) => {
error!("Error: {:?}", e);
return Err(e);
}
}
}
match transport.next().await {
None => panic!("None in the transport"),
Some(item) => match item {
Ok(buf) => {
use ToPeerDataStream::*;
let msg: ToPeerDataStream = rmp_serde::from_slice(&buf).unwrap();
match msg {
OkDataStreamOpen => {
info!("Data Stream Open!. Connecting Streams.");
}
Revoked => {
error!("Data Stream Revoked!");
return Err(Error::new(ErrorKind::ConnectionAborted, "Revoked"));
}
Refused => {
error!("Refused");
return Err(Error::new(ErrorKind::ConnectionRefused, "Refused"));
}
other => {
error!("Unexpected response: {:?}", other);
return Err(Error::new(ErrorKind::ConnectionRefused, "Unexpected response"));
}
}
}
Err(e) => {
error!("Error: {:?}", e);
return Err(e);
}
}
}
// Initialize outbound stream
let mut inbound = transport.into_inner();
let mut outbound = TcpStream::connect("127.0.0.1:22").await?;
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"),
}
Ok(())
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Root certs to verify the server is the right one
let mut broker_root_cert_store = RootCertStore::empty();
let broker_root_cert_der = load_cert("certs/broker_root_cert.pem").unwrap();
broker_root_cert_store.add(broker_root_cert_der).unwrap();
// Public CA that will be used to generate the Server certificate
let root_server_cert = load_cert("certs/server_root_cert.pem").unwrap();
// Guest CA
let root_guestserver_cert = load_cert("certs/guestserver_root_cert.pem").unwrap();
// Certificate used to do the first authentication
let guestserver_cert = load_cert("certs/guestserver_cert.pem").unwrap();
let guestserver_prkey = load_prkey("certs/guestserver_key.pem").unwrap();
// Server Name
let my_name = "cicciopizza";
// Load Identity files
let guestserver_ident = LeafCertPair::load_from_file("certs_pem/guestserver.pem").unwrap();
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
// Load TLS Config
let guest_cert_chain = guestserver_ident.fullchain();
let tlsconfig = ClientConfig::builder()
.with_root_certificates(broker_root_cert_store.clone())
.with_root_certificates(broker_root.to_rootcertstore())
// .with_no_client_auth();
.with_client_auth_cert(vec![guestserver_cert, root_guestserver_cert], guestserver_prkey.into())
.with_client_auth_cert(guest_cert_chain, guestserver_ident.clone_key().into())
.unwrap();
let connector = TlsConnector::from(Arc::new(tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap();
@@ -38,28 +112,26 @@ async fn main() -> std::io::Result<()> {
let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromGuestServerMessage::Announce { name: "cicciopizza".into() };
let msg = FromGuestServerMessage::Announce { name: my_name.into() };
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
let mut myserver_cert: Option<CertificateDer> = None;
let mut myserver_prkey: Option<PrivatePkcs8KeyDer> = None;
// TODO: Remove this two mutable option
let mut myserver_leaf: Option<LeafCertPair> = None;
match transport.next().await {
None => {
info!("None in the transport.next() ???");
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use ToGuestServerMessage::*;
use libbonknet::servermsg::ToGuestServerMessage::*;
let msg: ToGuestServerMessage = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
OkAnnounce { server_cert, server_prkey } => {
OkAnnounce(payload) => {
info!("Ok Announce");
let (server_cert, server_prkey) = okannounce_to_cert(server_cert, server_prkey);
myserver_cert = Some(server_cert);
myserver_prkey = Some(server_prkey);
myserver_leaf = Some(payload.parse());
}
FailedNameAlreadyOccupied => {
error!("Failed Announce");
error!("Failed Announce, name already occupied");
return Ok(());
}
}
@@ -69,41 +141,159 @@ async fn main() -> std::io::Result<()> {
}
}
}
if let (Some(server_cert), Some(server_prkey)) = (myserver_cert, myserver_prkey) {
let tlsconfig = ClientConfig::builder()
.with_root_certificates(broker_root_cert_store)
.with_client_auth_cert(vec![server_cert, root_server_cert], server_prkey.into())
.unwrap();
let connector = TlsConnector::from(Arc::new(tlsconfig));
transport.close().await.unwrap();
if let Some(server_leaf) = myserver_leaf {
let tlsconfig = Arc::new(ClientConfig::builder()
.with_root_certificates(broker_root.to_rootcertstore())
.with_client_auth_cert(server_leaf.fullchain(), server_leaf.clone_key().into())
.unwrap());
let connector = TlsConnector::from(Arc::clone(&tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await?;
let stream = connector.connect(dnsname, stream).await?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
transport.for_each(|item| async move {
match item {
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromServerConnTypeMessage::SendCommand;
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => {
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use ToServerMessage::*;
let msg: ToServerMessage = rmp_serde::from_slice(&buf).unwrap();
use libbonknet::servermsg::ToServerConnTypeReply;
use libbonknet::servermsg::ToServerConnTypeReply::*;
let msg: ToServerConnTypeReply = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
Required { id } => {
info!("I'm required with Connection ID {}", id);
OkSendCommand => {
info!("Stream set in SendCommand mode");
}
YouAre(name) => match name {
YouAreValues::Registered { name } => {
info!("I am {}", name);
}
YouAreValues::NotRegistered => {
info!("I'm not registered");
}
GenericFailure => {
panic!("Generic Failure during SendCommand");
}
others => {
panic!("Unexpected Message type: {:?}", others);
}
}
}
Err(e) => {
error!("Error: {:?}", e);
info!("Disconnection: {:?}", e);
}
}
}).await;
}
// Begin WhoAmI
let msg = FromServerCommandMessage::WhoAmI;
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => {
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use libbonknet::servermsg::ToServerCommandReply;
use libbonknet::servermsg::ToServerCommandReply::*;
let msg: ToServerCommandReply = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
YouAre { name } => {
info!("I am {}", name);
}
GenericFailure => {
panic!("Generic failure during WhoAmI");
}
_ => {
panic!("Unexpected reply");
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
}
}
}
transport.close().await.expect("Error during transport stream close");
// Start Subscribe Stream
let connector = TlsConnector::from(Arc::clone(&tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await?;
let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromServerConnTypeMessage::Subscribe;
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => {
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use libbonknet::servermsg::ToServerConnTypeReply;
use libbonknet::servermsg::ToServerConnTypeReply::*;
let msg: ToServerConnTypeReply = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
OkSubscribe => {
info!("Stream set in Subscribe mode");
}
GenericFailure => {
panic!("Generic Failure during SendCommand");
}
others => {
panic!("Unexpected Message type: {:?}", others);
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
}
}
}
// Subscribe consume
loop {
match transport.next().await {
None => {
info!("Empty Buffer");
}
Some(item) => {
let mut out: Option<FromServerReply> = None;
match item {
Ok(buf) => {
use libbonknet::servermsg::ToServerMessage;
use libbonknet::servermsg::ToServerMessage::*;
let msg: ToServerMessage = rmp_serde::from_slice(&buf).unwrap();
match msg {
Msg { reply_id, body } => {
use libbonknet::servermsg::FromServerReplyBody;
use libbonknet::servermsg::ToServerMessageBody::*;
match body {
Request { conn_id } => {
info!("I'm required with Connection ID {}", conn_id);
out = Some(FromServerReply::Msg {
reply_id,
body: FromServerReplyBody::RequestAccepted,
});
// TODO: SPAWN DATASTREAM
tokio::spawn(datastream(tlsconfig.clone(), conn_id));
}
}
}
Ping => {
info!("Ping!");
out = Some(FromServerReply::Pong);
}
}
}
Err(e) => {
error!("Error: {:?}", e);
}
}
if let Some(msg) = out {
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
}
}
}
}
}
Ok(())
}

228
bonknet_server/src/main.rs Normal file
View File

@@ -0,0 +1,228 @@
mod transportstream;
use std::path::Path;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_rustls::rustls::{ClientConfig};
use libbonknet::*;
use libbonknet::servermsg::*;
use libbonknet::cert::*;
use uuid::Uuid;
use tracing::{error, info};
use crate::transportstream::*;
#[derive(Clone)]
struct ServerContext<'a> {
identity: LeafCertPair<'a>,
broker_root: BrokerRootCerts<'a>,
my_name: String,
}
impl ServerContext<'_> {
fn tlsconfig(&self) -> ClientConfig {
self.identity.to_tlsclientconfig(&self.broker_root)
}
}
async fn subscribe(ctx: &ServerContext<'_>) -> Result<(), TransportError> {
use ToServerConnTypeReply::*;
let tlsconfig = Arc::new(ctx.tlsconfig());
let mut transport = TransportStream::new(Arc::clone(&tlsconfig)).await?;
let msg = FromServerConnTypeMessage::Subscribe;
match transport.send_and_listen::<_, ToServerConnTypeReply>(&msg).await? {
OkSubscribe => {
info!("Stream set in Subscribe mode");
}
GenericFailure => {
panic!("Generic Failure during SendCommand");
}
others => {
panic!("Unexpected Message type: {:?}", others);
}
}
loop {
use ToServerMessage::*;
let out: Option<FromServerReply>;
match transport.listen_one::<ToServerMessage>().await? {
Msg { reply_id, body } => {
use libbonknet::servermsg::ToServerMessageBody::*;
match body {
Request { conn_id } => {
info!("I'm required with Connection ID {}", conn_id);
out = Some(FromServerReply::Msg {
reply_id,
body: FromServerReplyBody::RequestAccepted,
});
// TODO: Spawn Datastream
tokio::spawn(datastream(ctx.tlsconfig(), conn_id));
}
}
}
Ping => {
info!("Ping!");
out = Some(FromServerReply::Pong);
}
}
if let Some(msg) = out {
transport.send(&msg).await?;
}
}
}
async fn datastream(tlsconfig: ClientConfig, conn_id: Uuid) -> Result<(), TransportError> {
use TransportError::StreamError;
use ToPeerDataStream::*;
let mut transport = TransportStream::new(Arc::new(tlsconfig)).await?;
let msg = FromServerConnTypeMessage::OpenDataStream(conn_id);
match transport.send_and_listen::<_, ToPeerDataStream>(&msg).await? {
OkDataStreamRequestAccepted => {
info!("Data Stream Accepted. Waiting for Open...");
}
Refused => {
panic!("Refused");
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
match transport.listen_one().await? {
OkDataStreamOpen => {
info!("Data Stream Open!. Connecting Streams.");
}
Revoked => {
panic!("Data Stream Revoked!");
}
Refused => {
panic!("Refused");
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
// Initialize outbound stream
let mut inbound = transport.into_inner().into_inner();
let mut outbound = TcpStream::connect("127.0.0.1:22").await.map_err(StreamError)?;
match tokio::io::copy_bidirectional(&mut inbound, &mut outbound).await {
Ok(bytes_copied) => info!("{bytes_copied:?}"),
Err(e) => error!("Error during copy: {e}"),
}
Ok(())
}
async fn announce<'a>(ctx: &ServerContext<'_>) -> Result<LeafCertPair<'a>, TransportError> {
use ToGuestServerMessage::*;
let mut transport = TransportStream::new(Arc::new(ctx.tlsconfig())).await.unwrap();
let msg = FromGuestServerMessage::Announce { name: ctx.my_name.clone() };
transport.send(&msg).await?;
for i in 0..10 {
match transport.listen_one().await? {
OkAnnounce(payload) => {
info!("Ok Announce");
transport.close().await?;
return Ok(payload.parse());
}
FailedNameAlreadyOccupied => {
let new_name = format!("ERROR_{}_{}", &ctx.my_name, i + 1);
error!("Failed Announce, name already occupied. Using {}", &new_name);
let msg = FromGuestServerMessage::Announce { name: new_name };
transport.send(&msg).await?;
}
}
}
panic!("Out of retry");
}
async fn server_name_confirmation<'a>(ctx: &ServerContext<'_>) -> Result<(), TransportError> {
use ToServerConnTypeReply::*;
use ToServerCommandReply::*;
let mut transport = TransportStream::new(Arc::new(ctx.tlsconfig())).await?;
// Declare Conn Type
let msg = FromServerConnTypeMessage::SendCommand;
match transport.send_and_listen(&msg).await? {
OkSendCommand => {}
e => {
panic!("Error during ConnType Declare: {:?}", e);
}
}
// Ask Name
let msg = FromServerCommandMessage::WhoAmI;
match transport.send_and_listen(&msg).await? {
YouAre { name } => {
if ctx.my_name == name {
return Ok(());
}
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
// If name doesn't correspond, try to ChangeName. 10 retry. If they fail, keep the actual one without panic.
let msg = FromServerCommandMessage::ChangeName { name: ctx.my_name.clone() };
transport.send(&msg).await?;
for i in 0..10 {
match transport.listen_one().await? {
NameChanged => {
return Ok(());
}
NameNotAvailable => {
let msg = FromServerCommandMessage::ChangeName { name: format!("ERROR_{}_{}", ctx.my_name, i + 1) };
transport.send(&msg).await?;
}
other => {
panic!("Unexpected response: {:?}", other);
}
}
}
panic!("Exhausted Announce Retry");
}
#[tokio::main]
async fn main() -> std::io::Result<()> {
// Tracing Subscriber
let subscriber = tracing_subscriber::FmtSubscriber::new();
tracing::subscriber::set_global_default(subscriber).unwrap();
// Server Name
// TODO: from config using std
let my_name = String::from("cicciopizza");
let serverident_path = Path::new("server/serveridentity.pem"); // "/etc/bonknet/identity.pem"
let guestserverident_path = Path::new("server/guestidentity.pem"); // "/etc/bonknet/guestidentity.pem"
let broker_root_path = Path::new("certs_pem/broker_root_ca_cert.pem"); // "/etc/bonknet/broker_root_ca_cert.pem"
// Load Broker Root file
if !(broker_root_path.try_exists().unwrap() && broker_root_path.is_file()) {
panic!("No Broker Root file");
}
let broker_root = BrokerRootCerts::load_from_file("certs_pem/broker_root_ca_cert.pem").unwrap();
// Load Identity Files (if needed, contact the broker for generation)
let exists_serverident = serverident_path.try_exists().unwrap() && serverident_path.is_file();
let exists_guestident = guestserverident_path.try_exists().unwrap() && guestserverident_path.is_file();
// Do Guest registration and Name confirmation
let ctx = if !exists_serverident && exists_guestident {
info!("No Server Identity. Starting Guest Announce...");
// No Server, Yes Guest -> Use Guest to retrieve Server identity
let guest_ident = LeafCertPair::load_from_file(guestserverident_path).unwrap();
let ctx = ServerContext { identity: guest_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
let server_ident = announce(&ctx).await.unwrap();
server_ident.save_into_file(serverident_path).unwrap();
ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }
} else if exists_serverident {
// Yes Server -> Use Server file as identity
info!("Server Identity found. Confirming...");
let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap();
let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
server_name_confirmation(&ctx).await.unwrap();
ctx
} else {
// No identity file present
panic!("No Identity file found");
};
// Start Server Main
let ctx = Arc::new(ctx);
loop {
if let Err(e) = subscribe(&ctx).await {
error!("Subscribe Task aborted due to {}", e);
error!("Restoring Subscribe Task...");
}
}
}

View File

@@ -0,0 +1,80 @@
use std::sync::Arc;
use futures::{SinkExt, StreamExt};
use tokio::net::TcpStream;
use tokio_rustls::client::TlsStream;
use tokio_rustls::rustls::pki_types::ServerName;
use tokio_rustls::TlsConnector;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use serde::{Serialize};
use serde::de::DeserializeOwned;
use thiserror::Error;
use tokio_rustls::rustls::ClientConfig;
#[derive(Error, Debug)]
pub enum TransportError {
#[error("Stream Terminated, next() returned None")]
StreamTerminated,
#[error("Stream Error")]
StreamError(std::io::Error),
#[error("Serialization Error")]
SerializeError(rmp_serde::encode::Error),
#[error("Deserialization Error")]
DeserializeError(rmp_serde::decode::Error),
}
pub struct TransportStream {
transport: Framed<TlsStream<TcpStream>, LengthDelimitedCodec>,
}
impl TransportStream {
pub async fn new(tlsconfig: Arc<ClientConfig>) -> Result<Self, TransportError> {
use TransportError::StreamError;
let connector = TlsConnector::from(tlsconfig);
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await.map_err(StreamError)?;
let stream = connector.connect(dnsname, stream).await.map_err(StreamError)?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
Ok(TransportStream { transport })
}
pub fn into_inner(self) -> Framed<TlsStream<TcpStream>, LengthDelimitedCodec> {
self.transport
}
pub async fn send<T: Serialize>(&mut self, msg: &T) -> Result<(), TransportError> {
use TransportError::*;
self.transport.send(rmp_serde::to_vec(&msg).map_err(SerializeError)?.into()).await.map_err(StreamError)?;
Ok(())
}
pub async fn send_and_listen<T: Serialize, U: DeserializeOwned>(&mut self, msg: &T) -> Result<U, TransportError> {
self.send(msg).await?;
self.listen_one().await
}
pub async fn listen_one<T: DeserializeOwned>(&mut self) -> Result<T, TransportError> {
use TransportError::*;
match self.transport.next().await {
None => {
// Stream Terminated
Err(StreamTerminated)
}
Some(item) => match item {
Ok(buf) => {
let msg: T = rmp_serde::from_slice(&buf).map_err(DeserializeError)?;
Ok(msg)
}
Err(e) => {
// Disconnection
Err(StreamError(e))
}
}
}
}
pub async fn close(mut self) -> Result<(), TransportError> {
self.transport.close().await.map_err(TransportError::StreamError)
}
}

View File

@@ -8,4 +8,8 @@ edition = "2021"
[dependencies]
tokio-rustls = "0.25.0"
rustls-pemfile = "2.0.0"
rcgen = { version = "0.12.1", features = ["x509-parser"] }
x509-parser = "0.16.0"
serde = { version = "1.0", features = ["derive"] }
uuid = { version = "1.7.0", features = ["serde"] }
pem = "3.0.3"

227
libbonknet/src/cert.rs Normal file
View File

@@ -0,0 +1,227 @@
use std::io::{BufReader, Error, ErrorKind, Write};
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
use rustls_pemfile::{Item, read_all, read_one};
use tokio_rustls::rustls::{ClientConfig, RootCertStore};
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use x509_parser::nom::AsBytes;
use pem::{self, Pem};
pub struct RawCertPair {
pub cert: Vec<u8>,
pub prkey: Vec<u8>,
}
// TODO: Eventually consider shifting from CertificateDer to x509_parser::X509Certificate
// for the extra information it can provide runtime
pub struct LeafCertPair<'a> {
cert: CertificateDer<'a>,
ca_chain: Vec<CertificateDer<'a>>,
prkey: PrivatePkcs8KeyDer<'a>,
}
impl Clone for LeafCertPair<'_> {
fn clone(&self) -> Self {
Self {
cert: self.cert.clone(),
ca_chain: self.ca_chain.clone(),
prkey: self.prkey.clone_key(),
}
}
}
impl LeafCertPair<'_> {
pub fn parse<'a>(cert: Vec<u8>, ca_chain: Vec<Vec<u8>>, prkey: Vec<u8>) -> LeafCertPair<'a> {
let cert = CertificateDer::from(cert);
let ca_chain = ca_chain.into_iter().map(CertificateDer::from).collect();
let prkey = PrivatePkcs8KeyDer::from(prkey);
LeafCertPair {
cert,
ca_chain,
prkey,
}
}
pub fn save_into_file<P: AsRef<std::path::Path>>(&self, filename: P) -> std::io::Result<()> {
let mut file = std::fs::File::create(filename)?;
let mut pems = vec![
Pem::new("CERTIFICATE", self.cert.as_bytes())
];
for c in self.ca_chain.iter() {
pems.push(Pem::new("CERTIFICATE", c.as_bytes()));
}
pems.push(Pem::new("PRIVATE KEY", self.prkey.secret_pkcs8_der()));
file.write_all(pem::encode_many(&pems).as_bytes())?;
Ok(())
}
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<LeafCertPair<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
let parsed_cert = x509_parser::parse_x509_certificate(&cert).unwrap().1;
if parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "main cert is ca"));
}
let mut ca_chain: Vec<CertificateDer> = Vec::new();
for item in read_all(&mut buf) {
match item {
Ok(Item::X509Certificate(c)) => {
let parsed_cert = x509_parser::parse_x509_certificate(&c).unwrap().1;
if !parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "chain cert is not ca"));
}
ca_chain.push(c);
},
Ok(Item::Pkcs8Key(prkey)) => {
return Ok(LeafCertPair { cert, ca_chain, prkey });
}
_ => {
return Err(Error::new(ErrorKind::InvalidInput, "invalid format"));
}
}
}
Err(Error::new(ErrorKind::InvalidInput, "pkcs8key not found"))
} else {
Err(Error::new(ErrorKind::InvalidInput, "no main x509 cert"))
}
}
pub fn cert(&self) -> &CertificateDer {
&self.cert
}
pub fn ca_chain(&self) -> &Vec<CertificateDer> {
&self.ca_chain
}
pub fn prkey(&self) -> &PrivatePkcs8KeyDer {
&self.prkey
}
pub fn clone_key(&self) -> PrivatePkcs8KeyDer<'static> {
self.prkey.clone_key()
}
pub fn fullchain<'a>(&self) -> Vec<CertificateDer<'a>> {
let mut chain: Vec<CertificateDer> = Vec::with_capacity(self.ca_chain.len() + 1);
chain.push(self.cert.clone().into_owned());
let mut ca_chain = self.ca_chain.clone().into_iter().map(|c| c.into_owned()).collect();
chain.append(&mut ca_chain);
chain
}
pub fn to_raw(&self) -> RawCertPair {
let cert = self.cert.to_vec();
let prkey = self.prkey.secret_pkcs8_der().to_vec();
RawCertPair { cert, prkey }
}
pub fn to_tlsclientconfig(&self, broker_root_certs: &BrokerRootCerts) -> ClientConfig {
let broker_root_cert_store = broker_root_certs.to_rootcertstore();
let cert_chain = self.fullchain();
ClientConfig::builder()
.with_root_certificates(broker_root_cert_store)
.with_client_auth_cert(cert_chain, self.prkey.clone_key().into()).expect("Invalid Cert chain")
}
}
pub struct CACertPair<'a> {
cert: CertificateDer<'a>,
ca_chain: Vec<CertificateDer<'a>>,
prkey: PrivatePkcs8KeyDer<'a>,
}
impl CACertPair<'_> {
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<CACertPair<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
let parsed_cert = x509_parser::parse_x509_certificate(&cert).unwrap().1;
if !parsed_cert.is_ca() {
return Err(Error::new(ErrorKind::InvalidInput, "cert is not ca"));
}
// TODO: Implement ca_chain reading (for now it's unused)
if let Item::Pkcs8Key(prkey) = read_one(&mut buf).unwrap().unwrap() {
Ok(CACertPair { cert, ca_chain: vec![], prkey })
} else {
Err(Error::new(ErrorKind::InvalidInput, "no ca pkcs8key"))
}
} else {
Err(Error::new(ErrorKind::InvalidInput, "no ca x509 cert"))
}
}
fn rcgen_keypair(&self) -> KeyPair {
KeyPair::from_der(self.prkey.secret_pkcs8_der()).unwrap()
}
fn rcgen_certificate(&self) -> Certificate {
Certificate::from_params(CertificateParams::from_ca_cert_der(
&self.cert,
self.rcgen_keypair()
).unwrap()).unwrap()
}
pub fn sign_new_cert(&self, params: CertificateParams) -> LeafCertPair {
let root_cert = self.rcgen_certificate();
let certificate = Certificate::from_params(params).unwrap();
let b_cert = certificate.serialize_der_with_signer(&root_cert).unwrap();
let b_prkey = certificate.serialize_private_key_der();
let cert = CertificateDer::from(b_cert);
let prkey = PrivatePkcs8KeyDer::from(b_prkey);
let mut ca_chain = Vec::with_capacity(self.ca_chain.len() + 1);
ca_chain.push(self.cert.clone());
ca_chain.extend(self.ca_chain.iter().cloned());
LeafCertPair { cert, ca_chain, prkey }
}
pub fn cert(&self) -> &CertificateDer {
&self.cert
}
pub fn clone_key(&self) -> PrivatePkcs8KeyDer {
self.prkey.clone_key()
}
pub fn to_raw(&self) -> RawCertPair {
let cert = self.cert.to_vec();
let prkey = self.prkey.secret_pkcs8_der().to_vec();
RawCertPair { cert, prkey }
}
}
pub fn server_leaf_certparams(name: &str) -> CertificateParams {
let mut params = CertificateParams::new(vec!["entity.other.host".into(), format!("bonk.server.{name}")]);
params.distinguished_name.push(DnType::CommonName, name);
params.use_authority_key_identifier_extension = true;
params.key_usages.push(rcgen::KeyUsagePurpose::DigitalSignature);
params.extended_key_usages.push(rcgen::ExtendedKeyUsagePurpose::ClientAuth);
params
}
#[derive(Clone)]
pub struct BrokerRootCerts<'a> {
root_cert: CertificateDer<'a>
}
impl BrokerRootCerts<'_> {
pub fn load_from_file<'a, P: AsRef<std::path::Path>>(filename: P) -> std::io::Result<BrokerRootCerts<'a>> {
let file = std::fs::File::open(filename).unwrap();
let mut buf = BufReader::new(file);
if let Item::X509Certificate(root_cert) = read_one(&mut buf).unwrap().unwrap() {
Ok(BrokerRootCerts { root_cert })
} else {
Err(Error::new(ErrorKind::InvalidInput, "no broker root x509 cert"))
}
}
pub fn to_rootcertstore(&self) -> RootCertStore {
let mut broker_root_cert_store = RootCertStore::empty();
broker_root_cert_store.add(self.root_cert.clone()).expect("Invalid Broker Root");
broker_root_cert_store
}
pub fn certs(&self) -> Vec<&CertificateDer> {
vec![&self.root_cert]
}
}

View File

@@ -0,0 +1,23 @@
pub use crate::ToPeerDataStream;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
// Client things
#[derive(Debug, Serialize, Deserialize)]
pub enum FromClientCommand {
RequestServer { name: String },
ServerList,
UpgradeToDataStream(Uuid),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToClientResponse {
OkRequest { conn_id: Uuid },
OkServerList { data: Vec<String> },
// You are now a DataStream, wait the Open message
OkDataStreamRequestAccepted,
// The stream is open, you can pipe in-out the content you want!
OkDataStreamOpen,
GenericError,
}

View File

@@ -1,11 +1,15 @@
pub mod servermsg;
pub mod clientmsg;
pub mod cert;
use std::io::{BufReader, Error, ErrorKind};
use rustls_pemfile::{read_one, Item};
use rustls_pemfile::{Item, read_one};
use serde::{Deserialize, Serialize};
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
pub fn load_cert(filename: &str) -> std::io::Result<CertificateDer> {
let cert_file = std::fs::File::open(filename).unwrap();
let mut buf = std::io::BufReader::new(cert_file);
let mut buf = BufReader::new(cert_file);
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
Ok(cert)
} else {
@@ -25,53 +29,14 @@ pub fn load_prkey(filename: &str) -> std::io::Result<PrivatePkcs8KeyDer> {
}
}
// TODO: move all this inside Client and Server, for example using a DataStreamCmd(ToPeerDataStream)
#[derive(Debug, Serialize, Deserialize)]
pub enum RequiredReplyValues {
Ok,
GenericFailure { status_code: u32, msg: Option<String> },
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FromServerMessage {
RequiredReply(RequiredReplyValues),
ChangeName { name: String },
WhoAmI,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum YouAreValues {
Registered { name: String },
NotRegistered,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToServerMessage {
Required { id: String },
YouAre(YouAreValues),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FromGuestServerMessage {
Announce { name: String }
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToGuestServerMessage {
OkAnnounce {server_cert: Vec<u8>, server_prkey: Vec<u8>},
FailedNameAlreadyOccupied,
}
pub fn okannounce_to_cert<'a>(server_cert: Vec<u8>, server_prkey: Vec<u8>) -> (CertificateDer<'a>, PrivatePkcs8KeyDer<'a>) {
let server_cert = CertificateDer::from(server_cert);
let server_prkey = PrivatePkcs8KeyDer::from(server_prkey);
(server_cert, server_prkey)
}
impl ToGuestServerMessage {
pub fn make_okannounce(server_cert: CertificateDer, server_prkey: PrivatePkcs8KeyDer) -> Self {
ToGuestServerMessage::OkAnnounce{
server_cert: server_cert.to_vec(),
server_prkey: server_prkey.secret_pkcs8_der().to_vec()
}
}
pub enum ToPeerDataStream {
// You are now a DataStream, wait the Open message
OkDataStreamRequestAccepted,
// The stream is open, you can pipe in-out the content you want!
OkDataStreamOpen,
Refused,
Revoked,
GenericError,
}

108
libbonknet/src/servermsg.rs Normal file
View File

@@ -0,0 +1,108 @@
pub use crate::ToPeerDataStream;
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::cert::LeafCertPair;
#[derive(Debug, Serialize, Deserialize)]
pub enum FromServerConnTypeMessage {
SendCommand,
Subscribe,
OpenDataStream(Uuid),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToServerConnTypeReply {
OkSendCommand,
OkSubscribe,
// You are now a DataStream, wait the Open message
OkDataStreamRequestAccepted,
// The stream is open, you can pipe in-out the content you want!
OkDataStreamOpen,
GenericFailure,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FromServerCommandMessage {
ChangeName { name: String },
WhoAmI,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToServerCommandReply {
NameChanged,
NameNotAvailable,
YouAre { name: String },
GenericFailure,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToServerMessageBody {
Request { conn_id: Uuid },
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToServerMessage {
Ping,
Msg {
reply_id: u64,
body: ToServerMessageBody,
},
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FromServerReplyBody {
RequestAccepted,
RequestFailed,
Pong,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FromServerReply {
Pong,
Msg {
reply_id: u64,
body: FromServerReplyBody
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum FromGuestServerMessage {
Announce { name: String }
}
pub fn okannounce_to_cert<'a>(server_cert: Vec<u8>, server_prkey: Vec<u8>) -> (CertificateDer<'a>, PrivatePkcs8KeyDer<'a>) {
let server_cert = CertificateDer::from(server_cert);
let server_prkey = PrivatePkcs8KeyDer::from(server_prkey);
(server_cert, server_prkey)
}
#[derive(Debug, Serialize, Deserialize)]
pub struct OkAnnoucePayload {
server_cert: Vec<u8>,
ca_chain: Vec<Vec<u8>>,
server_prkey: Vec<u8>,
}
impl OkAnnoucePayload {
pub fn parse<'a>(self) -> LeafCertPair<'a> {
LeafCertPair::parse(self.server_cert, self.ca_chain, self.server_prkey)
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum ToGuestServerMessage {
OkAnnounce(OkAnnoucePayload),
FailedNameAlreadyOccupied,
}
impl ToGuestServerMessage {
pub fn make_okannounce(server_leaf: &LeafCertPair) -> Self {
ToGuestServerMessage::OkAnnounce(OkAnnoucePayload {
server_cert: server_leaf.cert().to_vec(),
ca_chain: server_leaf.ca_chain().iter().map(|c| c.to_vec()).collect(),
server_prkey: server_leaf.prkey().secret_pkcs8_der().to_vec(),
})
}
}