From 52ed43745d1530f9a010ce7f2d47281782c0003e Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 5 Jun 2024 03:21:15 +0200 Subject: [PATCH] feat: now usable! still needs auth tho --- Cargo.toml | 1 + src/app.rs | 115 ++++++++++++++++++++++++++++++++------- src/chat.rs | 150 +++++++++++++++++++++++++++++++++++---------------- src/main.rs | 9 +--- src/proto.rs | 20 ++++--- 5 files changed, 214 insertions(+), 81 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 30ef6c8..94616bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ ratatui = { version = "0.26", features = ["all-widgets", "crossterm"] } crossterm = { version = "0.27", features = ["event-stream"] } async-trait = "0.1.80" futures = "0.3.30" +thiserror = "1.0.61" [features] default = ["native-tls"] diff --git a/src/app.rs b/src/app.rs index a6fe118..e221a5d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,13 +1,51 @@ use crossterm::event::{Event, KeyCode, KeyModifiers}; -use ratatui::{prelude::*, widgets::{Block, Borders, Paragraph}}; +use ratatui::{prelude::*, widgets::{Block, Borders, Paragraph, Wrap}}; use futures::StreamExt; -use crate::chat::Timeline; + +use crate::{chat::{Chat, ChatEvent}, proto::ws::Action}; + +#[derive(Debug, Default)] +struct History { + lines: Vec, + offset: u16, + count: u16, + height: u16, + width: u16, +} + +impl History { + fn up(&mut self, x: u16) { + self.offset = self.offset.saturating_sub(x); + } + + fn down(&mut self, x: u16) { + if self.count < self.height { return }; + let delta = self.count - self.height; + if self.offset >= delta { return } + self.offset += std::cmp::min(x, delta - self.offset); + } + + fn recalculate(&mut self) { + self.count = 0; + for line in &self.lines { + let len = (format!("{line}").len() as u16 / self.width) + 1; + self.count += len; + } + self.down(self.count); + } +} -pub async fn run(term: &mut Terminal, timeline: Timeline) -> std::io::Result<()> { +pub async fn run(term: &mut Terminal, args: crate::Args) -> Result<(), Box> { + let mut chat = Chat::register(&args.server).await?; + let mut stream = crossterm::event::EventStream::new(); - let mut last_draw = tokio::time::Instant::now() - std::time::Duration::from_secs(1); let mut input = String::new(); + let mut history = History { + height: term.size()?.height - 3, + width: term.size()?.width, + ..Default::default() + }; loop { tokio::select! { @@ -15,56 +53,93 @@ pub async fn run(term: &mut Terminal, timeline: res = stream.next() => match res { None => break Ok(()), // EOF - Some(Err(e)) => break Err(e), + Some(Err(e)) => break Err(Box::new(e)), Some(Ok(ev)) => match ev { - Event::FocusGained | Event::FocusLost | Event::Resize(_, _) => {}, + Event::FocusGained | Event::FocusLost => {}, Event::Mouse(_) => {}, // TODO handle mouse? eventually :tm: Event::Paste(p) => input.push_str(&p), // TODO paste at cursor + Event::Resize(c, r) => { + history.height = r - 3; + history.width = c; + history.recalculate(); + }, Event::Key(k) => { if k.modifiers == KeyModifiers::CONTROL && matches!(k.code, KeyCode::Char('q' | 'c')) { break Ok(()); } match k.code { - KeyCode::Enter => {}, - KeyCode::Backspace => {}, + KeyCode::Backspace => { input.pop(); }, KeyCode::Char(c) => input.push(c), + KeyCode::Up => history.up(1), + KeyCode::PageUp => history.up(10), + KeyCode::Down => history.down(1), + KeyCode::PageDown => history.down(10), + KeyCode::Enter => { + if let Err(_e) = chat.send(Action::new(input.clone())).await { + // TODO log it; + } else { + input = String::new(); + } + }, _ => {}, } }, }, }, - // redraw anyway every second - _ = tokio::time::sleep_until(last_draw + std::time::Duration::from_secs(1)) => {}, + res = chat.recv() => match res { + None => break Ok(()), // stream closed, TODO maybe say it and dont exit? + Some(event) => { + let lines = (format!("{event}").len() as u16 / history.width) + 1; + history.count += lines; + history.down(lines); + history.lines.push(event); + }, + }, }; let tl_input = input.clone(); - let mut tl_text = timeline.read().await.join("\n"); + let tl_input_len = ((tl_input.len() as u16 + 3) / history.width) + 1; + let tl_header = args.server.clone(); + // TODO only render bottom lines + let tl_text = history + .lines + .iter() + .map(|x| format!("{x}")) + .collect::>() + .join("\n"); + term.draw(|frame| { - - tl_text.push_str(&format!("\n{}", chrono::Utc::now())); - let layout = Layout::default() .direction(Direction::Vertical) .constraints(vec![ Constraint::Percentage(100), - Constraint::Min(3), + Constraint::Min(1 + tl_input_len), ]) .split(frame.size()); frame.render_widget( Paragraph::new(tl_text) - .block(Block::new().borders(Borders::ALL)), + .wrap(Wrap { trim: false }) + .scroll((history.offset, 0)) + .block( + Block::new() + .borders(Borders::TOP) + .title(tl_header) + .title_alignment(Alignment::Right) + ), layout[0] ); frame.render_widget( - Paragraph::new(tl_input) - .block(Block::new().borders(Borders::ALL)), + Paragraph::new(format!("> {tl_input}_")) + .wrap(Wrap { trim: false }) + .block( + Block::new() + .borders(Borders::TOP) + ), layout[1] ); })?; - - last_draw = tokio::time::Instant::now(); } } diff --git a/src/chat.rs b/src/chat.rs index 69d0e5a..7780b28 100644 --- a/src/chat.rs +++ b/src/chat.rs @@ -1,73 +1,129 @@ -use std::sync::Arc; +use core::fmt; +use std::fmt::Display; // TODO im so angry tokio-tungstenite makes me import this! -use futures_util::StreamExt; -use tokio::sync::RwLock; +use futures_util::{StreamExt, SinkExt}; +use tokio::sync::mpsc; use tokio_tungstenite::tungstenite::Message; -use crate::proto::{http::RegisterResponse, ws::{ChatEvent, ChatEventInner}}; +use crate::proto::{http::RegisterResponse, ws::{Event, EventInner}}; -pub type Timeline = Arc>>; - -#[async_trait::async_trait] -pub trait PushableTimeline { - async fn push(&self, msg: String); +#[derive(Debug, thiserror::Error)] +pub enum ChatError { + #[error("failed registering to chat: {0:?}")] + Register(#[from] reqwest::Error), + + #[error("connection error with chat: {0:?}")] + Connect(#[from] tokio_tungstenite::tungstenite::Error), } -#[async_trait::async_trait] -impl PushableTimeline for Timeline { - async fn push(&self, msg: String) { - self.write().await.push(msg) +#[derive(Debug)] +pub struct ChatEvent(chrono::DateTime, ChatLine); + +#[derive(Debug)] +pub enum ChatLine { + Message { + user: String, + body: String, + }, + Error(String), + Join(String), + Leave(String), + Connect(String), +} + +impl Display for ChatEvent { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let time = self.0.format("%H:%M:%S"); + match &self.1 { + ChatLine::Error(err) => write!(f, "{time} /!\\ {err}"), + ChatLine::Join(user) => write!(f, "{time} [{user} joined]"), + ChatLine::Leave(user) => write!(f, "{time} [{user} left]"), + ChatLine::Connect(user) => write!(f, "{time} [{user} connected]"), + ChatLine::Message { user, body } => { + let pre = format!("{time} <{user}> "); + let pretty_body = dissolve::strip_html_tags(body).join("\n"); + write!(f, "{pre}{pretty_body}") + }, + } } } -pub async fn worker(server: String, timeline: Timeline) { - timeline.push("[ ] connecting...".to_string()).await; - let registration : RegisterResponse = reqwest::Client::new() - .post(format!("https://{server}/api/chat/register")) - .send() - .await - .expect("failed sending registration request") - .json() - .await - .expect("failed parsing registration response"); +pub struct Chat { + sink: ChatSink, + rx: mpsc::UnboundedReceiver, +} - timeline.push(format!("registered to chat: {registration:?}")).await; +impl Chat { + pub async fn register(server: &str) -> Result { + let registration : RegisterResponse = reqwest::Client::new() + .post(format!("https://{server}/api/chat/register")) + .send() + .await? + .json() + .await?; - // TODO use url crate? - let ws_url = format!("wss://{server}/ws?accessToken={}", registration.access_token); + let ws_url = format!("wss://{server}/ws?accessToken={}", registration.access_token); - timeline.push(format!("connecting to {ws_url}")).await; + let (ws, _response) = tokio_tungstenite::connect_async(ws_url).await?; - match tokio_tungstenite::connect_async(ws_url).await { - Err(e) => timeline.push(format!("[!] failed connecting: {e}")).await, - Ok((mut stream, _)) => while let Some(next) = stream.next().await { + let (sink, stream) = ws.split(); + let (tx, rx) = mpsc::unbounded_channel(); + + tokio::spawn(async move { + if let Err(e) = Chat::worker(stream, tx).await { + eprintln!("chat channel closed while sending a message: {e}"); + } + }); + + Ok(Chat { sink, rx }) + } + + pub async fn send(&mut self, message: crate::proto::ws::Action) -> Result<(), tokio_tungstenite::tungstenite::Error> { + self.sink.send( + Message::Text( + serde_json::to_string(&message).expect("error serializing ChatAction") + ) + ) + .await + } + + #[inline] // because mpsc::recv is cancelable, but this wrapper may not be + pub async fn recv(&mut self) -> Option { + self.rx.recv().await + } + + async fn worker(mut stream: ChatStream, chan: mpsc::UnboundedSender) -> Result<(), mpsc::error::SendError> { + while let Some(next) = stream.next().await { match next { - Err(e) => timeline.push(format!("[!] error receiving chat message: {e}")).await, + Err(e) => chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error(format!("failed receiving message: {e}"))))?, Ok(msg) => match msg { - Message::Binary(_) => timeline.push("[!] received unexpected binary payload".to_string()).await, - Message::Frame(_) => timeline.push("[!] received unexpected raw frame".to_string()).await, + Message::Close(_) => break, + Message::Binary(_) => chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error("unexpected binary payload".to_string())))?, + Message::Frame(_) => chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error("unexpected raw frame".to_string())))?, Message::Ping(_) | tokio_tungstenite::tungstenite::Message::Pong(_) => {}, // ignore - Message::Close(_) => timeline.push("[x] stream is closing".to_string()).await, Message::Text(payload) => for line in payload.lines() { - match serde_json::from_str::(line) { - Err(e) => timeline.push(format!("[!] failed deserializing chat message: {e} -- {payload}")).await, + match serde_json::from_str::(line) { + Err(e) => chan.send(ChatEvent(chrono::Utc::now(), ChatLine::Error(format!("failed parsing message: {e}"))))?, Ok(event) => match event.inner { - ChatEventInner::Chat { body, visible: _ } => - timeline.push(format!("{} | [{}]> {}", event.timestamp, event.user.display_name, dissolve::strip_html_tags(&body).join(" "))).await, - ChatEventInner::ConnectedUserInfo => - timeline.push(format!("{} | {} connected", event.timestamp, event.user.display_name)).await, - ChatEventInner::UserJoined => - timeline.push(format!("{} | {} joined", event.timestamp, event.user.display_name)).await, - ChatEventInner::UserParted => - timeline.push(format!("{} | {} left", event.timestamp, event.user.display_name)).await, + EventInner::Chat { body, visible: _ } => + chan.send(ChatEvent(event.timestamp, ChatLine::Message { user: event.user.display_name, body }))?, + EventInner::ConnectedUserInfo => + chan.send(ChatEvent(event.timestamp, ChatLine::Connect(event.user.display_name)))?, + EventInner::UserJoined => + chan.send(ChatEvent(event.timestamp, ChatLine::Join(event.user.display_name)))?, + EventInner::UserParted => + chan.send(ChatEvent(event.timestamp, ChatLine::Leave(event.user.display_name)))?, } } }, } } - }, - } + } - timeline.write().await.push("[x] stream closed".to_string()); + Ok(()) + } } + +type ChatStream = futures::stream::SplitStream>>; +type ChatSink = futures::stream::SplitSink>, Message>; diff --git a/src/main.rs b/src/main.rs index 0de7584..d6d6039 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,13 +15,6 @@ struct Args { async fn main() -> Result<(), Box> { let args = Args::parse(); - let timeline = chat::Timeline::default(); - let _tl = timeline.clone(); - let _server = args.server.clone(); - tokio::spawn(async move { - crate::chat::worker(_server, _tl).await - }); - // setup terminal crossterm::terminal::enable_raw_mode()?; let mut stdout = std::io::stdout(); @@ -33,7 +26,7 @@ async fn main() -> Result<(), Box> { let mut terminal = ratatui::Terminal::new(backend)?; terminal.hide_cursor()?; - let res = app::run(&mut terminal, timeline).await; + let res = app::run(&mut terminal, args).await; // restore terminal crossterm::terminal::disable_raw_mode()?; diff --git a/src/proto.rs b/src/proto.rs index 0915f63..a64efb6 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -16,24 +16,32 @@ pub mod http { pub mod ws { #[derive(Debug, serde::Serialize)] #[allow(unused)] - pub struct ChatAction { + pub struct Action { pub r#type: String, // CHAT, ??? pub body: String, } + + impl Action { + pub fn new(body: String) -> Self { + Action { + body, r#type: "CHAT".to_string(), + } + } + } #[derive(Debug, serde::Deserialize)] - pub struct ChatEvent { + pub struct Event { pub id: String, pub timestamp: chrono::DateTime, - pub user: ChatUser, + pub user: User, #[serde(flatten)] - pub inner: ChatEventInner, + pub inner: EventInner, } // TODO can i avoid repeating id,timestamp,user in each msg type?? #[derive(Debug, serde::Deserialize)] #[serde(tag = "type")] - pub enum ChatEventInner { + pub enum EventInner { #[serde(rename = "CHAT")] Chat { body: String, @@ -50,7 +58,7 @@ pub mod ws { #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] #[allow(unused)] - pub struct ChatUser { + pub struct User { pub created_at: chrono::DateTime, pub name_changed_at: chrono::DateTime, pub id: String,