persisting messages as incoming to one global chat and each connection being a new user

This commit is contained in:
cqql 2024-08-12 19:44:38 +02:00
parent 5dfefcc62c
commit 495b409adf
9 changed files with 116 additions and 51 deletions

1
.gitignore vendored
View file

@ -1 +1,2 @@
/target /target
scct.db

87
Cargo.lock generated
View file

@ -495,17 +495,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "derivative"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcc3dd5e9e9c0b295d6e1e4d811fb6f157d5ffd784b8d202fc62eac8035a770b"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.7" version = "0.10.7"
@ -524,6 +513,18 @@ version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
[[package]]
name = "educe"
version = "0.5.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4bd92664bf78c4d3dba9b7cdafce6fa15b13ed3ed16175218196942e99168a8"
dependencies = [
"enum-ordinalize",
"proc-macro2",
"quote",
"syn 2.0.67",
]
[[package]] [[package]]
name = "either" name = "either"
version = "1.12.0" version = "1.12.0"
@ -533,6 +534,26 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "enum-ordinalize"
version = "4.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea0dcfa4e54eeb516fe454635a95753ddd39acda650ce703031c6973e315dd5"
dependencies = [
"enum-ordinalize-derive",
]
[[package]]
name = "enum-ordinalize-derive"
version = "4.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d28318a75d4aead5c4db25382e8ef717932d0346600cacae6357eb5941bc5ff"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.67",
]
[[package]] [[package]]
name = "equivalent" name = "equivalent"
version = "1.0.1" version = "1.0.1"
@ -1605,6 +1626,7 @@ checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f"
name = "scct-cli" name = "scct-cli"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"chrono",
"clap", "clap",
"scct-migrations", "scct-migrations",
"scct-server", "scct-server",
@ -1637,14 +1659,17 @@ name = "scct-server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"chrono",
"futures-util", "futures-util",
"scct-model", "scct-model",
"sea-orm",
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror",
"tokio", "tokio",
"tokio-tungstenite", "tokio-tungstenite",
"tracing", "tracing",
"uuid",
] ]
[[package]] [[package]]
@ -1677,9 +1702,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm" name = "sea-orm"
version = "0.12.15" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c8814e37dc25de54398ee62228323657520b7f29713b8e238649385dbe473ee0" checksum = "f9d4ec1cdd8bdd3553d3c946079f58efa33fedc477f32603652652abcef96fe6"
dependencies = [ dependencies = [
"async-stream", "async-stream",
"async-trait", "async-trait",
@ -1705,9 +1730,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm-cli" name = "sea-orm-cli"
version = "0.12.15" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "620bc560062ae251b1366bde43b3f1508445cab5c2c8cbdb397034638ab1b357" checksum = "d525eee597817631f800857b0c2fe8645ec9c65b4304176a44bf14b93366009e"
dependencies = [ dependencies = [
"chrono", "chrono",
"clap", "clap",
@ -1722,9 +1747,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm-macros" name = "sea-orm-macros"
version = "0.12.15" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e115c6b078e013aa963cc2d38c196c2c40b05f03d0ac872fe06b6e0d5265603" checksum = "f363ead48b625a6f8f905322a820464f728fa4fe4f1c222bed5234ccf8fb8555"
dependencies = [ dependencies = [
"heck 0.4.1", "heck 0.4.1",
"proc-macro2", "proc-macro2",
@ -1736,9 +1761,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-orm-migration" name = "sea-orm-migration"
version = "0.12.15" version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee8269bc6ff71afd6b78aa4333ac237a69eebd2cdb439036291e64fb4b8db23c" checksum = "5df62369752f91b9295f8d71c146d84f818301a9ae7295106ebe8bbaa989a072"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"clap", "clap",
@ -1753,13 +1778,13 @@ dependencies = [
[[package]] [[package]]
name = "sea-query" name = "sea-query"
version = "0.30.7" version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4166a1e072292d46dc91f31617c2a1cdaf55a8be4b5c9f4bf2ba248e3ac4999b" checksum = "7e5073b2cfed767511a57d18115f3b3d8bcb5690bf8c89518caec6cb22c0cd74"
dependencies = [ dependencies = [
"bigdecimal", "bigdecimal",
"chrono", "chrono",
"derivative", "educe",
"inherent", "inherent",
"ordered-float", "ordered-float",
"rust_decimal", "rust_decimal",
@ -1771,9 +1796,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-query-binder" name = "sea-query-binder"
version = "0.5.0" version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "36bbb68df92e820e4d5aeb17b4acd5cc8b5d18b2c36a4dd6f4626aabfa7ab1b9" checksum = "754965d4aee6145bec25d0898e5c931e6c22859789ce62fd85a42a15ed5a8ce3"
dependencies = [ dependencies = [
"bigdecimal", "bigdecimal",
"chrono", "chrono",
@ -1800,9 +1825,9 @@ dependencies = [
[[package]] [[package]]
name = "sea-schema" name = "sea-schema"
version = "0.14.2" version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30d148608012d25222442d1ebbfafd1228dbc5221baf4ec35596494e27a2394e" checksum = "ad52149fc81836ea7424c3425d8f6ed8ad448dd16d2e4f6a3907ba46f3f2fd78"
dependencies = [ dependencies = [
"futures", "futures",
"sea-query", "sea-query",
@ -1811,14 +1836,14 @@ dependencies = [
[[package]] [[package]]
name = "sea-schema-derive" name = "sea-schema-derive"
version = "0.2.0" version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6f686050f76bffc4f635cda8aea6df5548666b830b52387e8bc7de11056d11e" checksum = "debdc8729c37fdbf88472f97fd470393089f997a909e535ff67c544d18cfccf0"
dependencies = [ dependencies = [
"heck 0.4.1", "heck 0.4.1",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.109", "syn 2.0.67",
] ]
[[package]] [[package]]
@ -2236,9 +2261,9 @@ checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]] [[package]]
name = "strum" name = "strum"
version = "0.25.0" version = "0.26.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "290d54ea6f91c969195bdbcd7442c8c2a2ba87da8bf60a7ee86a235d4bc1e125" checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06"
[[package]] [[package]]
name = "subtle" name = "subtle"

View file

@ -18,11 +18,12 @@ path = "main.rs"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
clap = { version = "4.5", features = ["derive"] } clap = { version = "4.5", features = ["derive"] }
sea-orm = { version = "0.12", features = ["runtime-tokio-native-tls", "sqlx-mysql", "sqlx-sqlite", "sqlx-postgres"] } sea-orm = { version = "1.0", features = ["runtime-tokio-native-tls", "sqlx-mysql", "sqlx-sqlite", "sqlx-postgres"] }
tokio = { version = "1.38.0", features = ["full"] } tokio = { version = "1.38.0", features = ["full"] }
scct-migrations = { path = "migrations", optional = true } scct-migrations = { path = "migrations", optional = true }
scct-server = { path = "server", optional = true } scct-server = { path = "server", optional = true }
chrono = "0.4.38"
[features] [features]
default = ["server", "migrations"] default = ["server", "migrations"]

View file

@ -55,7 +55,7 @@ async fn main() {
#[cfg(feature = "server")] #[cfg(feature = "server")]
Command::Serve { addr } => Command::Serve { addr } =>
scct_server::serve(&addr) scct_server::serve(&addr, db)
.await .await
.expect("scct server terminated with exception"), .expect("scct server terminated with exception"),
} }

View file

@ -6,4 +6,4 @@ edition = "2021"
[lib] [lib]
[dependencies] [dependencies]
sea-orm-migration = "0.12" sea-orm-migration = "1.0"

View file

@ -6,7 +6,7 @@ edition = "2021"
[lib] [lib]
[dependencies] [dependencies]
sea-orm = "0.12" sea-orm = "1.0"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
uuid = { version = "1.9.1", features = ["v4"] } uuid = { version = "1.9.1", features = ["v4"] }
chrono = { version = "0.4.38", features = ["serde"] } chrono = { version = "0.4.38", features = ["serde"] }

View file

@ -3,7 +3,7 @@
pub mod s { pub mod s {
#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum Packet { pub enum Packet {
Chat(crate::messages::Model), Message(crate::messages::Model),
} }
} }
@ -12,7 +12,7 @@ pub mod s {
pub mod c { pub mod c {
#[derive(Debug, serde::Serialize, serde::Deserialize)] #[derive(Debug, serde::Serialize, serde::Deserialize)]
pub enum Packet { pub enum Packet {
Chat(crate::messages::Model), Message(crate::messages::Model),
} }
} }
@ -24,7 +24,7 @@ mod test {
panic!( panic!(
"{}", "{}",
serde_json::to_string( serde_json::to_string(
&super::c::Packet::Chat( &super::c::Packet::Message(
crate::messages::Model { crate::messages::Model {
id: uuid::Uuid::new_v4(), id: uuid::Uuid::new_v4(),
user_id: uuid::Uuid::new_v4(), user_id: uuid::Uuid::new_v4(),

View file

@ -14,6 +14,11 @@ futures-util = "0.3.30"
async-trait = "0.1.80" async-trait = "0.1.80"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0.118", optional = true } serde_json = { version = "1.0.118", optional = true }
sea-orm = { version = "1.0" }
chrono = "0.4.38"
uuid = { version = "1.9.1", features = ["v4"] }
scct-model = { path = "../model/" } scct-model = { path = "../model/" }

View file

@ -1,13 +1,15 @@
use std::sync::Arc; use std::sync::Arc;
use futures_util::FutureExt; use futures_util::FutureExt;
use scct_model::proto::c2s; use scct_model::{chats, proto::c2s, users};
use sea_orm::{ActiveModelTrait, ConnectionTrait, DatabaseConnection, IntoActiveModel};
use tokio::sync::Mutex; use tokio::sync::Mutex;
use transport::{Transport, TransportSink, TransportStream}; use transport::{Transport, TransportSink, TransportStream};
use uuid;
pub mod transport; pub mod transport;
pub async fn serve(addr: &str) -> Result<(), Box<dyn std::error::Error>> { pub async fn serve(addr: &str, db_connection: DatabaseConnection) -> Result<(), Box<dyn std::error::Error>> {
let state = Arc::new(Mutex::new(Vec::new())); let state = Arc::new(Mutex::new(Vec::new()));
let c2s = transport::new(); let c2s = transport::new();
@ -15,38 +17,69 @@ pub async fn serve(addr: &str) -> Result<(), Box<dyn std::error::Error>> {
tracing::info!("preparing C2S transport"); tracing::info!("preparing C2S transport");
let mut listener = c2s.serve(addr.to_string()).await?; let mut listener = c2s.serve(addr.to_string()).await?;
let chat = chats::Model {
id: uuid::Uuid::nil(), // temporarily we have only one chat
name: String::from("global")
};
chat.clone().into_active_model().insert(&db_connection).await.expect("failed to create global chat");
tracing::info!("listening for connections"); tracing::info!("listening for connections");
while let Some((tx, rx)) = listener.recv().await { while let Some((tx, rx)) = listener.recv().await {
tracing::info!("new connection, starting handler"); tracing::info!("new connection, starting handler");
state.lock().await.push(tx); state.lock().await.push(tx);
let _state = state.clone(); let _state = state.clone();
let handler_db_conn = db_connection.clone();
// TODO need .boxed() because https://github.com/rust-lang/rust/issues/100013 // TODO need .boxed() because https://github.com/rust-lang/rust/issues/100013
tokio::spawn(async move { handle(_state, rx).boxed().await }); tokio::spawn(async move { handle(_state, rx, handler_db_conn).boxed().await });
} }
Ok(()) Ok(())
} }
async fn handle(state: Arc<Mutex<Vec<impl TransportSink>>>, mut rx: impl TransportStream) { /// Connection handler for handling a connecting client
async fn handle(active_client_peers: Arc<Mutex<Vec<impl TransportSink>>>, mut rx: impl TransportStream, db_connection: impl ConnectionTrait) {
// for now we're creating a new user for each connection
let new_client_id = uuid::Uuid::new_v4();
let user = users::Model{
id: new_client_id,
username: new_client_id.to_string(),
domain: String::from("localhost"),
display_name: Some(new_client_id.to_string()),
description: None,
avatar: None,
created: chrono::offset::Utc::now(),
updated: None
};
user.clone().into_active_model().insert(&db_connection).await.expect("failed adding a new user for the connection");
// continuously waiting then receiving new messages
loop { loop {
match rx.pop().await { match rx.pop().await { //
Err(e) => { Err(e) => {
tracing::error!("error receiving from client: {e} -- stopping"); tracing::error!("error receiving from client: {e} -- stopping");
break; break;
}, },
Ok(None) => break, Ok(None) => break, // clean exit
Ok(Some(pkt)) => match pkt { Ok(Some(pkt)) => match pkt {
c2s::s::Packet::Chat(msg) => { c2s::s::Packet::Message(mut msg) => {
let mut to_remove = Vec::new(); msg.user_id = user.id;
for (i, tx) in state.lock().await.iter_mut().enumerate() { msg.chat_id = uuid::Uuid::nil(); // temporarily we have only one chat
if let Err(e) = tx.push(c2s::c::Packet::Chat(msg.clone())).await { if let Err(err) = msg.clone().into_active_model().insert(&db_connection).await {
tracing::error!("failed saving incoming message from a client to the database: {}", err);
}
let mut disconnected_peers_to_remove = Vec::new();
for (i, tx) in active_client_peers.lock().await.iter_mut().enumerate() {
if let Err(e) = tx.push(c2s::c::Packet::Message(msg.clone())).await {
tracing::error!("failed broadcasting: {e}"); tracing::error!("failed broadcasting: {e}");
to_remove.push(i); disconnected_peers_to_remove.push(i);
} }
} }
for i in to_remove.into_iter().rev() { for i in disconnected_peers_to_remove.into_iter().rev() {
state.lock().await.remove(i); active_client_peers.lock().await.remove(i);
} }
}, },
}, },