feat: initial websocket implementation

This commit is contained in:
əlemi 2024-02-21 19:36:27 +01:00
parent 6cb230d75c
commit 332f9a5809
Signed by: alemi
GPG key ID: A4895B84D311642C
3 changed files with 25 additions and 4 deletions

View file

@ -11,12 +11,13 @@ description = "check mumble server stats using http requests"
#license = "LICENSE" #license = "LICENSE"
[dependencies] [dependencies]
axum = "0.7.4" axum = { version = "0.7.4", features = ["ws"] }
chrono = "0.4" chrono = "0.4"
clap = { version = "4.5", features = ["derive"] } clap = { version = "4.5", features = ["derive"] }
native-tls = "0.2.11" native-tls = "0.2.11"
prost = "0.12.3" prost = "0.12.3"
serde = { version = "1.0.196", features = ["derive"] } serde = { version = "1.0.196", features = ["derive"] }
serde_json = "1.0.114"
tokio = { version = "1.36", features = ["net", "macros", "rt-multi-thread", "io-util"] } tokio = { version = "1.36", features = ["net", "macros", "rt-multi-thread", "io-util"] }
tokio-native-tls = "0.3.1" tokio-native-tls = "0.3.1"
tracing = "0.1.40" tracing = "0.1.40"

View file

@ -4,8 +4,9 @@ use std::{net::ToSocketAddrs, sync::Arc};
use clap::Parser; use clap::Parser;
use axum::{extract::{Query, State}, routing::get, Json, Router}; use axum::{extract::{ws::{Message, WebSocket}, Query, State, WebSocketUpgrade}, response::Response, routing::get, Json, Router};
use session::Session; use session::Session;
use tokio::sync::broadcast;
mod tcp; mod tcp;
mod udp; mod udp;
@ -75,6 +76,7 @@ async fn main() {
let app = app let app = app
.route("/info", get(server_info)) .route("/info", get(server_info))
.route("/users", get(server_users)) .route("/users", get(server_users))
.route("/ws", get(server_ws))
.with_state(session); .with_state(session);
tracing::info!("serving mumble-stats-api"); tracing::info!("serving mumble-stats-api");
@ -93,6 +95,17 @@ async fn server_users(State(session): State<Arc<Session>>) -> Result<Json<Vec<mo
Ok(Json(session.users().await)) Ok(Json(session.users().await))
} }
async fn server_ws(ws: WebSocketUpgrade, State(session): State<Arc<Session>>) -> Response {
let sub = session.events();
ws.on_upgrade(|socket| handle_ws(socket, sub))
}
async fn handle_ws(mut socket: WebSocket, mut sub: broadcast::Receiver<model::User>) {
while let Ok(event) = sub.recv().await {
socket.send(Message::Text(serde_json::to_string(&event).unwrap())).await.unwrap();
}
}
async fn ping_server(Query(options): Query<model::PingOptions>) -> Result<Json<model::ServerInfo>, String> { async fn ping_server(Query(options): Query<model::PingOptions>) -> Result<Json<model::ServerInfo>, String> {
let tuple = (options.host, options.port.unwrap_or(64738)); let tuple = (options.host, options.port.unwrap_or(64738));
match tuple.to_socket_addrs() { // TODO do it manually so we have control match tuple.to_socket_addrs() { // TODO do it manually so we have control

View file

@ -1,6 +1,6 @@
use std::{borrow::Borrow, collections::HashMap, net::SocketAddr, sync::Arc}; use std::{borrow::Borrow, collections::HashMap, net::SocketAddr, sync::Arc};
use tokio::{net::UdpSocket, sync::{mpsc::{self, error::TrySendError}, watch, RwLock}}; use tokio::{net::UdpSocket, sync::{broadcast, mpsc::{self, error::TrySendError}, watch, RwLock}};
use crate::{model::User, tcp::{control::ControlChannel, proto}, udp::proto::{PingPacket, PongPacket}}; use crate::{model::User, tcp::{control::ControlChannel, proto}, udp::proto::{PingPacket, PongPacket}};
@ -11,6 +11,7 @@ pub struct Session {
host: String, host: String,
sync: watch::Receiver<bool>, sync: watch::Receiver<bool>,
drop: mpsc::Sender<()>, drop: mpsc::Sender<()>,
events: broadcast::Sender<User>,
} }
impl Drop for Session { impl Drop for Session {
@ -74,6 +75,10 @@ impl Session {
self.host.to_string() self.host.to_string()
} }
pub fn events(&self) -> broadcast::Receiver<User> {
self.events.subscribe()
}
pub async fn connect(host: &str, port: Option<u16>, username: Option<String>, password: Option<String>) -> std::io::Result<Arc<Self>> { pub async fn connect(host: &str, port: Option<u16>, username: Option<String>, password: Option<String>) -> std::io::Result<Arc<Self>> {
let username = username.unwrap_or_else(|| ".mumble-stats-api".to_string()); let username = username.unwrap_or_else(|| ".mumble-stats-api".to_string());
let channel = Arc::new(ControlChannel::new(host, port).await?); let channel = Arc::new(ControlChannel::new(host, port).await?);
@ -102,9 +107,10 @@ impl Session {
let (drop, mut stop) = mpsc::channel(1); let (drop, mut stop) = mpsc::channel(1);
let (ready, sync) = watch::channel(false); let (ready, sync) = watch::channel(false);
let (events, _) = broadcast::channel(64);
let s = Arc::new(Session { let s = Arc::new(Session {
drop, sync, drop, sync, events,
username: username.clone(), username: username.clone(),
users : RwLock::new(HashMap::new()), users : RwLock::new(HashMap::new()),
host: host.to_string(), host: host.to_string(),
@ -130,6 +136,7 @@ impl Session {
Ok(proto::Packet::UserState(user)) => { Ok(proto::Packet::UserState(user)) => {
tracing::info!("user state: {:?}", user); tracing::info!("user state: {:?}", user);
let mut users = session.users.write().await; let mut users = session.users.write().await;
let _ = session.events.send(User::from(user.clone())); // if it fails nobody is listening
match users.get_mut(&user.session()) { match users.get_mut(&user.session()) {
Some(u) => u.update(user), Some(u) => u.update(user),
None => { users.insert(user.session(), User::from(user)); }, None => { users.insert(user.session(), User::from(user)); },