From 550e47b1d1ec5d0b576474ab9d940b4b1b5a7252 Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 20 Feb 2024 02:52:26 +0100 Subject: [PATCH] feat: session with persistent connection --- src/session.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/src/session.rs b/src/session.rs index 57a3977..8d596b3 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,11 +1,23 @@ -use std::net::SocketAddr; +use std::{collections::HashMap, net::SocketAddr, sync::{atomic::AtomicBool, Arc}}; -use tokio::net::UdpSocket; +use tokio::{net::UdpSocket, sync::{mpsc::{self, error::TrySendError}, watch, RwLock}}; -use crate::{tcp::{proto, control::ControlChannel}, udp::proto::{PingPacket, PongPacket}}; +use crate::{model::User, tcp::{control::ControlChannel, proto}, udp::proto::{PingPacket, PongPacket}}; pub struct Session { + pub users: RwLock>, + sync: watch::Receiver, + drop: mpsc::Sender<()>, +} +impl Drop for Session { + fn drop(&mut self) { + match self.drop.try_send(()) { + Ok(()) => {}, + Err(TrySendError::Full(())) => tracing::warn!("session stop channel full"), + Err(TrySendError::Closed(())) => tracing::warn!("session stop channel already closed"), + } + } } impl Session { @@ -80,4 +92,65 @@ impl Session { } } } + + pub async fn connect(host: &str, port: Option, username: Option, password: Option) -> std::io::Result> { + let username = username.unwrap_or_else(|| ".mumble-stats-api".to_string()); + let mut channel = ControlChannel::new(host, port).await?; + let version = proto::Version { + version_v1: None, + version_v2: Some(281496485429248), + release: Some("1.5.517".into()), + os: None, + os_version: None, + }; + let authenticate = proto::Authenticate { + username: Some(username.clone()), + password, + tokens: Vec::new(), + celt_versions: Vec::new(), + opus: Some(true), + client_type: Some(1), + }; + + for pkt in [ + proto::Packet::Version(version), + proto::Packet::Authenticate(authenticate), + ] { + channel.send(pkt).await?; + } + + let (tx, mut rx) = mpsc::channel(1); + let (ready_tx, ready) = watch::channel(false); + + let s = Arc::new(Session { + users : RwLock::new(HashMap::new()), + sync: ready, + drop: tx, + }); + + let session = s.clone(); + tokio::spawn(async move { + loop { + match rx.try_recv() { + Ok(()) => break, + Err(mpsc::error::TryRecvError::Empty) => {}, + Err(mpsc::error::TryRecvError::Disconnected) => break tracing::warn!("all session dropped without stopping this task, stopping..."), + } + match channel.recv().await { + Err(e) => break tracing::warn!("disconnected from server: {}", e), + // Ok(tcp::proto::Packet::TextMessage(msg)) => tracing::info!("{}", msg.message), + // Ok(tcp::proto::Packet::ChannelState(channel)) => tracing::info!("discovered channel: {:?}", channel.name), + Ok(proto::Packet::ServerSync(_sync)) => ready_tx.send(true).unwrap(), + Ok(proto::Packet::UserState(user)) => { + if user.name.as_ref().is_some_and(|n| n != &username) { + session.users.write().await.insert(user.user_id(), User::from(user)); + } + }, + Ok(pkt) => tracing::debug!("ignoring packet {:#?}", pkt), + } + } + }); + + Ok(s) + } }