diff --git a/bonknet_broker/src/main.rs b/bonknet_broker/src/main.rs index 7b983be..5c1bb06 100644 --- a/bonknet_broker/src/main.rs +++ b/bonknet_broker/src/main.rs @@ -102,6 +102,7 @@ async fn main() { let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der); let server_root_cert = Arc::clone(&server_root_cert); let scdb_addr = scdb_addr.clone(); + let pdcm_addr = pdcm_addr.clone(); let sm_addr = sm_addr.clone(); // Set up TLS service factory @@ -114,13 +115,15 @@ async fn main() { let guestserver_root_cert_der = Arc::clone(&guestserver_root_cert_der); let server_root_cert = Arc::clone(&server_root_cert); let scdb_addr = scdb_addr.clone(); + let pdcm_addr = pdcm_addr.clone(); let sm_addr = sm_addr.clone(); async move { let peer_certs = stream.get_ref().1.peer_certificates().unwrap(); let peer_cert_bytes = peer_certs.first().unwrap().to_vec(); let peer_root_cert_der = peer_certs.last().unwrap().clone(); - let scdb_addr = scdb_addr.clone(); - let sm_addr = sm_addr.clone(); + // let scdb_addr = scdb_addr.clone(); + // let pdcm_addr = pdcm_addr.clone(); + // let sm_addr = sm_addr.clone(); if peer_root_cert_der == *server_root_cert_der { info!("Server connection"); let mut transport = Framed::new(stream, LengthDelimitedCodec::new()); @@ -155,6 +158,10 @@ async fn main() { transport.send(rmp_serde::to_vec(&reply).unwrap().into()).await.unwrap(); server_subscribe_handler(transport, name, sm_addr).await; } + OpenDataStream(conn_id) => { + info!("OpenDataStream with {:?}", conn_id); + // TODO: OpenDataStream + } } } Err(e) => { @@ -171,7 +178,6 @@ async fn main() { guestserver_handler(transport, scdb_addr, &server_root_cert).await; } else if peer_root_cert_der == *client_root_cert_der { info!("Client connection"); - //pdcm_addr let codec = LengthDelimitedCodec::new(); let transport = Framed::new(stream, codec); client_handler(transport, sm_addr).await; @@ -339,6 +345,10 @@ async fn client_handler(mut transport: TransportStream, sm_addr: Addr { + info!("Upgrade to DataStream with conn_id {:?}", conn_id); + // TODO: Upgrade to DataStream + } } } Err(e) => { diff --git a/bonknet_broker/src/servermanager.rs b/bonknet_broker/src/servermanager.rs index ac79b48..5b2d89e 100644 --- a/bonknet_broker/src/servermanager.rs +++ b/bonknet_broker/src/servermanager.rs @@ -22,22 +22,15 @@ enum SendMsgResult { Rejected, } -#[derive(Message)] -#[rtype(result = "SendMsgResult")] -struct SendMsg { - msg: ToServerMessageBody, - reply_channel: oneshot::Sender -} - #[derive(Error, Debug)] -pub enum AsyncSendMsgError { +pub enum SendMsgError { #[error("Generic Failure")] GenericFailure, } #[derive(Message)] -#[rtype(result = "Result")] -struct AsyncSendMsg { +#[rtype(result = "Result")] +struct SendMsg { msg: ToServerMessageBody, } @@ -48,38 +41,6 @@ struct ServerTransporter { reply_channels: HashMap>, } -impl Handler for ServerTransporter { - type Result = ResponseFuture>; - - fn handle(&mut self, msg: AsyncSendMsg, _ctx: &mut Self::Context) -> Self::Result { - let (reply_channel_tx, reply_channel_rx) = oneshot::channel(); - let mut reply_id: u64; - if self.reply_channels.len() == u64::MAX as usize { - return Box::pin(fut::ready(Err(AsyncSendMsgError::GenericFailure))); - } - loop { - reply_id = random(); - if !self.reply_channels.contains_key(&reply_id) { - break; - } - } - self.reply_channels.insert(reply_id, reply_channel_tx); - let msg = ToServerMessage::Msg { - reply_id, - body: msg.msg, - }; - let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into(); - let arc_tx = self.tx.clone(); - Box::pin(async move { - arc_tx.lock().await.send(payload).await.map_err(|e| AsyncSendMsgError::GenericFailure)?; - info!("msg sent"); - let r = reply_channel_rx.await.unwrap(); - info!("reply received"); - Ok(r) - }) - } -} - impl ServerTransporter { fn new(transport: TransportStream) -> Self { let internal = transport.into_inner(); @@ -133,12 +94,13 @@ impl Actor for ServerTransporter { } impl Handler for ServerTransporter { - type Result = SendMsgResult; + type Result = ResponseFuture>; - fn handle(&mut self, msg: SendMsg, ctx: &mut Self::Context) -> Self::Result { + fn handle(&mut self, msg: SendMsg, _ctx: &mut Self::Context) -> Self::Result { + let (reply_channel_tx, reply_channel_rx) = oneshot::channel(); let mut reply_id: u64; if self.reply_channels.len() == u64::MAX as usize { - return SendMsgResult::Rejected; + return Box::pin(fut::ready(Err(SendMsgError::GenericFailure))); } loop { reply_id = random(); @@ -146,19 +108,20 @@ impl Handler for ServerTransporter { break; } } - self.reply_channels.insert(reply_id, msg.reply_channel); + self.reply_channels.insert(reply_id, reply_channel_tx); let msg = ToServerMessage::Msg { reply_id, body: msg.msg, }; let payload: Bytes = rmp_serde::to_vec(&msg).unwrap().into(); let arc_tx = self.tx.clone(); - ctx.spawn(async move { - arc_tx.lock().await.send(payload).await - }.into_actor(self).map(|res, _a, _ctx| { - info!("ToServerMsg sent result: {:?}", res); - })); - SendMsgResult::Accepted + Box::pin(async move { + arc_tx.lock().await.send(payload).await.map_err(|e| SendMsgError::GenericFailure)?; + info!("msg sent"); + let r = reply_channel_rx.await.unwrap(); + info!("reply received"); + Ok(r) + }) } } @@ -311,7 +274,7 @@ impl Handler for ServerManager { match pdcdb_addr.send(NewPendingConn { server_conn_id, client_conn_id }).await.unwrap() { Ok(_) => { let msg = ToServerMessageBody::Request { conn_id: server_conn_id }; - match sh_addr.send(AsyncSendMsg { msg }).await.unwrap() { + match sh_addr.send(SendMsg { msg }).await.unwrap() { Ok(reply) => match reply { FromServerReplyBody::RequestAccepted => { Ok(client_conn_id)