From 332f9a5809d7f995c038d44c5ef179878f0d1d98 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 21 Feb 2024 19:36:27 +0100 Subject: [PATCH] feat: initial websocket implementation --- Cargo.toml | 3 ++- src/main.rs | 15 ++++++++++++++- src/session.rs | 11 +++++++++-- 3 files changed, 25 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3634fa0..a79505a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,13 @@ description = "check mumble server stats using http requests" #license = "LICENSE" [dependencies] -axum = "0.7.4" +axum = { version = "0.7.4", features = ["ws"] } chrono = "0.4" clap = { version = "4.5", features = ["derive"] } native-tls = "0.2.11" prost = "0.12.3" serde = { version = "1.0.196", features = ["derive"] } +serde_json = "1.0.114" tokio = { version = "1.36", features = ["net", "macros", "rt-multi-thread", "io-util"] } tokio-native-tls = "0.3.1" tracing = "0.1.40" diff --git a/src/main.rs b/src/main.rs index 83472cf..f111852 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,8 +4,9 @@ use std::{net::ToSocketAddrs, sync::Arc}; use clap::Parser; -use axum::{extract::{Query, State}, routing::get, Json, Router}; +use axum::{extract::{ws::{Message, WebSocket}, Query, State, WebSocketUpgrade}, response::Response, routing::get, Json, Router}; use session::Session; +use tokio::sync::broadcast; mod tcp; mod udp; @@ -75,6 +76,7 @@ async fn main() { let app = app .route("/info", get(server_info)) .route("/users", get(server_users)) + .route("/ws", get(server_ws)) .with_state(session); tracing::info!("serving mumble-stats-api"); @@ -93,6 +95,17 @@ async fn server_users(State(session): State>) -> Result>) -> Response { + let sub = session.events(); + ws.on_upgrade(|socket| handle_ws(socket, sub)) +} + +async fn handle_ws(mut socket: WebSocket, mut sub: broadcast::Receiver) { + while let Ok(event) = sub.recv().await { + socket.send(Message::Text(serde_json::to_string(&event).unwrap())).await.unwrap(); + } +} + async fn ping_server(Query(options): Query) -> Result, String> { let tuple = (options.host, options.port.unwrap_or(64738)); match tuple.to_socket_addrs() { // TODO do it manually so we have control diff --git a/src/session.rs b/src/session.rs index 6f3075e..1a79253 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,6 +1,6 @@ use std::{borrow::Borrow, collections::HashMap, net::SocketAddr, sync::Arc}; -use tokio::{net::UdpSocket, sync::{mpsc::{self, error::TrySendError}, watch, RwLock}}; +use tokio::{net::UdpSocket, sync::{broadcast, mpsc::{self, error::TrySendError}, watch, RwLock}}; use crate::{model::User, tcp::{control::ControlChannel, proto}, udp::proto::{PingPacket, PongPacket}}; @@ -11,6 +11,7 @@ pub struct Session { host: String, sync: watch::Receiver, drop: mpsc::Sender<()>, + events: broadcast::Sender, } impl Drop for Session { @@ -74,6 +75,10 @@ impl Session { self.host.to_string() } + pub fn events(&self) -> broadcast::Receiver { + self.events.subscribe() + } + pub async fn connect(host: &str, port: Option, username: Option, password: Option) -> std::io::Result> { let username = username.unwrap_or_else(|| ".mumble-stats-api".to_string()); let channel = Arc::new(ControlChannel::new(host, port).await?); @@ -102,9 +107,10 @@ impl Session { let (drop, mut stop) = mpsc::channel(1); let (ready, sync) = watch::channel(false); + let (events, _) = broadcast::channel(64); let s = Arc::new(Session { - drop, sync, + drop, sync, events, username: username.clone(), users : RwLock::new(HashMap::new()), host: host.to_string(), @@ -130,6 +136,7 @@ impl Session { Ok(proto::Packet::UserState(user)) => { tracing::info!("user state: {:?}", user); let mut users = session.users.write().await; + let _ = session.events.send(User::from(user.clone())); // if it fails nobody is listening match users.get_mut(&user.session()) { Some(u) => u.update(user), None => { users.insert(user.session(), User::from(user)); },