From 2287793cd9584cc2b693f21648c47241c81ad460 Mon Sep 17 00:00:00 2001 From: alemidev Date: Sat, 30 Jul 2022 03:02:38 +0200 Subject: [PATCH] feat: added tracing, added buffer_worker --- Cargo.toml | 2 + src/server/main.rs | 197 +++++++++++++++++++++++++++++---------------- 2 files changed, 130 insertions(+), 69 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9b5c310..200d37d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,8 @@ name = "codemp-client" path = "src/client/main.rs" [dependencies] +tracing = "0.1" +tracing-subscriber = "0.3" tonic = "0.7" prost = "0.10" futures = "0.3" diff --git a/src/server/main.rs b/src/server/main.rs index 84d6f51..a507690 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,6 +1,11 @@ -use std::{collections::HashMap, pin::Pin, sync::Arc}; +use std::collections::VecDeque; +use std::{pin::Pin, sync::Arc}; -use state::AlterState; +use tokio_stream::wrappers::ReceiverStream; +use tracing::{info, debug, warn, error}; + +use operational_transform::OperationSeq; +use state::{AlterState, StateManager}; use tonic::{transport::Server, Request, Response, Status}; pub mod proto { @@ -8,24 +13,71 @@ pub mod proto { tonic::include_proto!("buffer"); } -use tokio::sync::{mpsc, watch}; -use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; // TODO example used this? +use tokio::sync::{mpsc, broadcast}; +use tokio_stream::{Stream, StreamExt}; // TODO example used this? use proto::buffer_server::{Buffer, BufferServer}; use proto::workspace_server::{Workspace, WorkspaceServer}; use proto::{Operation, SessionRequest, SessionResponse}; use tonic::Streaming; +use workspace::BufferView; //use futures::{Stream, StreamExt}; -use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck! +use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck x2! pub mod state; pub mod workspace; type OperationStream = Pin> + Send>>; -pub struct BufferService {} +pub struct BufferService { + state: Arc, +} + +fn op_seq(o: &Operation) -> OperationSeq { todo!() } +fn op_net(o: &OperationSeq) -> Operation { todo!() } + +// async fn buffer_worker(tx: mpsc::Sender>, mut rx:Streaming, mut rx_core: mpsc::Receiver) { +async fn buffer_worker(bv: BufferView, mut client_rx: Streaming, tx_client:mpsc::Sender>, mut rx_core:broadcast::Receiver<(String, OperationSeq)>) { + let mut queue : VecDeque = VecDeque::new(); + loop { + tokio::select! { + client_op = client_rx.next() => { + if let Some(result) = client_op { + match result { + Ok(op) => { + bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt! + queue.push_back(op); + }, + Err(status) => { + error!("error receiving op from client: {:?}", status); + break; + } + } + } + }, + + server_op = rx_core.recv() => { + if let Ok(oop) = server_op { + let mut send_op = true; + for (i, _op) in queue.iter().enumerate() { + if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op)) + queue.remove(i); + send_op = false; + break; + } else { + // serv_op.transform(op); // TODO transform OpSeq ! + } + } + if send_op { + tx_client.send(Ok(op_net(&oop.1))).await.unwrap(); + } + } + } + } + } +} #[tonic::async_trait] impl Buffer for BufferService { @@ -36,61 +88,42 @@ impl Buffer for BufferService { &self, req: Request>, ) -> Result, Status> { - println!("EchoServer::bidirectional_streaming_echo"); + let session_id : String; + if let Some(sid) = req.metadata().get("session_id") { + session_id = sid.to_str().unwrap().to_string(); + } else { + return Err(Status::failed_precondition("Missing metadata key 'session_id'")); + } - let mut in_stream = req.into_inner(); - let (tx_og, rx) = mpsc::channel(128); + let path : String; + if let Some(p) = req.metadata().get("path") { + path = p.to_str().unwrap().to_string(); + } else { + return Err(Status::failed_precondition("Missing metadata key 'path'")); + } + // TODO make these above nicer? more concise? idk - // this spawn here is required if you want to handle connection error. - // If we just map `in_stream` and write it back as `out_stream` the `out_stream` - // will be drooped when connection error occurs and error will never be propagated - // to mapped version of `in_stream`. - let tx = tx_og.clone(); - tokio::spawn(async move { - while let Some(result) = in_stream.next().await { - match result { - Ok(v) => tx - .send(Ok(Operation { - action: 1, - row: 0, - column: 0, - op_id: 0, - text: None, - })) - .await - .expect("working rx"), - Err(err) => { - // if let Some(io_err) = match_for_io_error(&err) { - // if io_err.kind() == ErrorKind::BrokenPipe { - // // here you can handle special case when client - // // disconnected in unexpected way - // eprintln!("\tclient disconnected: broken pipe"); - // break; - // } - // } - eprintln!("Error receiving operation from client"); + if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) { + let in_stream = req.into_inner(); + let (tx_og, rx) = mpsc::channel::>(128); - match tx.send(Err(err)).await { - Ok(_) => (), - Err(_err) => break, // response was droped - } - } - } - } - println!("\tstream ended"); - }); + let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone(); + let w = workspace.clone(); + tokio::spawn(async move { buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; }); - // echo just write the same data that was received - let out_stream = ReceiverStream::new(rx); + // echo just write the same data that was received + let out_stream = ReceiverStream::new(rx); - Ok(Response::new(Box::pin(out_stream) as Self::AttachStream)) + return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream)); + } else { + return Err(Status::not_found(format!("Norkspace with session_id {}", session_id))); + } } } #[derive(Debug)] pub struct WorkspaceService { - tx: mpsc::Sender, - rx: watch::Receiver>>, + state: Arc, } #[tonic::async_trait] @@ -99,10 +132,10 @@ impl Workspace for WorkspaceService { &self, request: Request, ) -> Result, Status> { - println!("Got a request: {:?}", request); + debug!("create request: {:?}", request); let r = request.into_inner(); - // let w = WorkspaceInstance::new(r.session_key.clone(), r.content.unwrap_or("".to_string())); + let w = WorkspaceInstance::new(r.session_key.clone()); let reply = proto::SessionResponse { session_key: r.session_key.clone(), @@ -120,18 +153,22 @@ impl Workspace for WorkspaceService { &self, request: Request, ) -> Result, Status> { - println!("Got a request: {:?}", request); + debug!("sync request: {:?}", request); let r = request.into_inner(); - if let Some(w) = self.rx.borrow().get(&r.session_key) { - let reply = proto::SessionResponse { - session_key: r.session_key, - accepted: true, - content: Some(w.content.clone()), - hash: None, - }; + if let Some(w) = self.state.workspaces.borrow().get(&r.session_key) { + if let Some(buf) = w.buffers.borrow().get(&r.session_key) { + let reply = proto::SessionResponse { + session_key: r.session_key, + accepted: true, + content: Some(buf.content.borrow().clone()), + hash: None, + }; - Ok(Response::new(reply)) + Ok(Response::new(reply)) + } else { + Err(Status::out_of_range("fuck you".to_string())) + } } else { Err(Status::out_of_range("fuck you".to_string())) } @@ -142,7 +179,7 @@ impl Workspace for WorkspaceService { &self, request: Request, ) -> Result, Status> { - println!("Got a request: {:?}", request); + debug!("join request: {:?}", request); let reply = proto::SessionResponse { session_key: request.into_inner().session_key, @@ -158,12 +195,12 @@ impl Workspace for WorkspaceService { &self, request: Request, ) -> Result, Status> { - println!("Got a request: {:?}", request); + debug!("leave request: {:?}", request); let r = request.into_inner(); let mut removed = false; - if self.rx.borrow().get(&r.session_key).is_some() { - self.tx + if self.state.workspaces.borrow().get(&r.session_key).is_some() { + self.state.op_tx .send(AlterState::REMOVE { key: r.session_key.clone(), }) @@ -185,17 +222,39 @@ impl Workspace for WorkspaceService { #[tokio::main] async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + let addr = "[::1]:50051".parse()?; - let (tx, rx) = state::run_state_manager(); - let greeter = WorkspaceService { tx, rx }; - let processor = BufferService {}; + let state = Arc::new(StateManager::new()); + + let greeter = WorkspaceService { state: state.clone() }; + let processor = BufferService { state: state.clone() }; + + info!("Starting server"); Server::builder() .add_service(WorkspaceServer::new(greeter)) .add_service(BufferServer::new(processor)) .serve(addr) .await?; +/* + +fn main() { + // install global collector configured based on RUST_LOG env var. + tracing_subscriber::fmt::init(); + + let number_of_yaks = 3; + // this creates a new event, outside of any spans. + info!(number_of_yaks, "preparing to shave yaks"); + + let number_shaved = yak_shave::shave_all(number_of_yaks); + info!( + all_yaks_shaved = number_shaved == number_of_yaks, + "yak shaving completed." + ); +} +*/ Ok(()) }