Solved conundrum about Server separation of Subscribe and SendCommand streams

This commit is contained in:
2024-02-14 15:28:08 +01:00
parent 9e3d4c5fe3
commit 37c76aba22
3 changed files with 254 additions and 110 deletions

View File

@@ -44,7 +44,7 @@ async fn main() -> std::io::Result<()> {
let mut myserver_prkey: Option<PrivatePkcs8KeyDer> = None;
match transport.next().await {
None => {
info!("None in the transport.next() ???");
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
@@ -69,17 +69,112 @@ async fn main() -> std::io::Result<()> {
}
}
}
transport.close().await.unwrap();
if let (Some(server_cert), Some(server_prkey)) = (myserver_cert, myserver_prkey) {
let tlsconfig = ClientConfig::builder()
let tlsconfig = Arc::new(ClientConfig::builder()
.with_root_certificates(broker_root_cert_store)
.with_client_auth_cert(vec![server_cert, root_server_cert], server_prkey.into())
.unwrap();
let connector = TlsConnector::from(Arc::new(tlsconfig));
.unwrap());
let connector = TlsConnector::from(Arc::clone(&tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await?;
let stream = connector.connect(dnsname, stream).await?;
let transport = Framed::new(stream, LengthDelimitedCodec::new());
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromServerConnTypeMessage::SendCommand;
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => {
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use ToServerConnTypeReply::*;
let msg: ToServerConnTypeReply = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
OkSendCommand => {
info!("Stream set in SendCommand mode");
}
OkSubscribe => {
panic!("Unexpected OkSubscribe");
}
GenericFailure => {
panic!("Generic Failure during SendCommand");
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
}
}
}
// Begin WhoAmI
let msg = FromServerCommandMessage::WhoAmI;
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => {
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use ToServerCommandReply::*;
let msg: ToServerCommandReply = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
YouAre { name } => {
info!("I am {}", name);
}
GenericFailure => {
panic!("Generic failure during WhoAmI");
}
_ => {
panic!("Unexpected reply");
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
}
}
}
transport.close().await.expect("Error during transport stream close");
// Start Subscribe Stream
let connector = TlsConnector::from(Arc::clone(&tlsconfig));
let dnsname = ServerName::try_from("localhost").unwrap();
let stream = TcpStream::connect("localhost:2541").await?;
let stream = connector.connect(dnsname, stream).await?;
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let msg = FromServerConnTypeMessage::Subscribe;
transport.send(rmp_serde::to_vec(&msg).unwrap().into()).await.unwrap();
match transport.next().await {
None => {
panic!("None in the transport");
}
Some(item) => match item {
Ok(buf) => {
use ToServerConnTypeReply::*;
let msg: ToServerConnTypeReply = rmp_serde::from_slice(&buf).unwrap();
info!("{:?}", msg);
match msg {
OkSubscribe => {
info!("Stream set in Subscribe mode");
}
OkSendCommand => {
panic!("Unexpected OkSendCommand");
}
GenericFailure => {
panic!("Generic Failure during SendCommand");
}
}
}
Err(e) => {
info!("Disconnection: {:?}", e);
}
}
}
// Subscribe consume
transport.for_each(|item| async move {
match item {
Ok(buf) => {
@@ -89,14 +184,6 @@ async fn main() -> std::io::Result<()> {
Required { id } => {
info!("I'm required with Connection ID {}", id);
}
YouAre(name) => match name {
YouAreValues::Registered { name } => {
info!("I am {}", name);
}
YouAreValues::NotRegistered => {
info!("I'm not registered");
}
}
}
}
Err(e) => {