feat: track channels

no way to get them yet tho but they should get tracked too now
oh users have a channel_id field
This commit is contained in:
əlemi 2024-02-22 04:55:58 +01:00
parent cbeb54d5fc
commit e105d40308
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 83 additions and 8 deletions

View file

@ -38,6 +38,9 @@ pub struct User {
/// Registered user ID if the user is registered. /// Registered user ID if the user is registered.
pub user_id: Option<u32>, pub user_id: Option<u32>,
/// Channel on which the user is.
pub channel_id: u32,
/// User comment if it is less than 128 bytes. /// User comment if it is less than 128 bytes.
pub comment: Option<String>, pub comment: Option<String>,
@ -70,6 +73,7 @@ impl User {
if let Some(name) = state.name { self.name = name } if let Some(name) = state.name { self.name = name }
if let Some(user_id) = state.user_id { self.user_id = Some(user_id) } if let Some(user_id) = state.user_id { self.user_id = Some(user_id) }
if let Some(comment) = state.comment { self.comment = Some(comment) } if let Some(comment) = state.comment { self.comment = Some(comment) }
if let Some(channel_id) = state.channel_id { self.channel_id = channel_id }
if let Some(mute) = state.mute { self.properties.mute = mute } if let Some(mute) = state.mute { self.properties.mute = mute }
if let Some(deaf) = state.deaf { self.properties.deaf = deaf } if let Some(deaf) = state.deaf { self.properties.deaf = deaf }
if let Some(suppress) = state.suppress { self.properties.suppress = suppress } if let Some(suppress) = state.suppress { self.properties.suppress = suppress }
@ -89,6 +93,7 @@ impl From<crate::tcp::proto::UserState> for User {
name: value.name().to_string(), name: value.name().to_string(),
user_id: value.user_id, user_id: value.user_id,
comment: value.comment.clone(), comment: value.comment.clone(),
channel_id: value.channel_id(),
properties: UserProperties { properties: UserProperties {
mute: value.mute(), mute: value.mute(),
deaf: value.deaf(), deaf: value.deaf(),
@ -101,3 +106,54 @@ impl From<crate::tcp::proto::UserState> for User {
} }
} }
} }
#[derive(Debug, Clone, serde::Serialize)]
pub struct Channel {
/// Unique ID for the channel within the server.
pub id: u32,
/// channel_id of the parent channel.
pub parent: Option<u32>,
/// UTF-8 encoded channel name.
pub name: String,
/// UTF-8 encoded channel description
pub description: String,
/// True if the channel is temporary.
pub temporary: bool,
/// Position weight to tweak the channel position in the channel list.
pub position: i32,
/// Maximum number of users allowed in the channel. If this value is zero,
/// the maximum number of users allowed in the channel is given by the
/// server's "usersperchannel" setting.
pub max_users: u32,
}
impl From<crate::tcp::proto::ChannelState> for Channel {
fn from(value: crate::tcp::proto::ChannelState) -> Self {
Channel {
id: value.channel_id(),
parent: value.parent,
name: value.name().to_string(),
description: value.description().to_string(),
temporary: value.temporary(),
position: value.position(),
max_users: value.max_users()
}
}
}
impl Channel {
pub fn update(&mut self, value: crate::tcp::proto::ChannelState) {
if self.id != value.channel_id() { tracing::warn!("updating channel with different id") }
if let Some(parent) = value.parent { self.parent = Some(parent) }
if let Some(name) = value.name { self.name = name }
if let Some(description) = value.description { self.description = description }
if let Some(temporary) = value.temporary { self.temporary = temporary }
if let Some(max_users) = value.max_users { self.max_users = max_users }
}
}

View file

@ -2,12 +2,13 @@ use std::{borrow::Borrow, collections::HashMap, net::SocketAddr, sync::{atomic::
use tokio::{net::UdpSocket, sync::{broadcast, RwLock}}; use tokio::{net::UdpSocket, sync::{broadcast, RwLock}};
use crate::{model::User, tcp::{control::ControlChannel, proto}, udp::proto::{PingPacket, PongPacket}}; use crate::{model::{Channel, User}, tcp::{control::ControlChannel, proto}, udp::proto::{PingPacket, PongPacket}};
#[derive(Debug)] #[derive(Debug)]
pub struct Session { pub struct Session {
options: Arc<SessionOptions>, options: Arc<SessionOptions>,
users: Arc<RwLock<HashMap<u32, User>>>, users: Arc<RwLock<HashMap<u32, User>>>,
channels: Arc<RwLock<HashMap<u32, Channel>>>,
// sync: watch::Receiver<bool>, // sync: watch::Receiver<bool>,
run: Arc<AtomicBool>, run: Arc<AtomicBool>,
events: Arc<broadcast::Sender<SessionEvent>>, events: Arc<broadcast::Sender<SessionEvent>>,
@ -96,6 +97,7 @@ impl Session {
options: Arc<SessionOptions>, options: Arc<SessionOptions>,
run: Arc<AtomicBool>, run: Arc<AtomicBool>,
users: Arc<RwLock<HashMap<u32, User>>>, users: Arc<RwLock<HashMap<u32, User>>>,
channels: Arc<RwLock<HashMap<u32, Channel>>>,
events: Arc<broadcast::Sender<SessionEvent>>, events: Arc<broadcast::Sender<SessionEvent>>,
) -> std::io::Result<()> { ) -> std::io::Result<()> {
let channel = Arc::new(ControlChannel::new(&options.host, Some(options.port)).await?); let channel = Arc::new(ControlChannel::new(&options.host, Some(options.port)).await?);
@ -136,6 +138,18 @@ impl Session {
users.write().await.remove(&user.session); users.write().await.remove(&user.session);
let _ = events.send(SessionEvent::RemoveUser(user.session)); let _ = events.send(SessionEvent::RemoveUser(user.session));
}, },
Ok(proto::Packet::ChannelRemove(channel)) => {
tracing::debug!("removing channel: {:?}", channel);
channels.write().await.remove(&channel.channel_id);
},
Ok(proto::Packet::ChannelState(channel)) => {
tracing::debug!("updating channel state: {:?}", channel);
let mut channels = channels.write().await;
match channels.get_mut(&channel.channel_id()) {
Some(c) => c.update(channel),
None => { channels.insert(channel.channel_id(), Channel::from(channel)); },
}
},
Ok(proto::Packet::UserState(user)) => { Ok(proto::Packet::UserState(user)) => {
tracing::debug!("updating user state: {:?}", user); tracing::debug!("updating user state: {:?}", user);
let mut users = users.write().await; let mut users = users.write().await;
@ -152,6 +166,7 @@ impl Session {
} }
} }
users.write().await.clear(); users.write().await.clear();
channels.write().await.clear();
}); });
tasks.spawn(async move { tasks.spawn(async move {
@ -173,9 +188,10 @@ impl Session {
let username = username.unwrap_or_else(|| ".mumble-stats-api".to_string()); let username = username.unwrap_or_else(|| ".mumble-stats-api".to_string());
let (events, _) = broadcast::channel(64); let (events, _) = broadcast::channel(64);
let s = Session { let session = Session {
events: Arc::new(events), events: Arc::new(events),
users : Arc::new(RwLock::new(HashMap::new())), users : Arc::new(RwLock::new(HashMap::new())),
channels : Arc::new(RwLock::new(HashMap::new())),
run: Arc::new(AtomicBool::new(true)), run: Arc::new(AtomicBool::new(true)),
options: Arc::new(SessionOptions { options: Arc::new(SessionOptions {
username, password, username, password,
@ -184,13 +200,16 @@ impl Session {
}), }),
}; };
let options = s.options.clone(); let options = session.options.clone();
let run = s.run.clone(); let run = session.run.clone();
let users = s.users.clone(); let users = session.users.clone();
let events = s.events.clone(); let channels = session.channels.clone();
let events = session.events.clone();
tokio::spawn(async move { tokio::spawn(async move {
while run.load(std::sync::atomic::Ordering::Relaxed) { while run.load(std::sync::atomic::Ordering::Relaxed) {
if let Err(e) = Self::connect_session(options.clone(), run.clone(), users.clone(), events.clone()).await { if let Err(e) = Self::connect_session(
options.clone(), run.clone(), users.clone(), channels.clone(), events.clone()
).await {
tracing::error!("could not connect to mumble: {e}"); tracing::error!("could not connect to mumble: {e}");
} }
tokio::time::sleep(std::time::Duration::from_secs(10)).await; tokio::time::sleep(std::time::Duration::from_secs(10)).await;
@ -198,6 +217,6 @@ impl Session {
} }
}); });
s session
} }
} }