use core::fmt; use std::fmt::Display; // TODO im so angry tokio-tungstenite makes me import this! use futures_util::{StreamExt, SinkExt}; use tokio::sync::mpsc; use tokio_tungstenite::tungstenite::Message; use crate::proto::{http::RegisterResponse, ws::{Event, EventInner}}; #[derive(Debug, thiserror::Error)] pub enum ChatError { #[error("failed registering to chat: {0:?}")] Register(#[from] reqwest::Error), #[error("connection error with chat: {0:?}")] Connect(#[from] tokio_tungstenite::tungstenite::Error), } #[derive(Debug)] pub struct ChatEvent(chrono::DateTime<chrono::Utc>, ChatLine); #[derive(Debug)] pub enum ChatLine { Message { user: String, body: String, }, Error(String), Rename(String, String), Join(String), Leave(String), Connect(String), Raw(String), } impl Display for ChatEvent { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let time = self.0.format("%H:%M:%S"); match &self.1 { ChatLine::Error(err) => write!(f, "{time} /!\\ {err}"), ChatLine::Rename(before, after) => write!(f, "{time} user <{before}> renamed to <{after}>"), ChatLine::Join(user) => write!(f, "{time} [{user} joined]"), ChatLine::Leave(user) => write!(f, "{time} [{user} left]"), ChatLine::Connect(user) => write!(f, "{time} [{user} connected]"), ChatLine::Message { user, body } => { let pre = format!("{time} <{user}> "); let pretty_body = dissolve::strip_html_tags(body).join("\n"); write!(f, "{pre}{pretty_body}") }, ChatLine::Raw(s) => { let without_html = dissolve::strip_html_tags(s).join("\n"); write!(f, "{time} {without_html}") }, } } } pub struct Chat { sink: ChatSink, rx: mpsc::UnboundedReceiver<ChatEvent>, } impl Chat { pub async fn register(server: &str, token: Option<String>) -> Result<Chat, ChatError> { let token = match token { Some(t) => t, None => { let registration : RegisterResponse = reqwest::Client::new() .post(format!("https://{server}/api/chat/register")) .send() .await? .json() .await?; registration.access_token }, }; let ws_url = format!("wss://{server}/ws?accessToken={token}"); let (ws, _response) = tokio_tungstenite::connect_async(ws_url).await?; let (sink, stream) = ws.split(); let (tx, rx) = mpsc::unbounded_channel(); tokio::spawn(async move { if let Err(e) = Chat::worker(stream, tx).await { eprintln!("chat channel closed while sending a message: {e}"); } }); Ok(Chat { sink, rx }) } pub async fn send(&mut self, message: crate::proto::ws::Action) -> Result<(), tokio_tungstenite::tungstenite::Error> { self.sink.send( Message::Text( serde_json::to_string(&message).expect("error serializing ChatAction") ) ) .await } #[inline] // because mpsc::recv is cancelable, but this wrapper may not be pub async fn recv(&mut self) -> Option<ChatEvent> { self.rx.recv().await } async fn worker(mut stream: ChatStream, chan: mpsc::UnboundedSender<ChatEvent>) -> Result<(), mpsc::error::SendError<ChatEvent>> { while let Some(next) = stream.next().await { match next { Err(e) => chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error(format!("failed receiving message: {e}"))))?, Ok(msg) => match msg { Message::Close(_) => break, Message::Binary(_) => chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error("unexpected binary payload".to_string())))?, Message::Frame(_) => chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error("unexpected raw frame".to_string())))?, Message::Ping(_) | tokio_tungstenite::tungstenite::Message::Pong(_) => {}, // ignore Message::Text(payload) => for line in payload.lines() { match serde_json::from_str::<Event>(line) { Err(e) => { chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error(format!("failed parsing message: {e}"))))?; chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Raw(line.to_string())))?; }, Ok(event) => match event.inner { EventInner::Chat { body, user, visible } => chan.send(ChatEvent(event.timestamp, if visible { ChatLine::Message { user: user.display_name, body } } else { ChatLine::Raw("| REDACTED |".into()) }))?, EventInner::ConnectedUserInfo { user } => chan.send(ChatEvent(event.timestamp, ChatLine::Connect(user.display_name)))?, EventInner::UserJoined { user } => chan.send(ChatEvent(event.timestamp, ChatLine::Join(user.display_name)))?, EventInner::UserParted { user } => chan.send(ChatEvent(event.timestamp, ChatLine::Leave(user.display_name)))?, EventInner::NameChange { old_name, user } => chan.send(ChatEvent(event.timestamp, ChatLine::Rename(old_name, user.display_name)))?, EventInner::ChatAction { body } => chan.send(ChatEvent(event.timestamp, ChatLine::Raw(body)))?, } } }, } } } Ok(()) } } type ChatStream = futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>; type ChatSink = futures::stream::SplitSink<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>, Message>;