feat: simple ws server impl
This commit is contained in:
parent
13a619866d
commit
3770939b9f
2 changed files with 75 additions and 0 deletions
11
Cargo.toml
11
Cargo.toml
|
@ -10,9 +10,20 @@ name = "mmp-port"
|
||||||
path = "src/port/main.rs"
|
path = "src/port/main.rs"
|
||||||
required-features = ["port"]
|
required-features = ["port"]
|
||||||
|
|
||||||
|
[[bin]]
|
||||||
|
name = "mmp-server"
|
||||||
|
path = "src/server/main.rs"
|
||||||
|
required-features = ["server"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
midi-msg = { version = "0.7", optional = true }
|
midi-msg = { version = "0.7", optional = true }
|
||||||
midir = { version = "0.10", optional = true }
|
midir = { version = "0.10", optional = true }
|
||||||
|
tokio = { version = "1.41", features = ["net", "rt", "macros", "sync"], optional = true }
|
||||||
|
tokio-tungstenite = { version = "0.24", optional = true }
|
||||||
|
futures-util = { version = "0.3", optional = true }
|
||||||
|
serde_json = { version = "1.0", optional = true }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
port = ["dep:midi-msg", "dep:midir"]
|
port = ["dep:midi-msg", "dep:midir"]
|
||||||
|
server = ["dep:tokio", "dep:tokio-tungstenite", "dep:futures-util", "dep:serde_json"]
|
||||||
|
|
64
src/server/main.rs
Normal file
64
src/server/main.rs
Normal file
|
@ -0,0 +1,64 @@
|
||||||
|
use std::{env, io::Error};
|
||||||
|
|
||||||
|
use futures_util::TryStreamExt;
|
||||||
|
use tokio::{io::AsyncWriteExt, net::{TcpListener, TcpStream}, sync::mpsc};
|
||||||
|
|
||||||
|
#[tokio::main(flavor = "current_thread")]
|
||||||
|
async fn main() -> Result<(), Error> {
|
||||||
|
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:64080".to_string());
|
||||||
|
let addr2 = env::args().nth(2).unwrap_or_else(|| "0.0.0.0:60080".to_string());
|
||||||
|
|
||||||
|
let listener = TcpListener::bind(&addr).await.unwrap();
|
||||||
|
println!("Listening on: {}", addr);
|
||||||
|
|
||||||
|
let (tx, mut rx) = mpsc::unbounded_channel::<[u8;2]>();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let control_socket = TcpListener::bind(&addr2).await.unwrap();
|
||||||
|
while let Ok((mut stream, addr)) = control_socket.accept().await {
|
||||||
|
// don't spawn a new task: we only serve controls to one user at a time
|
||||||
|
println!("user {addr} connected, sending control signals...");
|
||||||
|
while let Some([ctrl, val]) = rx.recv().await {
|
||||||
|
if let Err(e) = stream.write_all(&[ctrl, val]).await {
|
||||||
|
eprintln!("error sending control, disconnecting -- {e}");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
while let Ok((stream, _)) = listener.accept().await {
|
||||||
|
tokio::spawn(accept_connection(stream, tx.clone()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn accept_connection(stream: TcpStream, tx: mpsc::UnboundedSender<[u8;2]>) {
|
||||||
|
let addr = stream.peer_addr().expect("connected streams should have a peer address");
|
||||||
|
println!("Peer address: {}", addr);
|
||||||
|
|
||||||
|
let mut ws_stream = tokio_tungstenite::accept_async(stream)
|
||||||
|
.await
|
||||||
|
.expect("Error during the websocket handshake occurred");
|
||||||
|
|
||||||
|
println!("New WebSocket connection: {}", addr);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match ws_stream.try_next().await {
|
||||||
|
Err(e) => break eprintln!("user {addr} disconnected with error: {e}"),
|
||||||
|
Ok(None) => break println!("user {addr} disconnected"),
|
||||||
|
Ok(Some(msg)) => match msg {
|
||||||
|
tokio_tungstenite::tungstenite::Message::Text(txt) => {
|
||||||
|
let Ok([cmd, val]) = serde_json::from_str::<[u8;2]>(&txt) else {
|
||||||
|
break eprintln!("invalid websocket command: {txt}");
|
||||||
|
};
|
||||||
|
if let Err(e) = tx.send([cmd, val]) {
|
||||||
|
eprintln!("control channel is dead!");
|
||||||
|
}
|
||||||
|
},
|
||||||
|
m => eprintln!("unexpected message: {m:?}"),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue