fix: update list and send event for user removal

This commit is contained in:
əlemi 2024-02-21 21:06:35 +01:00
parent 425fa62213
commit e4b33c11c9
Signed by: alemi
GPG key ID: A4895B84D311642C
2 changed files with 23 additions and 5 deletions

View file

@ -100,9 +100,14 @@ async fn server_ws(ws: WebSocketUpgrade, State(session): State<Arc<Session>>) ->
ws.on_upgrade(|socket| handle_ws(socket, sub)) ws.on_upgrade(|socket| handle_ws(socket, sub))
} }
async fn handle_ws(mut socket: WebSocket, mut sub: broadcast::Receiver<model::User>) { async fn handle_ws(mut socket: WebSocket, mut sub: broadcast::Receiver<session::SessionEvent>) {
while let Ok(event) = sub.recv().await { while let Ok(event) = sub.recv().await {
socket.send(Message::Text(serde_json::to_string(&event).unwrap())).await.unwrap(); match event {
session::SessionEvent::AddUser(user) =>
socket.send(Message::Text(serde_json::to_string(&user).expect("could not serialize user"))).await.unwrap(),
session::SessionEvent::RemoveUser(id) =>
socket.send(Message::Text(format!("{{\"remove\":{id}}}"))).await.unwrap(),
}
} }
} }

View file

@ -11,7 +11,13 @@ pub struct Session {
host: String, host: String,
sync: watch::Receiver<bool>, sync: watch::Receiver<bool>,
drop: mpsc::Sender<()>, drop: mpsc::Sender<()>,
events: broadcast::Sender<User>, events: broadcast::Sender<SessionEvent>,
}
#[derive(Debug, Clone)]
pub enum SessionEvent {
AddUser(User),
RemoveUser(u32),
} }
impl Drop for Session { impl Drop for Session {
@ -75,7 +81,7 @@ impl Session {
self.host.to_string() self.host.to_string()
} }
pub fn events(&self) -> broadcast::Receiver<User> { pub fn events(&self) -> broadcast::Receiver<SessionEvent> {
self.events.subscribe() self.events.subscribe()
} }
@ -129,6 +135,11 @@ impl Session {
Err(e) => break tracing::warn!("disconnected from server: {}", e), Err(e) => break tracing::warn!("disconnected from server: {}", e),
// Ok(tcp::proto::Packet::TextMessage(msg)) => tracing::info!("{}", msg.message), // Ok(tcp::proto::Packet::TextMessage(msg)) => tracing::info!("{}", msg.message),
// Ok(tcp::proto::Packet::ChannelState(channel)) => tracing::info!("discovered channel: {:?}", channel.name), // Ok(tcp::proto::Packet::ChannelState(channel)) => tracing::info!("discovered channel: {:?}", channel.name),
Ok(proto::Packet::UserRemove(user)) => {
tracing::info!("remove user: {:?}", user);
session.users.write().await.remove(&user.session);
let _ = session.events.send(SessionEvent::RemoveUser(user.session));
},
Ok(proto::Packet::ServerSync(_sync)) => { Ok(proto::Packet::ServerSync(_sync)) => {
tracing::info!("synched: {:?}", _sync); tracing::info!("synched: {:?}", _sync);
ready.send(true).unwrap(); ready.send(true).unwrap();
@ -141,7 +152,9 @@ impl Session {
Some(u) => u.update(user), Some(u) => u.update(user),
None => { users.insert(user.session(), User::from(user)); }, None => { users.insert(user.session(), User::from(user)); },
} }
let _ = session.events.send(users.get(&id).cloned().expect("just inserted")); // if it fails nobody is listening let _ = session.events.send(
SessionEvent::AddUser(users.get(&id).cloned().expect("just inserted"))
); // if it fails nobody is listening
}, },
Ok(pkt) => tracing::info!("ignoring packet {:?}", pkt), Ok(pkt) => tracing::info!("ignoring packet {:?}", pkt),
} }