Compare commits
4 commits
dev
...
sending-ch
Author | SHA1 | Date | |
---|---|---|---|
63e59f092e | |||
d1e73c5f44 | |||
0f5dfb2c5c | |||
d2985c3be8 |
6 changed files with 223 additions and 17 deletions
102
.vscode/launch.json
vendored
Normal file
102
.vscode/launch.json
vendored
Normal file
|
@ -0,0 +1,102 @@
|
||||||
|
{
|
||||||
|
// Use IntelliSense to learn about possible attributes.
|
||||||
|
// Hover to view descriptions of existing attributes.
|
||||||
|
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||||
|
"version": "0.2.0",
|
||||||
|
"configurations": [
|
||||||
|
{
|
||||||
|
"type": "lldb",
|
||||||
|
"request": "launch",
|
||||||
|
"name": "Debug unit tests in library 'scct_migrations'",
|
||||||
|
"cargo": {
|
||||||
|
"args": [
|
||||||
|
"test",
|
||||||
|
"--no-run",
|
||||||
|
"--lib",
|
||||||
|
"--package=scct-migrations"
|
||||||
|
],
|
||||||
|
"filter": {
|
||||||
|
"name": "scct_migrations",
|
||||||
|
"kind": "lib"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"args": [],
|
||||||
|
"cwd": "${workspaceFolder}"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "lldb",
|
||||||
|
"request": "launch",
|
||||||
|
"name": "Debug unit tests in library 'scct_server'",
|
||||||
|
"cargo": {
|
||||||
|
"args": [
|
||||||
|
"test",
|
||||||
|
"--no-run",
|
||||||
|
"--lib",
|
||||||
|
"--package=scct-server"
|
||||||
|
],
|
||||||
|
"filter": {
|
||||||
|
"name": "scct_server",
|
||||||
|
"kind": "lib"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"args": [],
|
||||||
|
"cwd": "${workspaceFolder}"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "lldb",
|
||||||
|
"request": "launch",
|
||||||
|
"name": "Debug unit tests in library 'scct_model'",
|
||||||
|
"cargo": {
|
||||||
|
"args": [
|
||||||
|
"test",
|
||||||
|
"--no-run",
|
||||||
|
"--lib",
|
||||||
|
"--package=scct-model"
|
||||||
|
],
|
||||||
|
"filter": {
|
||||||
|
"name": "scct_model",
|
||||||
|
"kind": "lib"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"args": [],
|
||||||
|
"cwd": "${workspaceFolder}"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "lldb",
|
||||||
|
"request": "launch",
|
||||||
|
"name": "Debug executable 'scct' server",
|
||||||
|
"cargo": {
|
||||||
|
"args": [
|
||||||
|
"build",
|
||||||
|
"--bin=scct",
|
||||||
|
"--package=scct-cli"
|
||||||
|
],
|
||||||
|
"filter": {
|
||||||
|
"name": "scct",
|
||||||
|
"kind": "bin"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"args": ["--debug", "serve"],
|
||||||
|
"cwd": "${workspaceFolder}"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "lldb",
|
||||||
|
"request": "launch",
|
||||||
|
"name": "Debug unit tests in executable 'scct'",
|
||||||
|
"cargo": {
|
||||||
|
"args": [
|
||||||
|
"test",
|
||||||
|
"--no-run",
|
||||||
|
"--bin=scct",
|
||||||
|
"--package=scct-cli"
|
||||||
|
],
|
||||||
|
"filter": {
|
||||||
|
"name": "scct",
|
||||||
|
"kind": "bin"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"args": [],
|
||||||
|
"cwd": "${workspaceFolder}"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
17
README.md
17
README.md
|
@ -11,3 +11,20 @@ tldr:
|
||||||
* sea_orm and sqlx are huge but ehh db flexibility is nice feature
|
* sea_orm and sqlx are huge but ehh db flexibility is nice feature
|
||||||
* i have no clue honestly
|
* i have no clue honestly
|
||||||
* super cool chat thing yayyyyyyyy
|
* super cool chat thing yayyyyyyyy
|
||||||
|
|
||||||
|
## Example serverbound messages
|
||||||
|
|
||||||
|
### Request chat history of chat room
|
||||||
|
|
||||||
|
The uuid is the chat room id
|
||||||
|
|
||||||
|
```json
|
||||||
|
{"RequestChatHistory":"00000000-0000-0000-0000-000000000000"}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Send message to a chat room
|
||||||
|
|
||||||
|
```json
|
||||||
|
{"Message":{"id":"bef168ac-cee2-4319-9020-ef14a0778338","user_id":"67662e56-751a-4124-b611-0b54e53bd983","chat_id":"00000000-0000-0000-0000-000000000000","content":"hello world!","created":"2024-09-01T11:29:35.777275126Z","updated":null}}
|
||||||
|
```
|
||||||
|
|
||||||
|
|
|
@ -4,6 +4,7 @@ pub mod s {
|
||||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||||
pub enum Packet {
|
pub enum Packet {
|
||||||
Message(crate::messages::Model),
|
Message(crate::messages::Model),
|
||||||
|
RequestChatHistory(uuid::Uuid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,6 +14,7 @@ pub mod c {
|
||||||
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
#[derive(Debug, serde::Serialize, serde::Deserialize)]
|
||||||
pub enum Packet {
|
pub enum Packet {
|
||||||
Message(crate::messages::Model),
|
Message(crate::messages::Model),
|
||||||
|
MessagesHistory(Vec<crate::messages::Model>),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,15 +22,15 @@ pub mod c {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
#[test]
|
#[test]
|
||||||
fn how_does_this_serialize_again() {
|
fn example_message() {
|
||||||
panic!(
|
panic!(
|
||||||
"{}",
|
"{}",
|
||||||
serde_json::to_string(
|
serde_json::to_string(
|
||||||
&super::c::Packet::Message(
|
&super::s::Packet::Message(
|
||||||
crate::messages::Model {
|
crate::messages::Model {
|
||||||
id: uuid::Uuid::new_v4(),
|
id: uuid::Uuid::new_v4(),
|
||||||
user_id: uuid::Uuid::new_v4(),
|
user_id: uuid::Uuid::new_v4(),
|
||||||
chat_id: uuid::Uuid::new_v4(),
|
chat_id: uuid::Uuid::nil(),
|
||||||
content: Some("hello world!".to_string()),
|
content: Some("hello world!".to_string()),
|
||||||
created: chrono::Utc::now(),
|
created: chrono::Utc::now(),
|
||||||
updated: None,
|
updated: None,
|
||||||
|
@ -37,4 +39,16 @@ mod test {
|
||||||
).expect("uhmmm???")
|
).expect("uhmmm???")
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn example_request_chat_history() {
|
||||||
|
panic!(
|
||||||
|
"{}",
|
||||||
|
serde_json::to_string(
|
||||||
|
&super::s::Packet::RequestChatHistory(
|
||||||
|
uuid::Uuid::nil()
|
||||||
|
)
|
||||||
|
).expect("uhmmm???")
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures_util::FutureExt;
|
use futures_util::FutureExt;
|
||||||
use scct_model::{chats, proto::c2s, users};
|
use scct_model::{chats, messages, proto::c2s, users};
|
||||||
use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseConnection, IntoActiveModel};
|
use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseConnection, EntityTrait, IntoActiveModel, ColumnTrait, QueryFilter, QueryOrder};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
use transport::{Transport, TransportSink, TransportStream};
|
use transport::{Transport, client_connection::ClientConnection};
|
||||||
use uuid;
|
use uuid;
|
||||||
|
|
||||||
pub mod transport;
|
pub mod transport;
|
||||||
|
@ -17,29 +17,36 @@ pub async fn serve(addr: &str, db_connection: DatabaseConnection) -> Result<(),
|
||||||
tracing::info!("preparing C2S transport");
|
tracing::info!("preparing C2S transport");
|
||||||
let mut listener = c2s.serve(addr.to_string()).await?;
|
let mut listener = c2s.serve(addr.to_string()).await?;
|
||||||
|
|
||||||
let chat = chats::Model {
|
// create the dev-test chat if it doesn't exist
|
||||||
id: uuid::Uuid::nil(), // temporarily we have only one chat
|
if let Ok(None) = chats::Entity::find_by_id(uuid::Uuid::nil()).one(&db_connection).await {
|
||||||
name: String::from("global")
|
let chat = chats::Model {
|
||||||
};
|
id: uuid::Uuid::nil(), // temporarily we have only one chat
|
||||||
chat.clone().into_active_model().insert(&db_connection).await.expect("failed to create global chat");
|
name: String::from("global")
|
||||||
|
};
|
||||||
|
match chat.clone().into_active_model().insert(&db_connection).await {
|
||||||
|
Ok(_) => {},
|
||||||
|
Err(_) => {},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!("listening for connections");
|
tracing::info!("listening for connections");
|
||||||
while let Some((tx, rx)) = listener.recv().await {
|
while let Some((tx, rx)) = listener.recv().await {
|
||||||
|
let client = Arc::new(Mutex::new(ClientConnection::new((tx, rx))));
|
||||||
tracing::info!("new connection, starting handler");
|
tracing::info!("new connection, starting handler");
|
||||||
state.lock().await.push(tx);
|
state.lock().await.push(client.clone());
|
||||||
let _state = state.clone();
|
let _state = state.clone();
|
||||||
|
|
||||||
let handler_db_conn = db_connection.clone();
|
let handler_db_conn = db_connection.clone();
|
||||||
|
|
||||||
// TODO need .boxed() because https://github.com/rust-lang/rust/issues/100013
|
// TODO need .boxed() because https://github.com/rust-lang/rust/issues/100013
|
||||||
tokio::spawn(async move { handle(_state, rx, handler_db_conn).boxed().await });
|
tokio::spawn(async move { handle_client_connection(_state, client, handler_db_conn).boxed().await });
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Connection handler for handling a connecting client
|
/// Connection handler for handling a connecting client
|
||||||
async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx: impl TransportStream, db_connection: impl ConnectionTrait) {
|
async fn handle_client_connection(active_client_peers: Arc<Mutex<Vec<Arc<Mutex<ClientConnection>>>>>, client: Arc<Mutex<ClientConnection>>, db_connection: impl ConnectionTrait) {
|
||||||
// for now we're creating a new user for each connection
|
// for now we're creating a new user for each connection
|
||||||
let new_client_id = uuid::Uuid::new_v4();
|
let new_client_id = uuid::Uuid::new_v4();
|
||||||
let user = users::Model{
|
let user = users::Model{
|
||||||
|
@ -53,11 +60,16 @@ async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx
|
||||||
updated: None
|
updated: None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let fn_span = tracing::span!(tracing::Level::TRACE, "handle_client_connection", client_uuid = client.lock().await.uuid.to_string());
|
||||||
|
let _enter = fn_span.enter();
|
||||||
|
|
||||||
|
tracing::debug!("Inside handler for client connection: {}", client.lock().await.uuid);
|
||||||
|
|
||||||
user.clone().into_active_model().insert(&db_connection).await.expect("failed adding a new user for the connection");
|
user.clone().into_active_model().insert(&db_connection).await.expect("failed adding a new user for the connection");
|
||||||
|
|
||||||
// continuously waiting then receiving new messages
|
// continuously waiting then receiving new messages
|
||||||
loop {
|
loop {
|
||||||
match rx.pop().await { //
|
match client.lock().await.rx.lock().await.pop().await {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("error receiving from client: {e} -- stopping");
|
tracing::error!("error receiving from client: {e} -- stopping");
|
||||||
break;
|
break;
|
||||||
|
@ -72,16 +84,42 @@ async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut disconnected_peers_to_remove = Vec::new();
|
let mut disconnected_peers_to_remove = Vec::new();
|
||||||
for (i, tx) in active_client_peers.lock().await.iter_mut().enumerate() {
|
for (i, other_client) in active_client_peers.lock().await.iter_mut().enumerate() {
|
||||||
if let Err(e) = tx.push(c2s::c::Packet::Message(msg.clone())).await {
|
tracing::debug!("sending message from {} to {}", client.lock().await.uuid, other_client.lock().await.uuid);
|
||||||
|
if other_client.lock().await.uuid == client.lock().await.uuid {
|
||||||
|
// let's not transmit our message to ourselves
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// try transmitting the message and if it fails, treat the client as disconnected
|
||||||
|
if let Err(e) = other_client.lock().await.tx.lock().await.push(c2s::c::Packet::Message(msg.clone())).await {
|
||||||
tracing::error!("failed broadcasting: {e}");
|
tracing::error!("failed broadcasting: {e}");
|
||||||
disconnected_peers_to_remove.push(i);
|
disconnected_peers_to_remove.push(i);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// remove all disconnected clients
|
||||||
for i in disconnected_peers_to_remove.into_iter().rev() {
|
for i in disconnected_peers_to_remove.into_iter().rev() {
|
||||||
active_client_peers.lock().await.remove(i);
|
active_client_peers.lock().await.remove(i);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
c2s::s::Packet::RequestChatHistory(chat_uuid) => {
|
||||||
|
// send the user all messages in the history of this chat
|
||||||
|
match messages::Entity::find()
|
||||||
|
.filter(messages::Column::ChatId.eq(chat_uuid))
|
||||||
|
.order_by_asc(messages::Column::Created)
|
||||||
|
.all(&db_connection)
|
||||||
|
.await {
|
||||||
|
Err(_) => { tracing::error!("failed to query all previously sent messages"); }
|
||||||
|
Ok(messages) => {
|
||||||
|
tracing::info!("responding to RequestChatHistory: {}", serde_json::to_string(&messages).unwrap());
|
||||||
|
match client.lock().await.tx.lock().await.push(c2s::c::Packet::MessagesHistory(messages)).await {
|
||||||
|
Ok(_) => {tracing::debug!("responding to RequestChatHistory was successful.")},
|
||||||
|
Err(e) => {tracing::error!("Failed to respond to RequestChatHistory: {}", e)},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
33
server/src/transport/client_connection.rs
Normal file
33
server/src/transport/client_connection.rs
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
// Wrapper around a (TransportStream, TransportSink) that represents a single
|
||||||
|
// client connection. It's self-aware - it gets a uuid so we can distinguish
|
||||||
|
// different connections and find the same one even if it's in different
|
||||||
|
// variables.
|
||||||
|
// Later into the life of a connection, it can be assigned a user uuid,
|
||||||
|
// when a user logs in using that connection. That can also be unassigned
|
||||||
|
// when the user logs out.
|
||||||
|
// Basically lots of helper facilities.
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use super::{TransportSink, TransportStream};
|
||||||
|
|
||||||
|
pub struct ClientConnection {
|
||||||
|
pub uuid: Uuid,
|
||||||
|
pub tx: Arc<Mutex<dyn TransportSink>>,
|
||||||
|
pub rx: Arc<Mutex<dyn TransportStream>>,
|
||||||
|
pub user_uuid: Option<Uuid> // gets set if and only if a user is logged in with this connection
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ClientConnection {
|
||||||
|
pub fn new((tx, rx): (impl TransportSink + 'static, impl TransportStream + 'static)) -> ClientConnection {
|
||||||
|
ClientConnection {
|
||||||
|
uuid: Uuid::new_v4(),
|
||||||
|
tx: Arc::new(Mutex::new(tx)),
|
||||||
|
rx: Arc::new(Mutex::new(rx)),
|
||||||
|
user_uuid: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -10,6 +10,8 @@
|
||||||
//!
|
//!
|
||||||
//! so basically most likely everything down here will get coupled more and merged with the rest
|
//! so basically most likely everything down here will get coupled more and merged with the rest
|
||||||
|
|
||||||
|
pub mod client_connection;
|
||||||
|
|
||||||
#[cfg(feature = "websocket")]
|
#[cfg(feature = "websocket")]
|
||||||
pub mod websocket;
|
pub mod websocket;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue