diff --git a/src/server/main.rs b/src/server/main.rs index a507690..8ad6054 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,224 +1,16 @@ -use std::collections::VecDeque; -use std::{pin::Pin, sync::Arc}; +pub mod actor; +pub mod service; -use tokio_stream::wrappers::ReceiverStream; -use tracing::{info, debug, warn, error}; +use std::sync::Arc; -use operational_transform::OperationSeq; -use state::{AlterState, StateManager}; -use tonic::{transport::Server, Request, Response, Status}; +use tracing::{debug, error, info, warn}; -pub mod proto { - tonic::include_proto!("workspace"); - tonic::include_proto!("buffer"); -} +use tonic::transport::Server; -use tokio::sync::{mpsc, broadcast}; -use tokio_stream::{Stream, StreamExt}; // TODO example used this? - -use proto::buffer_server::{Buffer, BufferServer}; -use proto::workspace_server::{Workspace, WorkspaceServer}; -use proto::{Operation, SessionRequest, SessionResponse}; - -use tonic::Streaming; -use workspace::BufferView; -//use futures::{Stream, StreamExt}; - -use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck x2! - -pub mod state; -pub mod workspace; - -type OperationStream = Pin> + Send>>; - -pub struct BufferService { - state: Arc, -} - -fn op_seq(o: &Operation) -> OperationSeq { todo!() } -fn op_net(o: &OperationSeq) -> Operation { todo!() } - -// async fn buffer_worker(tx: mpsc::Sender>, mut rx:Streaming, mut rx_core: mpsc::Receiver) { -async fn buffer_worker(bv: BufferView, mut client_rx: Streaming, tx_client:mpsc::Sender>, mut rx_core:broadcast::Receiver<(String, OperationSeq)>) { - let mut queue : VecDeque = VecDeque::new(); - loop { - tokio::select! { - client_op = client_rx.next() => { - if let Some(result) = client_op { - match result { - Ok(op) => { - bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt! - queue.push_back(op); - }, - Err(status) => { - error!("error receiving op from client: {:?}", status); - break; - } - } - } - }, - - server_op = rx_core.recv() => { - if let Ok(oop) = server_op { - let mut send_op = true; - for (i, _op) in queue.iter().enumerate() { - if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op)) - queue.remove(i); - send_op = false; - break; - } else { - // serv_op.transform(op); // TODO transform OpSeq ! - } - } - if send_op { - tx_client.send(Ok(op_net(&oop.1))).await.unwrap(); - } - } - } - } - } -} - -#[tonic::async_trait] -impl Buffer for BufferService { - // type ServerStreamingEchoStream = ResponseStream; - type AttachStream = OperationStream; - - async fn attach( - &self, - req: Request>, - ) -> Result, Status> { - let session_id : String; - if let Some(sid) = req.metadata().get("session_id") { - session_id = sid.to_str().unwrap().to_string(); - } else { - return Err(Status::failed_precondition("Missing metadata key 'session_id'")); - } - - let path : String; - if let Some(p) = req.metadata().get("path") { - path = p.to_str().unwrap().to_string(); - } else { - return Err(Status::failed_precondition("Missing metadata key 'path'")); - } - // TODO make these above nicer? more concise? idk - - if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) { - let in_stream = req.into_inner(); - let (tx_og, rx) = mpsc::channel::>(128); - - let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone(); - let w = workspace.clone(); - tokio::spawn(async move { buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; }); - - // echo just write the same data that was received - let out_stream = ReceiverStream::new(rx); - - return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream)); - } else { - return Err(Status::not_found(format!("Norkspace with session_id {}", session_id))); - } - } -} - -#[derive(Debug)] -pub struct WorkspaceService { - state: Arc, -} - -#[tonic::async_trait] -impl Workspace for WorkspaceService { - async fn create( - &self, - request: Request, - ) -> Result, Status> { - debug!("create request: {:?}", request); - let r = request.into_inner(); - - let w = WorkspaceInstance::new(r.session_key.clone()); - - let reply = proto::SessionResponse { - session_key: r.session_key.clone(), - accepted: true, - content: None, // Some(w.content.clone()), - hash: None, - }; - - // self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); - - Ok(Response::new(reply)) - } - - async fn sync( - &self, - request: Request, - ) -> Result, Status> { - debug!("sync request: {:?}", request); - let r = request.into_inner(); - - if let Some(w) = self.state.workspaces.borrow().get(&r.session_key) { - if let Some(buf) = w.buffers.borrow().get(&r.session_key) { - let reply = proto::SessionResponse { - session_key: r.session_key, - accepted: true, - content: Some(buf.content.borrow().clone()), - hash: None, - }; - - Ok(Response::new(reply)) - } else { - Err(Status::out_of_range("fuck you".to_string())) - } - } else { - Err(Status::out_of_range("fuck you".to_string())) - } - } - - // TODO make it do something - async fn join( - &self, - request: Request, - ) -> Result, Status> { - debug!("join request: {:?}", request); - - let reply = proto::SessionResponse { - session_key: request.into_inner().session_key, - accepted: true, - content: None, - hash: None, - }; - - Ok(Response::new(reply)) - } - - async fn leave( - &self, - request: Request, - ) -> Result, Status> { - debug!("leave request: {:?}", request); - let r = request.into_inner(); - let mut removed = false; - - if self.state.workspaces.borrow().get(&r.session_key).is_some() { - self.state.op_tx - .send(AlterState::REMOVE { - key: r.session_key.clone(), - }) - .await - .unwrap(); - removed = true; // TODO this is a lie! Verify it - } - - let reply = proto::SessionResponse { - session_key: r.session_key, - accepted: removed, - content: None, - hash: None, - }; - - Ok(Response::new(reply)) - } -} +use crate::{ + actor::state::StateManager, + service::{buffer::BufferService, session::SessionService, workspace::WorkspaceService}, +}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -228,33 +20,13 @@ async fn main() -> Result<(), Box> { let state = Arc::new(StateManager::new()); - let greeter = WorkspaceService { state: state.clone() }; - let processor = BufferService { state: state.clone() }; - info!("Starting server"); Server::builder() - .add_service(WorkspaceServer::new(greeter)) - .add_service(BufferServer::new(processor)) + .add_service(WorkspaceService::server(state.clone())) + .add_service(BufferService::server(state.clone())) .serve(addr) .await?; -/* - -fn main() { - // install global collector configured based on RUST_LOG env var. - tracing_subscriber::fmt::init(); - - let number_of_yaks = 3; - // this creates a new event, outside of any spans. - info!(number_of_yaks, "preparing to shave yaks"); - - let number_shaved = yak_shave::shave_all(number_of_yaks); - info!( - all_yaks_shaved = number_shaved == number_of_yaks, - "yak shaving completed." - ); -} -*/ Ok(()) } diff --git a/src/server/service/buffer.rs b/src/server/service/buffer.rs new file mode 100644 index 0000000..fb07ae4 --- /dev/null +++ b/src/server/service/buffer.rs @@ -0,0 +1,153 @@ +use std::collections::VecDeque; +use std::{pin::Pin, sync::Arc}; + +use tokio_stream::wrappers::ReceiverStream; +use tracing::{debug, error, info, warn}; + +use operational_transform::OperationSeq; +use tonic::{transport::Server, Request, Response, Status}; + +pub mod proto { + tonic::include_proto!("session"); + tonic::include_proto!("workspace"); + tonic::include_proto!("buffer"); +} + +use tokio::sync::{broadcast, mpsc}; +use tokio_stream::{Stream, StreamExt}; // TODO example used this? + +use proto::buffer_server::{Buffer, BufferServer}; +use proto::session_server::{Session, SessionServer}; +use proto::workspace_server::{Workspace, WorkspaceServer}; +use proto::{BufferList, Event, Operation, SessionRequest, SessionResponse, WorkspaceRequest}; + +use tonic::Streaming; +//use futures::{Stream, StreamExt}; + +use crate::actor::{buffer::BufferView, state::{AlterState, StateManager}, workspace::Workspace as WorkspaceInstance}; + +use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2! + +type OperationStream = Pin> + Send>>; + +pub struct BufferService { + state: Arc, +} + +fn op_seq(o: &Operation) -> OperationSeq { + todo!() +} +fn op_net(o: &OperationSeq) -> Operation { + todo!() +} + +// async fn buffer_worker(tx: mpsc::Sender>, mut rx:Streaming, mut rx_core: mpsc::Receiver) { +async fn buffer_worker( + bv: BufferView, + mut client_rx: Streaming, + tx_client: mpsc::Sender>, + mut rx_core: broadcast::Receiver<(String, OperationSeq)>, +) { + let mut queue: VecDeque = VecDeque::new(); + loop { + tokio::select! { + client_op = client_rx.next() => { + if let Some(result) = client_op { + match result { + Ok(op) => { + bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt! + queue.push_back(op); + }, + Err(status) => { + error!("error receiving op from client: {:?}", status); + break; + } + } + } + }, + + server_op = rx_core.recv() => { + if let Ok(oop) = server_op { + let mut send_op = true; + for (i, _op) in queue.iter().enumerate() { + if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op)) + queue.remove(i); + send_op = false; + break; + } else { + // serv_op.transform(op); // TODO transform OpSeq ! + } + } + if send_op { + tx_client.send(Ok(op_net(&oop.1))).await.unwrap(); + } + } + } + } + } +} + +#[tonic::async_trait] +impl Buffer for BufferService { + // type ServerStreamingEchoStream = ResponseStream; + type AttachStream = OperationStream; + + async fn attach( + &self, + req: Request>, + ) -> Result, Status> { + let session_id: String; + if let Some(sid) = req.metadata().get("session_id") { + session_id = sid.to_str().unwrap().to_string(); + } else { + return Err(Status::failed_precondition( + "Missing metadata key 'session_id'", + )); + } + + let path: String; + if let Some(p) = req.metadata().get("path") { + path = p.to_str().unwrap().to_string(); + } else { + return Err(Status::failed_precondition("Missing metadata key 'path'")); + } + // TODO make these above nicer? more concise? idk + + if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) { + let in_stream = req.into_inner(); + let (tx_og, rx) = mpsc::channel::>(128); + + let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone(); + let w = workspace.clone(); + tokio::spawn(async move { + buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; + }); + + // echo just write the same data that was received + let out_stream = ReceiverStream::new(rx); + + return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream)); + } else { + return Err(Status::not_found(format!( + "Norkspace with session_id {}", + session_id + ))); + } + } + + async fn push(&self, req:Request) -> Result, Status> { + todo!() + } + + async fn pull(&self, req:Request) -> Result, Status> { + todo!() + } + +} + +impl BufferService { + // TODO is this smart? Should I let main() instantiate servers? + pub fn server(state: Arc) -> BufferServer { + BufferServer::new(BufferService { state }) + } +} diff --git a/src/server/service/mod.rs b/src/server/service/mod.rs new file mode 100644 index 0000000..a43d4c0 --- /dev/null +++ b/src/server/service/mod.rs @@ -0,0 +1,3 @@ +pub mod buffer; +pub mod session; +pub mod workspace; diff --git a/src/server/service/session.rs b/src/server/service/session.rs new file mode 100644 index 0000000..ff5ae1c --- /dev/null +++ b/src/server/service/session.rs @@ -0,0 +1,88 @@ +pub mod proto { + tonic::include_proto!("session"); +} + +use std::sync::Arc; + +use tracing::{debug, error, info, warn}; + +use tokio_stream::{Stream, StreamExt}; // TODO example used this? + +use tonic::{transport::Server, Request, Response, Status}; + +use proto::session_server::{Session, SessionServer}; +use proto::{SessionRequest, SessionResponse}; + +use crate::actor::{ + buffer::BufferView, + state::{AlterState, StateManager}, + workspace::Workspace as WorkspaceInstance, // TODO fuck x2! +}; + +#[derive(Debug)] +pub struct SessionService { + state: Arc, +} + +#[tonic::async_trait] +impl Session for SessionService { + async fn create( + &self, + request: Request, + ) -> Result, Status> { + debug!("create request: {:?}", request); + let r = request.into_inner(); + + let w = WorkspaceInstance::new(r.session_key.clone()); + + let reply = proto::SessionResponse { + session_key: r.session_key.clone(), + accepted: true, + }; + + // self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); + + Ok(Response::new(reply)) + } + + async fn join( + &self, + request: Request, + ) -> Result, Status> { + debug!("join request: {:?}", request); + + let reply = proto::SessionResponse { + session_key: request.into_inner().session_key, + accepted: true, + }; + + Ok(Response::new(reply)) + } + + async fn leave( + &self, + request: Request, + ) -> Result, Status> { + debug!("leave request: {:?}", request); + let r = request.into_inner(); + let mut removed = false; + + if self.state.workspaces.borrow().get(&r.session_key).is_some() { + self.state + .op_tx + .send(AlterState::REMOVE { + key: r.session_key.clone(), + }) + .await + .unwrap(); + removed = true; // TODO this is a lie! Verify it + } + + let reply = proto::SessionResponse { + session_key: r.session_key, + accepted: removed, + }; + + Ok(Response::new(reply)) + } +} diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs new file mode 100644 index 0000000..5bd1d17 --- /dev/null +++ b/src/server/service/workspace.rs @@ -0,0 +1,48 @@ +use std::{pin::Pin, sync::Arc}; + +// use tracing::{debug, error, info, warn}; + +use tonic::{Request, Response, Status}; + +pub mod proto { + tonic::include_proto!("workspace"); +} + +use tokio_stream::Stream; // TODO example used this? + +use proto::workspace_server::{Workspace, WorkspaceServer}; +use proto::{BufferList, Event, WorkspaceRequest}; + +use crate::actor::state::StateManager; + +type EventStream = Pin> + Send>>; + +#[derive(Debug)] +pub struct WorkspaceService { + state: Arc, +} + +#[tonic::async_trait] +impl Workspace for WorkspaceService { + type SubscribeStream = EventStream; + + async fn subscribe( + &self, + req: Request, + ) -> Result, Status> { + todo!() + } + + async fn buffers( + &self, + req: Request, + ) -> Result, Status> { + todo!() + } +} + +impl WorkspaceService { + pub fn server(state: Arc) -> WorkspaceServer { + WorkspaceServer::new(WorkspaceService { state }) + } +}