From a0071ddc38298c992eeda571bae7fb3f71c09e7a Mon Sep 17 00:00:00 2001 From: alemi Date: Sat, 29 Jun 2024 00:40:01 +0200 Subject: [PATCH] feat: add barebones wss transport tokio already came back... --- server/Cargo.toml | 15 +++++++ server/src/transport/laminar.rs | 28 ++++++++++++ server/src/transport/mod.rs | 72 +++++++++++++++++++++++++++++++ server/src/transport/websocket.rs | 64 +++++++++++++++++++++++++++ 4 files changed, 179 insertions(+) create mode 100644 server/src/transport/laminar.rs create mode 100644 server/src/transport/mod.rs create mode 100644 server/src/transport/websocket.rs diff --git a/server/Cargo.toml b/server/Cargo.toml index caf53bc..395d4d6 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -7,3 +7,18 @@ edition = "2021" [dependencies] tracing = "0.1" +thiserror = "1.0.61" +tokio = { version = "1.38.0", features = ["full"], optional = true } +tokio-tungstenite = { version = "0.23.1", features = ["tokio-native-tls"], optional = true } +futures-util = "0.3.30" +async-trait = "0.1.80" +serde = { version = "1.0", features = ["derive"] } +serde_json = { version = "1.0.118", optional = true } + +scct-model = { path = "../model/" } + + +[features] +default = ["websocket"] +websocket = ["dep:tokio", "dep:tokio-tungstenite", "dep:serde_json"] +laminar = [] diff --git a/server/src/transport/laminar.rs b/server/src/transport/laminar.rs new file mode 100644 index 0000000..8a9b210 --- /dev/null +++ b/server/src/transport/laminar.rs @@ -0,0 +1,28 @@ +use super::{Transport, TransportResult, TransportSink, TransportStream}; +use scct_model::proto::c2s; + +pub struct LaminarTransport; + +#[async_trait::async_trait] +impl Transport for LaminarTransport { + async fn serve(self, _addr: String) -> TransportResult> { + todo!() + } +} + +pub struct LaminarStream; +pub struct LaminarSink; + +#[async_trait::async_trait] +impl TransportStream for LaminarStream { + async fn pop(&mut self) -> TransportResult> { + todo!() + } +} + +#[async_trait::async_trait] +impl TransportSink for LaminarSink { + async fn push(&mut self, msg: c2s::c::Packet) -> TransportResult<()> { + Ok(()) + } +} diff --git a/server/src/transport/mod.rs b/server/src/transport/mod.rs new file mode 100644 index 0000000..56b5361 --- /dev/null +++ b/server/src/transport/mod.rs @@ -0,0 +1,72 @@ +//! this part is mostly temporary, to get us quickly running with something web-ready (websockets) +//! +//! once we got most things running we can swap out websockets for laminar (or whatever we settle +//! on) just by respecting this trait. once the swap is complete, transport and server can be +//! coupled more, if necessary +//! +//! note that web app will still need a websocket, but it's probably better to make a bridge +//! translating server's laminar/whatev into websockets, rather than having each server also bundle +//! a websocket server just in case +//! +//! so basically most likely everything down here will get coupled more and merged with the rest + +#[cfg(feature = "websocket")] +pub mod websocket; + +#[cfg(feature = "laminar")] +pub mod laminar; + +#[derive(Debug, thiserror::Error)] +pub enum TransportError { + + #[cfg(feature = "websocket")] + #[error("websocket error: {0}")] + WebSocket(#[from] tokio_tungstenite::tungstenite::Error), + + #[cfg(feature = "websocket")] + #[error("invalid json: {0}")] + Json(#[from] serde_json::Error), + + #[error("system i/o error: {0}")] + IO(#[from] std::io::Error), +} + +pub type TransportResult = Result; + + + +use scct_model::proto::c2s; + +#[async_trait::async_trait] +pub trait Transport : Sized { + // TODO can we get rid of tokio-specifc channel here without losing async blocking and throwing + // in new crates? + async fn serve(self, addr: String) -> Result, TransportError>; +} + + +// i hate these names: "pop" and "push" are for queues, but "send" and "recv" are used by +// underlying transport so it gets a name clash mess and trait disambiguations are nasty +// TODO rename these? + +#[async_trait::async_trait] +pub trait TransportStream: Send + Sync { + async fn pop(&mut self) -> Result, TransportError>; +} + +#[async_trait::async_trait] +pub trait TransportSink: Send + Sync { + async fn push(&mut self, msg: c2s::c::Packet) -> Result<(), TransportError>; +} + + +#[allow(unreachable_code)] +pub fn new() -> impl Transport { + #[cfg(feature = "laminar")] + return laminar::LaminarTransport; + + #[cfg(feature = "websocket")] + return websocket::WSTransport; + + panic!("no availabe C2S transport available in this binary!"); +} diff --git a/server/src/transport/websocket.rs b/server/src/transport/websocket.rs new file mode 100644 index 0000000..db126b7 --- /dev/null +++ b/server/src/transport/websocket.rs @@ -0,0 +1,64 @@ +use super::{Transport, TransportError, TransportResult, TransportSink, TransportStream}; + +use futures_util::{stream::{SplitSink, SplitStream, TryStreamExt}, SinkExt, StreamExt}; + +use scct_model::proto::c2s; +use tokio::{sync::mpsc, net::{TcpListener, TcpStream}}; +use tokio_tungstenite::{tungstenite::protocol::Message, WebSocketStream}; + +pub type WSRx = SplitStream>; +pub type WSTx = SplitSink, Message>; +pub struct WSTransport; + +#[async_trait::async_trait] +impl Transport for WSTransport { + async fn serve(self, addr: String) -> Result, TransportError> { + let listener = TcpListener::bind(&addr).await?; + tracing::info!("serving websocket transport on: {addr}"); + + // basically as the k+chan_size user connects, if clients since k have not been served yet, + // block to alleviate backpressure + let (tx, rx) = mpsc::channel(4); + + // spawn a background task to handle websocket listener + tokio::spawn(async move { + while let Ok((stream, addr)) = listener.accept().await { + tracing::debug!("accepted connection from {addr}"); + + match tokio_tungstenite::accept_async(stream).await { + Err(e) => tracing::error!("failed creating websocket stream for {addr}: {e}"), + Ok(ws_stream) => { + if let Err(e) = tx.send(ws_stream.split()).await { + tracing::error!("could not serve {addr}: {e}"); + } + } + } + } + }); + + Ok(rx) + } +} + +#[async_trait::async_trait] +impl TransportStream for WSRx { + async fn pop(&mut self) -> TransportResult> { + // TODO this could forever, maybe add a timeout? + loop { + match self.try_next().await? { + None => return Ok(None), + Some(Message::Close(_frame)) => return Ok(None), + Some(Message::Text(x)) => return Ok(Some(serde_json::from_str::(&x)?)), + x => tracing::debug!("websocket transport message: {x:?}"), + } + } + } +} + +#[async_trait::async_trait] +impl TransportSink for WSTx { + async fn push(&mut self, msg: c2s::c::Packet) -> TransportResult<()> { + self.send(Message::Text(serde_json::to_string(&msg)?)).await?; + Ok(()) + } +}