Implement the skeleton for the Server Session handling

This commit is contained in:
2024-02-15 18:01:47 +01:00
parent 37c76aba22
commit f8feb9db81
6 changed files with 317 additions and 71 deletions

View File

@@ -1,6 +1,11 @@
mod servercertdb;
use servercertdb::*;
use actix::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc};
use std::time::{Instant, Duration};
use actix::fut::wrap_future;
use libbonknet::*;
use rustls::{RootCertStore, ServerConfig};
use rustls::server::WebPkiClientVerifier;
@@ -9,12 +14,18 @@ use actix_server::Server;
use actix_rt::net::TcpStream;
use actix_service::{ServiceFactoryExt as _};
use futures::{StreamExt, SinkExt};
use thiserror::Error;
use tokio_util::codec::{Framed, LengthDelimitedCodec};
use rand::random;
use tokio_util::codec::{Framed, FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::{info, error};
use rcgen::{Certificate, CertificateParams, DnType, KeyPair};
use tokio::io::{ReadHalf, WriteHalf};
use tokio_util::bytes::{Bytes, BytesMut};
use tokio::io::Error;
use tokio::sync::{oneshot, Mutex};
type TransportStream = Framed<TlsStream<TcpStream>, LengthDelimitedCodec>;
type TransportStreamTx = FramedWrite<WriteHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
type TransportStreamRx = FramedRead<ReadHalf<TlsStream<TcpStream>>, LengthDelimitedCodec>;
struct ServerCert {
cert: Vec<u8>,
@@ -35,71 +46,128 @@ fn generate_server_cert(root_cert: &Certificate, name: &str) -> ServerCert {
ServerCert { cert, prkey }
}
#[derive(Error, Debug)]
enum DBError {
#[error("Certificate is already registered with name {0}")]
CertAlreadyRegistered(String),
// #[error("Generic Failure")]
// GenericFailure,
#[derive(MessageResponse)]
enum SendMsgResult {
Accepted,
Rejected,
}
#[derive(Message)]
#[rtype(result = "bool")]
struct IsNameRegistered {
name: String,
#[rtype(result = "SendMsgResult")]
struct SendMsg {
msg: ToServerMessageBody,
reply_channel: oneshot::Sender<FromServerReplyBody>
}
#[derive(Message)]
#[rtype(result = "Result<(), DBError>")]
struct RegisterServer {
cert: Vec<u8>,
name: String,
#[rtype(result = "()")]
struct SendPing;
struct ServerTransporter {
rx: Option<TransportStreamRx>,
tx: Arc<Mutex<TransportStreamTx>>,
last_transmission: Instant,
reply_channels: HashMap<u64, oneshot::Sender<FromServerReplyBody>>,
}
#[derive(Message)]
#[rtype(result = "Option<String>")]
struct FetchName {
cert: Vec<u8>,
}
// TODO: Move into Sqlite DB with unique check on col1 and col2!!!! Right now name is not unique
struct ServerCertDB {
db: HashMap<Vec<u8>, String>, // Cert to Name
}
impl Actor for ServerCertDB {
type Context = Context<Self>;
}
impl Handler<RegisterServer> for ServerCertDB {
type Result = Result<(), DBError>;
fn handle(&mut self, msg: RegisterServer, _ctx: &mut Self::Context) -> Self::Result {
match self.db.get(&msg.cert) {
None => {
self.db.insert(msg.cert, msg.name);
Ok(())
}
Some(name) => {
Err(DBError::CertAlreadyRegistered(name.clone()))
}
impl ServerTransporter {
fn new(transport: TransportStream) -> Self {
let internal = transport.into_inner();
let (srx, stx) = tokio::io::split(internal);
let codec = LengthDelimitedCodec::new();
let rx = FramedRead::new(srx, codec.clone());
let tx = FramedWrite::new(stx, codec.clone());
ServerTransporter {
rx: Some(rx),
tx: Arc::new(Mutex::new(tx)),
last_transmission: Instant::now(),
reply_channels: HashMap::new(),
}
}
}
impl Handler<IsNameRegistered> for ServerCertDB {
type Result = bool;
impl Actor for ServerTransporter {
type Context = Context<Self>;
fn handle(&mut self, msg: IsNameRegistered, _ctx: &mut Self::Context) -> Self::Result {
self.db.values().any(|x| *x == msg.name)
fn started(&mut self, ctx: &mut Self::Context) {
let rx = self.rx.take().expect("Rx Stream not found");
ctx.add_stream(rx);
ctx.run_interval(Duration::from_secs(60), |_s, c| {
c.notify(SendPing);
});
}
}
impl Handler<FetchName> for ServerCertDB {
type Result = Option<String>;
impl Handler<SendPing> for ServerTransporter {
type Result = ();
fn handle(&mut self, msg: FetchName, _ctx: &mut Self::Context) -> Self::Result {
self.db.get(&msg.cert).map(|s| s.to_owned())
fn handle(&mut self, _msg: SendPing, ctx: &mut Self::Context) -> Self::Result {
let msg = ToServerMessage::Ping;
let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into();
let arc_tx = self.tx.clone();
ctx.spawn(wrap_future::<_, Self>(async move {
arc_tx.lock().await.send(payload).await
}).map(|res, _a, _ctx| {
info!("Ping sent result: {:?}", res);
}));
}
}
impl Handler<SendMsg> for ServerTransporter {
type Result = SendMsgResult;
fn handle(&mut self, msg: SendMsg, ctx: &mut Self::Context) -> Self::Result {
let mut reply_id: u64;
if self.reply_channels.len() == u64::MAX as usize {
return SendMsgResult::Rejected;
}
loop {
reply_id = random();
if !self.reply_channels.contains_key(&reply_id) {
break;
}
}
self.reply_channels.insert(reply_id, msg.reply_channel);
let msg = ToServerMessage::Msg {
reply_id,
body: msg.msg,
};
let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into();
let arc_tx = self.tx.clone();
ctx.spawn(async move {
arc_tx.lock().await.send(payload).await
}.into_actor(self).map(|res, _a, _ctx| {
info!("ToServerMsg sent result: {:?}", res);
}));
SendMsgResult::Accepted
}
}
impl StreamHandler<Result<BytesMut, Error>> for ServerTransporter {
fn handle(&mut self, item: Result<BytesMut, Error>, _ctx: &mut Self::Context) {
match item {
Ok(buf) => {
use FromServerReply::*;
let msg: FromServerReply = rmp_serde::from_slice(&buf).unwrap();
match msg {
Pong => {
self.last_transmission = Instant::now();
}
Msg { reply_id, body } => match self.reply_channels.remove(&reply_id) {
None => {}
Some(reply_tx) => {
if let Err(_e) = reply_tx.send(body) {
error!("Oneshot channel {} got invalidated! No reply sent.", reply_id);
}
}
}
}
}
Err(e) => {
error!("ERROR {:?}", e);
}
}
}
}
@@ -189,7 +257,12 @@ async fn main() {
server_command_handler(transport, peer_cert_bytes, &server_db_addr).await;
}
Subscribe => {
info!("Subscribe Stream")
info!("Subscribe Stream");
let reply = ToServerConnTypeReply::OkSubscribe;
transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap();
// TODO: If I pass transport away and the service returns, what happen to the connection?
// in theory it will remain open but better check
server_subscribe_handler(transport).await;
}
}
}
@@ -220,6 +293,21 @@ async fn main() {
.unwrap();
}
async fn server_subscribe_handler(transport: TransportStream) {
let h = ServerTransporter::new(transport).start();
info!("Actor spawned");
tokio::time::sleep(Duration::from_secs(5)).await;
info!("5 seconds elapsed, sending msg");
let (tx, rx) = oneshot::channel();
h.send(SendMsg {
msg: ToServerMessageBody::Required { id: "session_id".to_string() },
reply_channel: tx,
}).await.unwrap();
if let Ok(item) = rx.await {
info!("Response: {:?}", item);
}
}
async fn server_command_handler(mut transport: TransportStream, peer_cert_bytes: Vec<u8>, server_db_addr: &Addr<ServerCertDB>) {
loop {
match transport.next().await {