use std::sync::Arc; // TODO im so angry tokio-tungstenite makes me import this! use futures_util::StreamExt; use tokio::sync::RwLock; use tokio_tungstenite::tungstenite::Message; use crate::proto::{http::RegisterResponse, ws::{ChatEvent, ChatEventInner}}; pub type Timeline = Arc<RwLock<Vec<String>>>; #[async_trait::async_trait] pub trait PushableTimeline { async fn push(&self, msg: String); } #[async_trait::async_trait] impl PushableTimeline for Timeline { async fn push(&self, msg: String) { self.write().await.push(msg) } } pub async fn worker(server: String, timeline: Timeline) { timeline.push("[ ] connecting...".to_string()).await; let registration : RegisterResponse = reqwest::Client::new() .post(format!("https://{server}/api/chat/register")) .send() .await .expect("failed sending registration request") .json() .await .expect("failed parsing registration response"); timeline.push(format!("registered to chat: {registration:?}")).await; // TODO use url crate? let ws_url = format!("wss://{server}/ws?accessToken={}", registration.access_token); timeline.push(format!("connecting to {ws_url}")).await; match tokio_tungstenite::connect_async(ws_url).await { Err(e) => timeline.push(format!("[!] failed connecting: {e}")).await, Ok((mut stream, _)) => while let Some(next) = stream.next().await { match next { Err(e) => timeline.push(format!("[!] error receiving chat message: {e}")).await, Ok(msg) => match msg { Message::Binary(_) => timeline.push("[!] received unexpected binary payload".to_string()).await, Message::Frame(_) => timeline.push("[!] received unexpected raw frame".to_string()).await, Message::Ping(_) | tokio_tungstenite::tungstenite::Message::Pong(_) => {}, // ignore Message::Close(_) => timeline.push("[x] stream is closing".to_string()).await, Message::Text(payload) => for line in payload.lines() { match serde_json::from_str::<ChatEvent>(line) { Err(e) => timeline.push(format!("[!] failed deserializing chat message: {e} -- {payload}")).await, Ok(event) => match event.inner { ChatEventInner::Chat { body, visible: _ } => timeline.push(format!("{} | [{}]> {}", event.timestamp, event.user.display_name, dissolve::strip_html_tags(&body).join(" "))).await, ChatEventInner::ConnectedUserInfo => timeline.push(format!("{} | {} connected", event.timestamp, event.user.display_name)).await, ChatEventInner::UserJoined => timeline.push(format!("{} | {} joined", event.timestamp, event.user.display_name)).await, ChatEventInner::UserParted => timeline.push(format!("{} | {} left", event.timestamp, event.user.display_name)).await, } } }, } } }, } timeline.write().await.push("[x] stream closed".to_string()); }