From d1e73c5f44318081a056538ccaf37b49c9734384 Mon Sep 17 00:00:00 2001 From: cqql Date: Fri, 30 Aug 2024 13:04:07 +0200 Subject: [PATCH] i'm trying, okay? --- server/src/lib.rs | 29 ++++++++++++++++------- server/src/transport/client_connection.rs | 8 +++---- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/server/src/lib.rs b/server/src/lib.rs index 3fcdd20..c4d9f3b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -33,20 +33,20 @@ pub async fn serve(addr: &str, db_connection: DatabaseConnection) -> Result<(), while let Some((tx, rx)) = listener.recv().await { let client = ClientConnection::new((tx, rx)); tracing::info!("new connection, starting handler"); - state.lock().await.push(tx); + state.lock().await.push(&client); let _state = state.clone(); let handler_db_conn = db_connection.clone(); // TODO need .boxed() because https://github.com/rust-lang/rust/issues/100013 - tokio::spawn(async move { handle(_state, rx, handler_db_conn).boxed().await }); + tokio::spawn(async move { handle_client_connection(_state, client, handler_db_conn).boxed().await }); } Ok(()) } /// Connection handler for handling a connecting client -async fn handle(active_client_peers: Arc>>, mut rx: impl TransportStream, db_connection: impl ConnectionTrait) { +async fn handle_client_connection(active_client_peers: Arc>>, mut client: ClientConnection, db_connection: impl ConnectionTrait) { // for now we're creating a new user for each connection let new_client_id = uuid::Uuid::new_v4(); let user = users::Model{ @@ -60,11 +60,14 @@ async fn handle(active_client_peers: Arc>>, mut rx updated: None }; + let fn_span = tracing::span!(tracing::Level::TRACE, "handle_client_connection", client_uuid = client.uuid.to_string()); + let _enter = fn_span.enter(); + user.clone().into_active_model().insert(&db_connection).await.expect("failed adding a new user for the connection"); // continuously waiting then receiving new messages loop { - match rx.pop().await { // + match client.rx.pop().await { // Err(e) => { tracing::error!("error receiving from client: {e} -- stopping"); break; @@ -79,12 +82,20 @@ async fn handle(active_client_peers: Arc>>, mut rx } let mut disconnected_peers_to_remove = Vec::new(); - for (i, tx) in active_client_peers.lock().await.iter_mut().enumerate() { - if let Err(e) = tx.push(c2s::c::Packet::Message(msg.clone())).await { + for (i, other_client) in active_client_peers.lock().await.iter_mut().enumerate() { + if other_client.uuid == client.uuid { + // let's not transmit our message to ourselves + continue; + } + + // try transmitting the message and if it fails, treat the client as disconnected + if let Err(e) = other_client.tx.push(c2s::c::Packet::Message(msg.clone())).await { tracing::error!("failed broadcasting: {e}"); disconnected_peers_to_remove.push(i); } } + + // remove all disconnected clients for i in disconnected_peers_to_remove.into_iter().rev() { active_client_peers.lock().await.remove(i); } @@ -99,8 +110,10 @@ async fn handle(active_client_peers: Arc>>, mut rx Err(_) => { tracing::error!("failed to query all previously sent messages"); } Ok(messages) => { tracing::info!("responding to RequestChatHistory: {}", serde_json::to_string(&messages).unwrap()); - // how do i know which TransportSink is the sink of the handlee? (which tx is me?) - todo!(); + match client.tx.push(c2s::c::Packet::MessagesHistory(messages)).await { + Ok(_) => {tracing::debug!("responding to RequestChatHistory was successful.")}, + Err(e) => {tracing::error!("Failed to respond to RequestChatHistory: {}", e)}, + }; } }; }, diff --git a/server/src/transport/client_connection.rs b/server/src/transport/client_connection.rs index 945eaf6..29e48bc 100644 --- a/server/src/transport/client_connection.rs +++ b/server/src/transport/client_connection.rs @@ -12,10 +12,10 @@ use uuid::Uuid; use super::{TransportSink, TransportStream}; pub struct ClientConnection { - uuid: Uuid, - tx: Box, - rx: Box, - user_uuid: Option // gets set if and only if a user is logged in with this connection + pub uuid: Uuid, + pub tx: Box, + pub rx: Box, + pub user_uuid: Option // gets set if and only if a user is logged in with this connection } impl ClientConnection {