feat: add barebones wss transport

tokio already came back...
This commit is contained in:
əlemi 2024-06-29 00:40:01 +02:00
parent 58c3975139
commit a0071ddc38
Signed by: alemi
GPG key ID: A4895B84D311642C
4 changed files with 179 additions and 0 deletions

View file

@ -7,3 +7,18 @@ edition = "2021"
[dependencies]
tracing = "0.1"
thiserror = "1.0.61"
tokio = { version = "1.38.0", features = ["full"], optional = true }
tokio-tungstenite = { version = "0.23.1", features = ["tokio-native-tls"], optional = true }
futures-util = "0.3.30"
async-trait = "0.1.80"
serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.118", optional = true }
scct-model = { path = "../model/" }
[features]
default = ["websocket"]
websocket = ["dep:tokio", "dep:tokio-tungstenite", "dep:serde_json"]
laminar = []

View file

@ -0,0 +1,28 @@
use super::{Transport, TransportResult, TransportSink, TransportStream};
use scct_model::proto::c2s;
pub struct LaminarTransport;
#[async_trait::async_trait]
impl Transport for LaminarTransport {
async fn serve(self, _addr: String) -> TransportResult<tokio::sync::mpsc::Receiver<(LaminarSink, LaminarStream)>> {
todo!()
}
}
pub struct LaminarStream;
pub struct LaminarSink;
#[async_trait::async_trait]
impl TransportStream for LaminarStream {
async fn pop(&mut self) -> TransportResult<Option<c2s::s::Packet>> {
todo!()
}
}
#[async_trait::async_trait]
impl TransportSink for LaminarSink {
async fn push(&mut self, msg: c2s::c::Packet) -> TransportResult<()> {
Ok(())
}
}

View file

@ -0,0 +1,72 @@
//! this part is mostly temporary, to get us quickly running with something web-ready (websockets)
//!
//! once we got most things running we can swap out websockets for laminar (or whatever we settle
//! on) just by respecting this trait. once the swap is complete, transport and server can be
//! coupled more, if necessary
//!
//! note that web app will still need a websocket, but it's probably better to make a bridge
//! translating server's laminar/whatev into websockets, rather than having each server also bundle
//! a websocket server just in case
//!
//! so basically most likely everything down here will get coupled more and merged with the rest
#[cfg(feature = "websocket")]
pub mod websocket;
#[cfg(feature = "laminar")]
pub mod laminar;
#[derive(Debug, thiserror::Error)]
pub enum TransportError {
#[cfg(feature = "websocket")]
#[error("websocket error: {0}")]
WebSocket(#[from] tokio_tungstenite::tungstenite::Error),
#[cfg(feature = "websocket")]
#[error("invalid json: {0}")]
Json(#[from] serde_json::Error),
#[error("system i/o error: {0}")]
IO(#[from] std::io::Error),
}
pub type TransportResult<T> = Result<T, TransportError>;
use scct_model::proto::c2s;
#[async_trait::async_trait]
pub trait Transport : Sized {
// TODO can we get rid of tokio-specifc channel here without losing async blocking and throwing
// in new crates?
async fn serve(self, addr: String) -> Result<tokio::sync::mpsc::Receiver<(impl TransportSink, impl TransportStream)>, TransportError>;
}
// i hate these names: "pop" and "push" are for queues, but "send" and "recv" are used by
// underlying transport so it gets a name clash mess and trait disambiguations are nasty
// TODO rename these?
#[async_trait::async_trait]
pub trait TransportStream: Send + Sync {
async fn pop(&mut self) -> Result<Option<c2s::s::Packet>, TransportError>;
}
#[async_trait::async_trait]
pub trait TransportSink: Send + Sync {
async fn push(&mut self, msg: c2s::c::Packet) -> Result<(), TransportError>;
}
#[allow(unreachable_code)]
pub fn new() -> impl Transport {
#[cfg(feature = "laminar")]
return laminar::LaminarTransport;
#[cfg(feature = "websocket")]
return websocket::WSTransport;
panic!("no availabe C2S transport available in this binary!");
}

View file

@ -0,0 +1,64 @@
use super::{Transport, TransportError, TransportResult, TransportSink, TransportStream};
use futures_util::{stream::{SplitSink, SplitStream, TryStreamExt}, SinkExt, StreamExt};
use scct_model::proto::c2s;
use tokio::{sync::mpsc, net::{TcpListener, TcpStream}};
use tokio_tungstenite::{tungstenite::protocol::Message, WebSocketStream};
pub type WSRx = SplitStream<WebSocketStream<TcpStream>>;
pub type WSTx = SplitSink<WebSocketStream<TcpStream>, Message>;
pub struct WSTransport;
#[async_trait::async_trait]
impl Transport for WSTransport {
async fn serve(self, addr: String) -> Result<mpsc::Receiver<(WSTx, WSRx)>, TransportError> {
let listener = TcpListener::bind(&addr).await?;
tracing::info!("serving websocket transport on: {addr}");
// basically as the k+chan_size user connects, if clients since k have not been served yet,
// block to alleviate backpressure
let (tx, rx) = mpsc::channel(4);
// spawn a background task to handle websocket listener
tokio::spawn(async move {
while let Ok((stream, addr)) = listener.accept().await {
tracing::debug!("accepted connection from {addr}");
match tokio_tungstenite::accept_async(stream).await {
Err(e) => tracing::error!("failed creating websocket stream for {addr}: {e}"),
Ok(ws_stream) => {
if let Err(e) = tx.send(ws_stream.split()).await {
tracing::error!("could not serve {addr}: {e}");
}
}
}
}
});
Ok(rx)
}
}
#[async_trait::async_trait]
impl TransportStream for WSRx {
async fn pop(&mut self) -> TransportResult<Option<c2s::s::Packet>> {
// TODO this could forever, maybe add a timeout?
loop {
match self.try_next().await? {
None => return Ok(None),
Some(Message::Close(_frame)) => return Ok(None),
Some(Message::Text(x)) => return Ok(Some(serde_json::from_str::<c2s::s::Packet>(&x)?)),
x => tracing::debug!("websocket transport message: {x:?}"),
}
}
}
}
#[async_trait::async_trait]
impl TransportSink for WSTx {
async fn push(&mut self, msg: c2s::c::Packet) -> TransportResult<()> {
self.send(Message::Text(serde_json::to_string(&msg)?)).await?;
Ok(())
}
}