From 3770939b9f0da78c6ead75a4b6ec116e592554ce Mon Sep 17 00:00:00 2001 From: alemi Date: Sun, 10 Nov 2024 02:00:51 +0100 Subject: [PATCH] feat: simple ws server impl --- Cargo.toml | 11 ++++++++ src/server/main.rs | 64 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 src/server/main.rs diff --git a/Cargo.toml b/Cargo.toml index 8c1d7aa..d8e43e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,9 +10,20 @@ name = "mmp-port" path = "src/port/main.rs" required-features = ["port"] +[[bin]] +name = "mmp-server" +path = "src/server/main.rs" +required-features = ["server"] + [dependencies] midi-msg = { version = "0.7", optional = true } midir = { version = "0.10", optional = true } +tokio = { version = "1.41", features = ["net", "rt", "macros", "sync"], optional = true } +tokio-tungstenite = { version = "0.24", optional = true } +futures-util = { version = "0.3", optional = true } +serde_json = { version = "1.0", optional = true } + [features] default = [] port = ["dep:midi-msg", "dep:midir"] +server = ["dep:tokio", "dep:tokio-tungstenite", "dep:futures-util", "dep:serde_json"] diff --git a/src/server/main.rs b/src/server/main.rs new file mode 100644 index 0000000..404d5e7 --- /dev/null +++ b/src/server/main.rs @@ -0,0 +1,64 @@ +use std::{env, io::Error}; + +use futures_util::TryStreamExt; +use tokio::{io::AsyncWriteExt, net::{TcpListener, TcpStream}, sync::mpsc}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Error> { + let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:64080".to_string()); + let addr2 = env::args().nth(2).unwrap_or_else(|| "0.0.0.0:60080".to_string()); + + let listener = TcpListener::bind(&addr).await.unwrap(); + println!("Listening on: {}", addr); + + let (tx, mut rx) = mpsc::unbounded_channel::<[u8;2]>(); + + tokio::spawn(async move { + let control_socket = TcpListener::bind(&addr2).await.unwrap(); + while let Ok((mut stream, addr)) = control_socket.accept().await { + // don't spawn a new task: we only serve controls to one user at a time + println!("user {addr} connected, sending control signals..."); + while let Some([ctrl, val]) = rx.recv().await { + if let Err(e) = stream.write_all(&[ctrl, val]).await { + eprintln!("error sending control, disconnecting -- {e}"); + break; + } + } + } + }); + + while let Ok((stream, _)) = listener.accept().await { + tokio::spawn(accept_connection(stream, tx.clone())); + } + + Ok(()) +} + +async fn accept_connection(stream: TcpStream, tx: mpsc::UnboundedSender<[u8;2]>) { + let addr = stream.peer_addr().expect("connected streams should have a peer address"); + println!("Peer address: {}", addr); + + let mut ws_stream = tokio_tungstenite::accept_async(stream) + .await + .expect("Error during the websocket handshake occurred"); + + println!("New WebSocket connection: {}", addr); + + loop { + match ws_stream.try_next().await { + Err(e) => break eprintln!("user {addr} disconnected with error: {e}"), + Ok(None) => break println!("user {addr} disconnected"), + Ok(Some(msg)) => match msg { + tokio_tungstenite::tungstenite::Message::Text(txt) => { + let Ok([cmd, val]) = serde_json::from_str::<[u8;2]>(&txt) else { + break eprintln!("invalid websocket command: {txt}"); + }; + if let Err(e) = tx.send([cmd, val]) { + eprintln!("control channel is dead!"); + } + }, + m => eprintln!("unexpected message: {m:?}"), + }, + } + } +}