Various experiments
This commit is contained in:
@@ -1,35 +1,84 @@
|
||||
use tokio::net::{TcpListener};
|
||||
use std::io::{BufReader, Error, ErrorKind};
|
||||
use std::collections::HashMap;
|
||||
use tokio::net::{TcpListener, TcpStream};
|
||||
use std::net::{SocketAddr};
|
||||
use std::sync::{Arc};
|
||||
use std::time::Instant;
|
||||
use futures::stream::{Stream, StreamExt};
|
||||
use tokio_rustls::{TlsAcceptor};
|
||||
use tokio_rustls::rustls::{RootCertStore, ServerConfig};
|
||||
use tokio_rustls::rustls::server::WebPkiClientVerifier;
|
||||
use rustls_pemfile::{read_one, Item};
|
||||
use tokio::io::{AsyncWriteExt, copy, split};
|
||||
use tokio_rustls::rustls::pki_types::{CertificateDer, PrivatePkcs8KeyDer};
|
||||
use serde::{Serialize, Deserialize};
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_rustls::server::TlsStream;
|
||||
use tokio_util::codec::{Framed, LengthDelimitedCodec};
|
||||
use libbonknet::{load_prkey, load_cert};
|
||||
|
||||
fn load_cert(filename: &str) -> std::io::Result<CertificateDer> {
|
||||
let cert_file = std::fs::File::open(filename).unwrap();
|
||||
let mut buf = std::io::BufReader::new(cert_file);
|
||||
if let Item::X509Certificate(cert) = read_one(&mut buf).unwrap().unwrap() {
|
||||
Ok(cert)
|
||||
} else {
|
||||
eprintln!("File {} doesn't contain a X509 Certificate", filename);
|
||||
Err(Error::new(ErrorKind::InvalidInput, "no x509 cert"))
|
||||
}
|
||||
type FramedStream = Framed<TlsStream<TcpStream>,LengthDelimitedCodec>;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
enum ClientMessage {
|
||||
Response { status_code: u32, msg: Option<String> },
|
||||
Announce { name: String },
|
||||
Required { id: String },
|
||||
NotRequired { id: String },
|
||||
}
|
||||
|
||||
fn load_prkey(filename: &str) -> std::io::Result<PrivatePkcs8KeyDer> {
|
||||
let prkey_file = std::fs::File::open(filename).unwrap();
|
||||
let mut buf = BufReader::new(prkey_file);
|
||||
if let Item::Pkcs8Key(pkey) = read_one(&mut buf).unwrap().unwrap() {
|
||||
Ok(pkey)
|
||||
} else {
|
||||
eprintln!("File {} doesn't contain a Pkcs8 Private Key", filename);
|
||||
Err(Error::new(ErrorKind::InvalidInput, "no pkcs8key"))
|
||||
}
|
||||
async fn process_client(stream: TlsStream<TcpStream>, peer_addr: SocketAddr) {
|
||||
let transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
transport.for_each(|item| async move {
|
||||
match item {
|
||||
Ok(frame) => {
|
||||
let a: ClientMessage = rmp_serde::from_slice(&frame).unwrap();
|
||||
println!("{:?}: {:?}", peer_addr, a);
|
||||
},
|
||||
Err(e) => {
|
||||
println!("{:?}: ERROR: {}", peer_addr, e);
|
||||
}
|
||||
}
|
||||
}).await;
|
||||
}
|
||||
|
||||
struct ClientState {
|
||||
framedstream: FramedStream,
|
||||
last_life_signal: Instant,
|
||||
}
|
||||
|
||||
struct ClientConnectionManager {
|
||||
registered_clients: Arc<HashMap<String,ClientState>>,
|
||||
unreg_clients: Arc<Vec<ClientState>>,
|
||||
}
|
||||
|
||||
impl ClientConnectionManager {
|
||||
async fn new_and_initialize(port: u16, tlsconfig: ServerConfig) -> ClientConnectionManager {
|
||||
let acceptor = TlsAcceptor::from(Arc::new(tlsconfig));
|
||||
let listener = TcpListener::bind(format!("localhost:{}", port)).await.unwrap();
|
||||
let registered_clients = Arc::new(HashMap::new());
|
||||
let unreg_clients = Arc::new(Vec::new());
|
||||
tokio::spawn(async move {
|
||||
let task_acceptor = acceptor;
|
||||
loop {
|
||||
let (stream, peer_addr) = listener.accept().await.unwrap();
|
||||
let acceptor = task_acceptor.clone();
|
||||
let stream = acceptor.accept(stream).await.unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
//let transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
process_client(stream, peer_addr).await;
|
||||
});
|
||||
}
|
||||
});
|
||||
let ccm = ClientConnectionManager { registered_clients, unreg_clients};
|
||||
ccm
|
||||
}
|
||||
|
||||
async fn process_new_client(&mut self, transport: FramedStream, _peer_addr: SocketAddr) {
|
||||
let state = ClientState{
|
||||
framedstream: transport,
|
||||
last_life_signal: Instant::now(),
|
||||
};
|
||||
self.unreg_clients.push(state);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
@@ -52,24 +101,37 @@ async fn main() {
|
||||
|
||||
let listener = TcpListener::bind("localhost:6379").await.unwrap();
|
||||
|
||||
// Create Queue Binder
|
||||
|
||||
loop {
|
||||
let (stream, peer_addr) = listener.accept().await.unwrap();
|
||||
let acceptor = acceptor.clone();
|
||||
|
||||
let fut = async move {
|
||||
let stream = acceptor.accept(stream).await?;
|
||||
let (mut reader, mut writer) = split(stream);
|
||||
let n = copy(&mut reader, &mut writer).await?;
|
||||
writer.flush().await?;
|
||||
println!("Echo: {} - {}", peer_addr, n);
|
||||
|
||||
Ok(()) as std::io::Result<()>
|
||||
};
|
||||
let stream = acceptor.accept(stream).await.unwrap();
|
||||
|
||||
tokio::spawn(async move {
|
||||
if let Err(err) = fut.await {
|
||||
eprintln!("{:?}", err);
|
||||
}
|
||||
//let transport = Framed::new(stream, LengthDelimitedCodec::new());
|
||||
process_client(stream, peer_addr).await;
|
||||
});
|
||||
|
||||
// let msg1 = ClientMessage::Required { id: "Testo".into() };
|
||||
// let msg2 = ClientMessage::NotRequired { id: "Testo2".into() };
|
||||
// transport.send(rmp_serde::to_vec(&msg1).unwrap().into()).await.unwrap();
|
||||
// transport.send(rmp_serde::to_vec(&msg2).unwrap().into()).await.unwrap();
|
||||
|
||||
// let fut = async move {
|
||||
// let stream = acceptor.accept(stream).await?;
|
||||
// let (mut reader, mut writer) = split(stream);
|
||||
// let n = copy(&mut reader, &mut writer).await?;
|
||||
// writer.flush().await?;
|
||||
// println!("Echo: {} - {}", peer_addr, n);
|
||||
//
|
||||
// Ok(()) as std::io::Result<()>
|
||||
// };
|
||||
//
|
||||
// tokio::spawn(async move {
|
||||
// if let Err(err) = fut.await {
|
||||
// eprintln!("{:?}", err);
|
||||
// }
|
||||
// });
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user