i'm trying, okay?

This commit is contained in:
cqql 2024-08-30 13:04:07 +02:00
parent 0f5dfb2c5c
commit d1e73c5f44
2 changed files with 25 additions and 12 deletions

View file

@ -33,20 +33,20 @@ pub async fn serve(addr: &str, db_connection: DatabaseConnection) -> Result<(),
while let Some((tx, rx)) = listener.recv().await {
let client = ClientConnection::new((tx, rx));
tracing::info!("new connection, starting handler");
state.lock().await.push(tx);
state.lock().await.push(&client);
let _state = state.clone();
let handler_db_conn = db_connection.clone();
// TODO need .boxed() because https://github.com/rust-lang/rust/issues/100013
tokio::spawn(async move { handle(_state, rx, handler_db_conn).boxed().await });
tokio::spawn(async move { handle_client_connection(_state, client, handler_db_conn).boxed().await });
}
Ok(())
}
/// Connection handler for handling a connecting client
async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx: impl TransportStream, db_connection: impl ConnectionTrait) {
async fn handle_client_connection(active_client_peers: Arc<Mutex<Vec<&ClientConnection>>>, mut client: ClientConnection, db_connection: impl ConnectionTrait) {
// for now we're creating a new user for each connection
let new_client_id = uuid::Uuid::new_v4();
let user = users::Model{
@ -60,11 +60,14 @@ async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx
updated: None
};
let fn_span = tracing::span!(tracing::Level::TRACE, "handle_client_connection", client_uuid = client.uuid.to_string());
let _enter = fn_span.enter();
user.clone().into_active_model().insert(&db_connection).await.expect("failed adding a new user for the connection");
// continuously waiting then receiving new messages
loop {
match rx.pop().await { //
match client.rx.pop().await { //
Err(e) => {
tracing::error!("error receiving from client: {e} -- stopping");
break;
@ -79,12 +82,20 @@ async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx
}
let mut disconnected_peers_to_remove = Vec::new();
for (i, tx) in active_client_peers.lock().await.iter_mut().enumerate() {
if let Err(e) = tx.push(c2s::c::Packet::Message(msg.clone())).await {
for (i, other_client) in active_client_peers.lock().await.iter_mut().enumerate() {
if other_client.uuid == client.uuid {
// let's not transmit our message to ourselves
continue;
}
// try transmitting the message and if it fails, treat the client as disconnected
if let Err(e) = other_client.tx.push(c2s::c::Packet::Message(msg.clone())).await {
tracing::error!("failed broadcasting: {e}");
disconnected_peers_to_remove.push(i);
}
}
// remove all disconnected clients
for i in disconnected_peers_to_remove.into_iter().rev() {
active_client_peers.lock().await.remove(i);
}
@ -99,8 +110,10 @@ async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx
Err(_) => { tracing::error!("failed to query all previously sent messages"); }
Ok(messages) => {
tracing::info!("responding to RequestChatHistory: {}", serde_json::to_string(&messages).unwrap());
// how do i know which TransportSink is the sink of the handlee? (which tx is me?)
todo!();
match client.tx.push(c2s::c::Packet::MessagesHistory(messages)).await {
Ok(_) => {tracing::debug!("responding to RequestChatHistory was successful.")},
Err(e) => {tracing::error!("Failed to respond to RequestChatHistory: {}", e)},
};
}
};
},

View file

@ -12,10 +12,10 @@ use uuid::Uuid;
use super::{TransportSink, TransportStream};
pub struct ClientConnection {
uuid: Uuid,
tx: Box<dyn TransportSink>,
rx: Box<dyn TransportStream>,
user_uuid: Option<Uuid> // gets set if and only if a user is logged in with this connection
pub uuid: Uuid,
pub tx: Box<dyn TransportSink>,
pub rx: Box<dyn TransportStream>,
pub user_uuid: Option<Uuid> // gets set if and only if a user is logged in with this connection
}
impl ClientConnection {