feat: session with persistent connection

This commit is contained in:
əlemi 2024-02-20 02:52:26 +01:00
parent 34ad2a8681
commit 550e47b1d1
Signed by: alemi
GPG key ID: A4895B84D311642C

View file

@ -1,11 +1,23 @@
use std::net::SocketAddr; use std::{collections::HashMap, net::SocketAddr, sync::{atomic::AtomicBool, Arc}};
use tokio::net::UdpSocket; use tokio::{net::UdpSocket, sync::{mpsc::{self, error::TrySendError}, watch, RwLock}};
use crate::{tcp::{proto, control::ControlChannel}, udp::proto::{PingPacket, PongPacket}}; use crate::{model::User, tcp::{control::ControlChannel, proto}, udp::proto::{PingPacket, PongPacket}};
pub struct Session { pub struct Session {
pub users: RwLock<HashMap<u32, User>>,
sync: watch::Receiver<bool>,
drop: mpsc::Sender<()>,
}
impl Drop for Session {
fn drop(&mut self) {
match self.drop.try_send(()) {
Ok(()) => {},
Err(TrySendError::Full(())) => tracing::warn!("session stop channel full"),
Err(TrySendError::Closed(())) => tracing::warn!("session stop channel already closed"),
}
}
} }
impl Session { impl Session {
@ -80,4 +92,65 @@ impl Session {
} }
} }
} }
pub async fn connect(host: &str, port: Option<u16>, username: Option<String>, password: Option<String>) -> std::io::Result<Arc<Self>> {
let username = username.unwrap_or_else(|| ".mumble-stats-api".to_string());
let mut channel = ControlChannel::new(host, port).await?;
let version = proto::Version {
version_v1: None,
version_v2: Some(281496485429248),
release: Some("1.5.517".into()),
os: None,
os_version: None,
};
let authenticate = proto::Authenticate {
username: Some(username.clone()),
password,
tokens: Vec::new(),
celt_versions: Vec::new(),
opus: Some(true),
client_type: Some(1),
};
for pkt in [
proto::Packet::Version(version),
proto::Packet::Authenticate(authenticate),
] {
channel.send(pkt).await?;
}
let (tx, mut rx) = mpsc::channel(1);
let (ready_tx, ready) = watch::channel(false);
let s = Arc::new(Session {
users : RwLock::new(HashMap::new()),
sync: ready,
drop: tx,
});
let session = s.clone();
tokio::spawn(async move {
loop {
match rx.try_recv() {
Ok(()) => break,
Err(mpsc::error::TryRecvError::Empty) => {},
Err(mpsc::error::TryRecvError::Disconnected) => break tracing::warn!("all session dropped without stopping this task, stopping..."),
}
match channel.recv().await {
Err(e) => break tracing::warn!("disconnected from server: {}", e),
// Ok(tcp::proto::Packet::TextMessage(msg)) => tracing::info!("{}", msg.message),
// Ok(tcp::proto::Packet::ChannelState(channel)) => tracing::info!("discovered channel: {:?}", channel.name),
Ok(proto::Packet::ServerSync(_sync)) => ready_tx.send(true).unwrap(),
Ok(proto::Packet::UserState(user)) => {
if user.name.as_ref().is_some_and(|n| n != &username) {
session.users.write().await.insert(user.user_id(), User::from(user));
}
},
Ok(pkt) => tracing::debug!("ignoring packet {:#?}", pkt),
}
}
});
Ok(s)
}
} }