Compare commits
3 Commits
1e4e4bdb53
...
refactor
| Author | SHA1 | Date | |
|---|---|---|---|
| 8ecaf9c993 | |||
| 4212e23680 | |||
| 7c1e5122de |
@@ -27,6 +27,7 @@ rand = "0.8.5"
|
|||||||
uuid = { version = "1.7.0", features = ["v4", "serde"] }
|
uuid = { version = "1.7.0", features = ["v4", "serde"] }
|
||||||
rustls-pemfile = "2.0.0"
|
rustls-pemfile = "2.0.0"
|
||||||
x509-parser = "0.16.0"
|
x509-parser = "0.16.0"
|
||||||
|
rusqlite = { version = "0.31.0", features = ["bundled"] }
|
||||||
|
|
||||||
[[bin]]
|
[[bin]]
|
||||||
name = "init_certs"
|
name = "init_certs"
|
||||||
|
|||||||
@@ -14,6 +14,9 @@ pub enum DataConnManagerError {
|
|||||||
GenericFailure,
|
GenericFailure,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ask the DataConnManager to start a new Data Connection with
|
||||||
|
/// these two TransportStream as ends of the bridge.
|
||||||
|
/// As internal ID the Client Connection ID is used.
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "Result<(),DataConnManagerError>")]
|
#[rtype(result = "Result<(),DataConnManagerError>")]
|
||||||
pub struct StartDataBridge {
|
pub struct StartDataBridge {
|
||||||
@@ -22,6 +25,7 @@ pub struct StartDataBridge {
|
|||||||
pub client_transport: TransportStream,
|
pub client_transport: TransportStream,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Stop the data bridge using the Client Connection ID as reference to the bridge.
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
#[rtype(result = "()")]
|
#[rtype(result = "()")]
|
||||||
pub struct StopDataBridge {
|
pub struct StopDataBridge {
|
||||||
@@ -31,23 +35,27 @@ pub struct StopDataBridge {
|
|||||||
type ClientConnId = Uuid;
|
type ClientConnId = Uuid;
|
||||||
|
|
||||||
struct Connection {
|
struct Connection {
|
||||||
proxyhandler: SpawnHandle,
|
proxyhandler: SpawnHandle, // SpawnHandle is an actix Future handler (basically a Task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This is the actor that manages the Data Connections.
|
||||||
|
/// When a new Data Bridge is created, that consist of a Task running indefinitely that
|
||||||
|
/// redirect information from a Client to a Server and viceversa.
|
||||||
|
/// Basically the DataBridge is where the SSH information are sent in.
|
||||||
pub struct DataConnManager {
|
pub struct DataConnManager {
|
||||||
conns: HashMap<ClientConnId, Connection>
|
conns: HashMap<ClientConnId, Connection>
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl Actor for DataConnManager {
|
||||||
|
type Context = Context<Self>;
|
||||||
|
}
|
||||||
|
|
||||||
impl DataConnManager {
|
impl DataConnManager {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self { conns: HashMap::new() }
|
Self { conns: HashMap::new() }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor for DataConnManager {
|
|
||||||
type Context = Context<Self>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Handler<StopDataBridge> for DataConnManager {
|
impl Handler<StopDataBridge> for DataConnManager {
|
||||||
type Result = ();
|
type Result = ();
|
||||||
|
|
||||||
|
|||||||
@@ -30,9 +30,9 @@ struct BrokerContext {
|
|||||||
client_ca: CACertPair<'static>,
|
client_ca: CACertPair<'static>,
|
||||||
server_ca: CACertPair<'static>,
|
server_ca: CACertPair<'static>,
|
||||||
guestserver_ca: CACertPair<'static>,
|
guestserver_ca: CACertPair<'static>,
|
||||||
scdb_addr: Addr<ServerCertDB>,
|
servercert_db: Addr<ServerCertDB>,
|
||||||
pdcm_addr: Addr<PendingDataConnManager>,
|
pendingdataconn_manager: Addr<PendingDataConnManager>,
|
||||||
sm_addr: Addr<ServerManager>,
|
server_manager: Addr<ServerManager>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -47,19 +47,19 @@ async fn main() {
|
|||||||
let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap();
|
let server_ca = CACertPair::load_from_file("certs_pem/server_root_ca.pem").unwrap();
|
||||||
let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap();
|
let guestserver_ca = CACertPair::load_from_file("certs_pem/guestserver_root_ca.pem").unwrap();
|
||||||
// Load Actors
|
// Load Actors
|
||||||
let scdb_addr = ServerCertDB::new().start();
|
let servercert_db = ServerCertDB::new("certsdb.sqlite").unwrap().start();
|
||||||
let dcm_addr = DataConnManager::new().start();
|
let dataconn_manager = DataConnManager::new().start();
|
||||||
let pdcm_addr = PendingDataConnManager::new(dcm_addr).start();
|
let pendingdataconn_manager = PendingDataConnManager::new(dataconn_manager).start();
|
||||||
let sm_addr = ServerManager::new(pdcm_addr.clone()).start();
|
let server_manager = ServerManager::new(pendingdataconn_manager.clone()).start();
|
||||||
// Create Context
|
// Create Context
|
||||||
let ctx = Arc::new(BrokerContext {
|
let ctx = Arc::new(BrokerContext {
|
||||||
broker_leaf,
|
broker_leaf,
|
||||||
client_ca,
|
client_ca,
|
||||||
server_ca,
|
server_ca,
|
||||||
guestserver_ca,
|
guestserver_ca,
|
||||||
scdb_addr,
|
servercert_db,
|
||||||
pdcm_addr,
|
pendingdataconn_manager,
|
||||||
sm_addr,
|
server_manager,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Pki Client Verifier
|
// Pki Client Verifier
|
||||||
@@ -109,7 +109,7 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
Subscribe => {
|
Subscribe => {
|
||||||
info!("Subscribe Stream");
|
info!("Subscribe Stream");
|
||||||
let name = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
|
let name = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes }).await.unwrap() {
|
||||||
None => {
|
None => {
|
||||||
error!("Cert has no name assigned!");
|
error!("Cert has no name assigned!");
|
||||||
let reply = ToServerConnTypeReply::GenericFailure;
|
let reply = ToServerConnTypeReply::GenericFailure;
|
||||||
@@ -125,7 +125,7 @@ async fn main() {
|
|||||||
OpenDataStream(conn_id) => {
|
OpenDataStream(conn_id) => {
|
||||||
info!("OpenDataStream with {:?}", conn_id);
|
info!("OpenDataStream with {:?}", conn_id);
|
||||||
let msg = RegisterStream::server(conn_id, transport);
|
let msg = RegisterStream::server(conn_id, transport);
|
||||||
ctx.pdcm_addr.send(msg).await.unwrap().unwrap();
|
ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -159,7 +159,7 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) {
|
async fn server_subscribe_handler(ctx: &BrokerContext, transport: TransportStream, name: String) {
|
||||||
match ctx.sm_addr.send(StartTransporter { name, transport }).await.unwrap() {
|
match ctx.server_manager.send(StartTransporter { name, transport }).await.unwrap() {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!("Stream sent to the manager");
|
info!("Stream sent to the manager");
|
||||||
}
|
}
|
||||||
@@ -184,7 +184,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr
|
|||||||
match msg {
|
match msg {
|
||||||
ChangeName { name } => {
|
ChangeName { name } => {
|
||||||
info!("Changing name to {}", name);
|
info!("Changing name to {}", name);
|
||||||
match ctx.scdb_addr.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
|
match ctx.servercert_db.send(UnregisterServer { cert: peer_cert_bytes.clone() }).await.unwrap() {
|
||||||
None => {
|
None => {
|
||||||
info!("Nothing to unregister");
|
info!("Nothing to unregister");
|
||||||
}
|
}
|
||||||
@@ -192,7 +192,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr
|
|||||||
warn!("Unregistered from old name {}", old_name);
|
warn!("Unregistered from old name {}", old_name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let reply = match ctx.scdb_addr.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
|
let reply = match ctx.servercert_db.send(RegisterServer { cert: peer_cert_bytes.clone(), name }).await.unwrap() {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
info!("Registered!");
|
info!("Registered!");
|
||||||
ToServerCommandReply::NameChanged
|
ToServerCommandReply::NameChanged
|
||||||
@@ -206,7 +206,7 @@ async fn server_command_handler(ctx: &BrokerContext, mut transport: TransportStr
|
|||||||
}
|
}
|
||||||
WhoAmI => {
|
WhoAmI => {
|
||||||
info!("Asked who I am");
|
info!("Asked who I am");
|
||||||
let reply = match ctx.scdb_addr.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
|
let reply = match ctx.servercert_db.send(FetchName { cert: peer_cert_bytes.clone() }).await.unwrap() {
|
||||||
None => {
|
None => {
|
||||||
info!("I'm not registered anymore!? WTF");
|
info!("I'm not registered anymore!? WTF");
|
||||||
ToServerCommandReply::GenericFailure
|
ToServerCommandReply::GenericFailure
|
||||||
@@ -245,14 +245,14 @@ async fn guestserver_handler(ctx: &BrokerContext, mut transport: TransportStream
|
|||||||
match msg {
|
match msg {
|
||||||
Announce { name } => {
|
Announce { name } => {
|
||||||
info!("Announced with name {}", name);
|
info!("Announced with name {}", name);
|
||||||
if ctx.scdb_addr.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
|
if ctx.servercert_db.send(IsNameRegistered { name: name.clone() }).await.unwrap() {
|
||||||
info!("Name {} already registered!", name);
|
info!("Name {} already registered!", name);
|
||||||
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
|
let reply = ToGuestServerMessage::FailedNameAlreadyOccupied;
|
||||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||||
break; // Stop reading
|
break; // Stop reading
|
||||||
} else {
|
} else {
|
||||||
let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str()));
|
let cert = ctx.server_ca.sign_new_cert(server_leaf_certparams(name.as_str()));
|
||||||
ctx.scdb_addr.send(RegisterServer {
|
ctx.servercert_db.send(RegisterServer {
|
||||||
cert: cert.cert().to_vec(),
|
cert: cert.cert().to_vec(),
|
||||||
name,
|
name,
|
||||||
}).await.unwrap().unwrap();
|
}).await.unwrap().unwrap();
|
||||||
@@ -287,7 +287,7 @@ async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
|
|||||||
match msg {
|
match msg {
|
||||||
FromClientCommand::RequestServer { name } => {
|
FromClientCommand::RequestServer { name } => {
|
||||||
info!("REQUESTED SERVER {}", name);
|
info!("REQUESTED SERVER {}", name);
|
||||||
let data = ctx.sm_addr.send(RequestServer { name }).await.unwrap();
|
let data = ctx.server_manager.send(RequestServer { name }).await.unwrap();
|
||||||
match data {
|
match data {
|
||||||
Ok(client_conn_id) => {
|
Ok(client_conn_id) => {
|
||||||
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
|
let reply = ToClientResponse::OkRequest { conn_id: client_conn_id };
|
||||||
@@ -301,14 +301,14 @@ async fn client_handler(ctx: &BrokerContext, mut transport: TransportStream) {
|
|||||||
}
|
}
|
||||||
FromClientCommand::ServerList => {
|
FromClientCommand::ServerList => {
|
||||||
info!("Requested ServerList");
|
info!("Requested ServerList");
|
||||||
let data = ctx.sm_addr.send(GetServerList {}).await.unwrap();
|
let data = ctx.server_manager.send(GetServerList {}).await.unwrap();
|
||||||
let reply = ToClientResponse::OkServerList { data };
|
let reply = ToClientResponse::OkServerList { data };
|
||||||
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
|
||||||
}
|
}
|
||||||
FromClientCommand::UpgradeToDataStream(conn_id) => {
|
FromClientCommand::UpgradeToDataStream(conn_id) => {
|
||||||
info!("Upgrade to DataStream with conn_id {:?}", conn_id);
|
info!("Upgrade to DataStream with conn_id {:?}", conn_id);
|
||||||
let msg = RegisterStream::client(conn_id, transport);
|
let msg = RegisterStream::client(conn_id, transport);
|
||||||
ctx.pdcm_addr.send(msg).await.unwrap().unwrap();
|
ctx.pendingdataconn_manager.send(msg).await.unwrap().unwrap();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,13 @@
|
|||||||
|
//! Pending Data Connect Database is where we store the Data Connection in creation.
|
||||||
|
//! Every row contains:
|
||||||
|
//! - The ServerConnID that the server will use (uuid)
|
||||||
|
//! - The ClientConnID that the client will use (uuid)
|
||||||
|
//! - If the Server is already connected and ready, the Server Socket
|
||||||
|
//! - If the Client is already connected and ready, the Client Socket
|
||||||
|
//!
|
||||||
|
//! When a row contains a valid ServerSocket and ClientSocket, then the row is removed and the
|
||||||
|
//! sockets moved to the Data Connection Manager
|
||||||
|
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
@@ -7,21 +17,6 @@ use libbonknet::*;
|
|||||||
use crate::dataconnmanager::{DataConnManager, StartDataBridge};
|
use crate::dataconnmanager::{DataConnManager, StartDataBridge};
|
||||||
use crate::streamutils::*;
|
use crate::streamutils::*;
|
||||||
|
|
||||||
/*
|
|
||||||
L'idea e' che il database deve avere una riga per ogni connessione dati in nascita.
|
|
||||||
In ogni "riga" deve essere presente:
|
|
||||||
- Il ServerConnID che il server dovra usare (pkey)
|
|
||||||
- Il ClientConnID che il client dovra usare (pkey)
|
|
||||||
- Se gia connesso, il Socket del Server
|
|
||||||
- Se gia connesso il Socket del Client
|
|
||||||
|
|
||||||
Quando in una riga sono presenti sia ServerSocket che ClientSocket allora poppa la riga
|
|
||||||
e usa i socket per lanciare un thread/actor che faccia il piping dei dati
|
|
||||||
|
|
||||||
Ricerca riga deve avvenire sia tramite ServerConnID che ClientConnID se essi sono diversi come pianifico
|
|
||||||
Quindi l'ideale e' non usare una collection ma andare direttamente di Vector!
|
|
||||||
*/
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum PendingDataConnError {
|
pub enum PendingDataConnError {
|
||||||
#[error("Generic Failure")]
|
#[error("Generic Failure")]
|
||||||
@@ -59,11 +54,13 @@ impl RegisterStream {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Record with connection ID and TransportStream that can represent a client or a server
|
||||||
struct SideRecord {
|
struct SideRecord {
|
||||||
conn_id: Uuid,
|
conn_id: Uuid,
|
||||||
transport: Option<TransportStream>,
|
transport: Option<TransportStream>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A DB Record, with a Server and a Client record containind IDs and Transports
|
||||||
struct Record {
|
struct Record {
|
||||||
server: SideRecord,
|
server: SideRecord,
|
||||||
client: SideRecord,
|
client: SideRecord,
|
||||||
@@ -78,16 +75,29 @@ impl Record {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
|
// TODO: every 2 minutes verify the Records that have at least one stream invalidated and drop them
|
||||||
|
/// This contains the database of records of Data Connections in the phase of creation.
|
||||||
|
/// The approach is that when DataConnection is needed, a new Pending is created together with
|
||||||
|
/// two IDs to send to the client and server and that they will use to create a new TransportStream
|
||||||
|
/// that will be then registered to the right Record.
|
||||||
|
/// When a Record has the two TransportStreams needed, the Record is completed and can be consumed
|
||||||
|
/// and used to create a DataConnection sending the ownership of the two streams to the
|
||||||
|
/// Data Connection Manager.
|
||||||
pub struct PendingDataConnManager {
|
pub struct PendingDataConnManager {
|
||||||
db: Vec<Record>,
|
db: Vec<Record>,
|
||||||
dcm_addr: Addr<DataConnManager>,
|
dataconn_manager: Addr<DataConnManager>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Actor for PendingDataConnManager {
|
||||||
|
type Context = Context<Self>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PendingDataConnManager {
|
impl PendingDataConnManager {
|
||||||
pub fn new(dcm_addr: Addr<DataConnManager>) -> Self {
|
pub fn new(dataconn_manager_addr: Addr<DataConnManager>) -> Self {
|
||||||
PendingDataConnManager { db: Vec::new(), dcm_addr }
|
PendingDataConnManager { db: Vec::new(), dataconn_manager: dataconn_manager_addr }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The Row retrieve need to use ServerConnID AND ClientConnID as keys, so the best approach is to
|
||||||
|
/// use a single Vec and check the two IDs while iterating.
|
||||||
fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
|
fn retrieve_siderecord(&mut self, kind: &RegisterKind, conn_id: &Uuid) -> Option<&mut SideRecord> {
|
||||||
use RegisterKind::*;
|
use RegisterKind::*;
|
||||||
let record = match match kind {
|
let record = match match kind {
|
||||||
@@ -104,10 +114,6 @@ impl PendingDataConnManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Actor for PendingDataConnManager {
|
|
||||||
type Context = Context<Self>;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Handler<NewPendingConn> for PendingDataConnManager {
|
impl Handler<NewPendingConn> for PendingDataConnManager {
|
||||||
type Result = Result<(), PendingDataConnError>;
|
type Result = Result<(), PendingDataConnError>;
|
||||||
|
|
||||||
@@ -171,7 +177,7 @@ impl Handler<TryStartDataStream> for PendingDataConnManager {
|
|||||||
server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
|
server_transport: real_record.server.transport.unwrap(), // Safety: We are sure this is Some
|
||||||
client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
|
client_transport: real_record.client.transport.unwrap(), // Safety: We are sure this is Some
|
||||||
};
|
};
|
||||||
self.dcm_addr.do_send(msg);
|
self.dataconn_manager.do_send(msg);
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -1,15 +1,14 @@
|
|||||||
use std::collections::HashMap;
|
use std::path::Path;
|
||||||
use actix::prelude::*;
|
use actix::prelude::*;
|
||||||
|
use rusqlite::{Connection};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
// TODO: Probably it's better to remove the pub from inside the structs and impl a new() funct
|
|
||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum DBError {
|
pub enum DBError {
|
||||||
#[error("Certificate is already registered with name {0}")]
|
#[error("Certificate is already registered with name {0}")]
|
||||||
CertAlreadyRegistered(String),
|
CertAlreadyRegistered(String),
|
||||||
// #[error("Generic Failure")]
|
#[error("Generic Failure")]
|
||||||
// GenericFailure,
|
GenericFailure,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Message)]
|
#[derive(Message)]
|
||||||
@@ -37,14 +36,29 @@ pub struct FetchName {
|
|||||||
pub cert: Vec<u8>,
|
pub cert: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Move into Sqlite DB with unique check on col1 and col2!!!! Right now name is not unique
|
fn init_db(conn: Connection) -> rusqlite::Result<Connection> {
|
||||||
|
conn.execute(
|
||||||
|
"CREATE TABLE IF NOT EXISTS servercert (
|
||||||
|
cert BLOB PRIMARY KEY,
|
||||||
|
name TEXT NOT NULL
|
||||||
|
)",
|
||||||
|
(), // empty list of parameters.
|
||||||
|
)?;
|
||||||
|
Ok(conn)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Right now name is not unique. Consider making it unique and checking it for duplication
|
||||||
pub struct ServerCertDB {
|
pub struct ServerCertDB {
|
||||||
db: HashMap<Vec<u8>, String>, // Cert to Name
|
conn: Connection,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ServerCertDB {
|
impl ServerCertDB {
|
||||||
pub fn new() -> Self {
|
fn new_in_memory() -> rusqlite::Result<Self> {
|
||||||
ServerCertDB { db: HashMap::new() }
|
Ok(ServerCertDB { conn: init_db(Connection::open_in_memory()?)? })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn new<P: AsRef<Path>>(path: P) -> rusqlite::Result<Self> {
|
||||||
|
Ok(ServerCertDB { conn: init_db(Connection::open(path)?)? })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -56,13 +70,23 @@ impl Handler<RegisterServer> for ServerCertDB {
|
|||||||
type Result = Result<(), DBError>;
|
type Result = Result<(), DBError>;
|
||||||
|
|
||||||
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
match self.db.get(&msg.cert) {
|
match self.conn.query_row(
|
||||||
None => {
|
"SELECT name FROM servercert WHERE cert = ?1",
|
||||||
self.db.insert(msg.cert, msg.name);
|
(&msg.cert,),
|
||||||
|
|row| row.get::<_, String>(0)
|
||||||
|
) {
|
||||||
|
Ok(name) => {
|
||||||
|
Err(DBError::CertAlreadyRegistered(name))
|
||||||
|
}
|
||||||
|
Err(rusqlite::Error::QueryReturnedNoRows) => {
|
||||||
|
self.conn.execute(
|
||||||
|
"INSERT INTO servercert (cert, name) VALUES (?1, ?2)",
|
||||||
|
(&msg.cert, &msg.name)
|
||||||
|
).map_err(|_| DBError::GenericFailure)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
Some(name) => {
|
Err(_) => {
|
||||||
Err(DBError::CertAlreadyRegistered(name.clone()))
|
Err(DBError::GenericFailure)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -72,7 +96,12 @@ impl Handler<IsNameRegistered> for ServerCertDB {
|
|||||||
type Result = bool;
|
type Result = bool;
|
||||||
|
|
||||||
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
self.db.values().any(|x| *x == msg.name)
|
let count: u64 = self.conn.query_row(
|
||||||
|
"SELECT COUNT(*) FROM servercert WHERE name = ?1",
|
||||||
|
(&msg.name,),
|
||||||
|
|row| row.get(0)
|
||||||
|
).unwrap();
|
||||||
|
count > 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -80,7 +109,21 @@ impl Handler<FetchName> for ServerCertDB {
|
|||||||
type Result = Option<String>;
|
type Result = Option<String>;
|
||||||
|
|
||||||
fn handle(&mut self, msg: FetchName, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: FetchName, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
self.db.get(&msg.cert).map(|s| s.to_owned())
|
match self.conn.query_row(
|
||||||
|
"SELECT name FROM servercert WHERE cert = ?1",
|
||||||
|
(&msg.cert,),
|
||||||
|
|row| row.get(0)
|
||||||
|
) {
|
||||||
|
Ok(name) => {
|
||||||
|
Some(name)
|
||||||
|
}
|
||||||
|
Err(rusqlite::Error::QueryReturnedNoRows) => {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
panic!("Error during FetchName: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -88,6 +131,83 @@ impl Handler<UnregisterServer> for ServerCertDB {
|
|||||||
type Result = Option<String>;
|
type Result = Option<String>;
|
||||||
|
|
||||||
fn handle(&mut self, msg: UnregisterServer, _ctx: &mut Self::Context) -> Self::Result {
|
fn handle(&mut self, msg: UnregisterServer, _ctx: &mut Self::Context) -> Self::Result {
|
||||||
self.db.remove(&msg.cert)
|
match self.conn.query_row(
|
||||||
|
"SELECT name FROM servercert WHERE cert = ?1",
|
||||||
|
(&msg.cert,),
|
||||||
|
|row| row.get::<_, String>(0)
|
||||||
|
) {
|
||||||
|
Ok(name) => {
|
||||||
|
self.conn.execute(
|
||||||
|
"DELETE FROM servercert WHERE cert = ?1",
|
||||||
|
(&msg.cert,)
|
||||||
|
).unwrap();
|
||||||
|
Some(name)
|
||||||
|
}
|
||||||
|
Err(rusqlite::Error::QueryReturnedNoRows) => {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
panic!("Error during UnregisterServer: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[actix::test]
|
||||||
|
async fn emptydb_isnameregistered() {
|
||||||
|
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
|
||||||
|
assert!(!servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix::test]
|
||||||
|
async fn emptyvec() {
|
||||||
|
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
|
||||||
|
assert!(servercert_db.send(RegisterServer { cert: vec![], name: "test".into() }).await.unwrap().is_ok());
|
||||||
|
assert!(servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
|
||||||
|
assert_eq!(servercert_db.send(FetchName { cert: vec![] }).await.unwrap().unwrap(), "test");
|
||||||
|
assert_eq!(servercert_db.send(UnregisterServer { cert: vec![] }).await.unwrap().unwrap(), "test");
|
||||||
|
assert!(!servercert_db.send(IsNameRegistered { name: "test".into() }).await.unwrap());
|
||||||
|
assert!(servercert_db.send(FetchName { cert: vec![] }).await.unwrap().is_none());
|
||||||
|
assert!(servercert_db.send(UnregisterServer { cert: vec![] }).await.unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix::test]
|
||||||
|
async fn normalcert() {
|
||||||
|
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
|
||||||
|
let cert = vec![112, 111, 114, 99, 111, 100, 105, 111];
|
||||||
|
assert!(servercert_db.send(RegisterServer { cert: cert.clone(), name: "test2".into() }).await.unwrap().is_ok());
|
||||||
|
assert!(servercert_db.send(IsNameRegistered { name: "test2".into() }).await.unwrap());
|
||||||
|
assert_eq!(servercert_db.send(FetchName { cert: cert.clone() }).await.unwrap().unwrap(), "test2");
|
||||||
|
assert_eq!(servercert_db.send(UnregisterServer { cert: cert.clone() }).await.unwrap().unwrap(), "test2");
|
||||||
|
assert!(!servercert_db.send(IsNameRegistered { name: "test2".into() }).await.unwrap());
|
||||||
|
assert!(servercert_db.send(FetchName { cert: cert.clone() }).await.unwrap().is_none());
|
||||||
|
assert!(servercert_db.send(UnregisterServer { cert: cert.clone() }).await.unwrap().is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix::test]
|
||||||
|
async fn cert2_remains_after_delete_cert1() {
|
||||||
|
let servercert_db = ServerCertDB::new_in_memory().unwrap().start();
|
||||||
|
let cert1 = vec![112, 111, 114, 99, 111, 100, 105, 111];
|
||||||
|
let cert2 = vec![67, 65, 78, 68, 69, 68, 73, 79];
|
||||||
|
assert!(servercert_db.send(RegisterServer { cert: cert1.clone(), name: "test3".into() }).await.unwrap().is_ok());
|
||||||
|
assert!(servercert_db.send(IsNameRegistered { name: "test3".into() }).await.unwrap());
|
||||||
|
assert_eq!(servercert_db.send(FetchName { cert: cert1.clone() }).await.unwrap().unwrap(), "test3");
|
||||||
|
|
||||||
|
assert!(servercert_db.send(RegisterServer { cert: cert2.clone(), name: "test4".into() }).await.unwrap().is_ok());
|
||||||
|
assert!(servercert_db.send(IsNameRegistered { name: "test4".into() }).await.unwrap());
|
||||||
|
assert_eq!(servercert_db.send(FetchName { cert: cert2.clone() }).await.unwrap().unwrap(), "test4");
|
||||||
|
|
||||||
|
assert_eq!(servercert_db.send(UnregisterServer { cert: cert1.clone() }).await.unwrap().unwrap(), "test3");
|
||||||
|
assert!(!servercert_db.send(IsNameRegistered { name: "test3".into() }).await.unwrap());
|
||||||
|
assert!(servercert_db.send(FetchName { cert: cert1.clone() }).await.unwrap().is_none());
|
||||||
|
assert!(servercert_db.send(UnregisterServer { cert: cert1.clone() }).await.unwrap().is_none());
|
||||||
|
|
||||||
|
assert!(servercert_db.send(RegisterServer { cert: cert2.clone(), name: "test4".into() }).await.unwrap().is_err());
|
||||||
|
assert!(servercert_db.send(IsNameRegistered { name: "test4".into() }).await.unwrap());
|
||||||
|
assert_eq!(servercert_db.send(FetchName { cert: cert2.clone() }).await.unwrap().unwrap(), "test4");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -208,6 +208,7 @@ async fn main() -> std::io::Result<()> {
|
|||||||
ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }
|
ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() }
|
||||||
} else if exists_serverident {
|
} else if exists_serverident {
|
||||||
// Yes Server -> Use Server file as identity
|
// Yes Server -> Use Server file as identity
|
||||||
|
info!("Server Identity found. Confirming...");
|
||||||
let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap();
|
let server_ident = LeafCertPair::load_from_file(serverident_path).unwrap();
|
||||||
let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
|
let ctx = ServerContext { identity: server_ident, broker_root: broker_root.clone(), my_name: my_name.clone() };
|
||||||
server_name_confirmation(&ctx).await.unwrap();
|
server_name_confirmation(&ctx).await.unwrap();
|
||||||
|
|||||||
Reference in New Issue
Block a user