From ebbca24a99c8b0488981ffdddfa8101bca3669bd Mon Sep 17 00:00:00 2001 From: alemi Date: Fri, 7 Apr 2023 03:05:21 +0200 Subject: [PATCH] chore: dramatically simplified everything working on this was really hard, so i'm making simple things first. removed almost everything except bare buffer changes, and not even done in a smart way, but should be a working PoC? now trying to make a working client to test it out and actually work on a real prototype --- Cargo.toml | 40 +++-- build.rs | 2 - proto/buffer.proto | 29 ++-- proto/session.proto | 25 ---- proto/workspace.proto | 51 ------- src/client/cli/main.rs | 46 ++++++ src/client/dispatcher.rs | 96 ------------ src/client/main.rs | 16 -- src/client/nvim/main.rs | 91 +++++++++++ src/client/nvim/mod.rs | 133 ---------------- src/lib/events.rs | 100 ------------- src/lib/lib.rs | 6 +- src/lib/proto.rs | 1 + src/lib/user.rs | 27 ---- src/server/actor/buffer.rs | 67 --------- src/server/actor/mod.rs | 3 - src/server/actor/state.rs | 106 ------------- src/server/actor/workspace.rs | 228 ---------------------------- src/server/buffer/actor.rs | 83 ++++++++++ src/server/buffer/mod.rs | 2 + src/server/buffer/service.rs | 105 +++++++++++++ src/server/main.rs | 16 +- src/server/service/buffer.rs | 156 ------------------- src/server/service/mod.rs | 3 - src/server/service/session.rs | 59 -------- src/server/service/workspace.rs | 258 -------------------------------- 26 files changed, 371 insertions(+), 1378 deletions(-) delete mode 100644 proto/session.proto delete mode 100644 proto/workspace.proto create mode 100644 src/client/cli/main.rs delete mode 100644 src/client/dispatcher.rs delete mode 100644 src/client/main.rs create mode 100644 src/client/nvim/main.rs delete mode 100644 src/client/nvim/mod.rs delete mode 100644 src/lib/events.rs create mode 100644 src/lib/proto.rs delete mode 100644 src/lib/user.rs delete mode 100644 src/server/actor/buffer.rs delete mode 100644 src/server/actor/mod.rs delete mode 100644 src/server/actor/state.rs delete mode 100644 src/server/actor/workspace.rs create mode 100644 src/server/buffer/actor.rs create mode 100644 src/server/buffer/mod.rs create mode 100644 src/server/buffer/service.rs delete mode 100644 src/server/service/buffer.rs delete mode 100644 src/server/service/mod.rs delete mode 100644 src/server/service/session.rs delete mode 100644 src/server/service/workspace.rs diff --git a/Cargo.toml b/Cargo.toml index ba0fdc2..86b27e7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,9 +3,9 @@ name = "codemp" version = "0.1.0" edition = "2021" -[features] -default = ["nvim"] -nvim = [] +# [features] +# default = ["nvim"] +# nvim = [] [lib] name = "library" @@ -15,23 +15,35 @@ path = "src/lib/lib.rs" name = "server" path = "src/server/main.rs" -[[bin]] # Bin to run the CodeMP gRPC client -name = "client-neovim" -path = "src/client/main.rs" +[[bin]] +name = "client-cli" +path = "src/client/cli/main.rs" +required-features = ["cli"] + +[[bin]] +name = "client-nvim" +path = "src/client/nvim/main.rs" +required-features = ["nvim"] [dependencies] tracing = "0.1" tracing-subscriber = "0.3" -tonic = "0.7" -prost = "0.10" -futures = "0.3" +tonic = "0.9" tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] } tokio-stream = "0.1" rmpv = "1" -operational-transform = "0.6" -nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature -uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] } -rand = "0.8.5" +serde = "1" +serde_json = "1" +operational-transform = { version = "0.6", features = ["serde"] } +md5 = "0.7.0" +prost = "0.11.8" +clap = { version = "4.2.1", features = ["derive"], optional = true } +nvim-rs = { version = "0.5", features = ["use_tokio"], optional = true } [build-dependencies] -tonic-build = "0.7" +tonic-build = "0.9" + +[features] +default = [] +cli = ["dep:clap"] +nvim = ["dep:nvim-rs"] diff --git a/build.rs b/build.rs index 2b8ba6f..838368e 100644 --- a/build.rs +++ b/build.rs @@ -1,6 +1,4 @@ fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/session.proto")?; - tonic_build::compile_protos("proto/workspace.proto")?; tonic_build::compile_protos("proto/buffer.proto")?; Ok(()) } diff --git a/proto/buffer.proto b/proto/buffer.proto index 8e3b3a3..671b991 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -2,35 +2,26 @@ syntax = "proto3"; package buffer; service Buffer { - rpc Attach (stream Operation) returns (stream Operation); - rpc Push (BufferPayload) returns (BufferResponse); - rpc Pull (BufferPayload) returns (BufferPayload); + rpc Attach (BufferPayload) returns (stream RawOp); + rpc Edit (OperationRequest) returns (BufferResponse); + rpc Create (BufferPayload) returns (BufferResponse); } -message Operation { - int64 opId = 1; +message RawOp { + string opseq = 1; +} - enum Action { - RETAIN = 0; - INSERT = 1; - DELETE = 2; - }; - Action action = 2; - - int32 row = 3; - int32 column = 4; - - optional string text = 5; +message OperationRequest { + string path = 1; + string hash = 2; + string opseq = 3; } message BufferPayload { - string sessionKey = 1; string path = 2; optional string content = 3; } message BufferResponse { - string sessionKey = 1; - string path = 2; bool accepted = 3; } diff --git a/proto/session.proto b/proto/session.proto deleted file mode 100644 index 7273946..0000000 --- a/proto/session.proto +++ /dev/null @@ -1,25 +0,0 @@ -syntax = "proto3"; -package session; - -service Session { - // rpc Authenticate(SessionRequest) returns (SessionResponse); - // rpc ListWorkspaces(SessionRequest) returns (WorkspaceList); - rpc CreateWorkspace(WorkspaceBuilderRequest) returns (SessionResponse); -} - -message SessionRequest { - string sessionKey = 1; -} - -message SessionResponse { - string sessionKey = 1; - bool accepted = 2; -} - -message WorkspaceBuilderRequest { - string name = 1; -} - -message WorkspaceList { - repeated string name = 1; // TODO add more fields -} diff --git a/proto/workspace.proto b/proto/workspace.proto deleted file mode 100644 index ae17813..0000000 --- a/proto/workspace.proto +++ /dev/null @@ -1,51 +0,0 @@ -syntax = "proto3"; -package workspace; - -service Workspace { - rpc Join (JoinRequest) returns (stream WorkspaceEvent); - rpc Subscribe (stream CursorUpdate) returns (stream CursorUpdate); - rpc ListUsers (WorkspaceRequest) returns (UsersList); - rpc Buffers (WorkspaceRequest) returns (BufferList); - rpc NewBuffer (BufferRequest) returns (WorkspaceResponse); - rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse); -} - -message JoinRequest { - string name = 1; -} - -message WorkspaceEvent { - int32 id = 1; - optional string body = 2; -} - -// nvim-rs passes everything as i64, so having them as i64 in the packet itself is convenient -// TODO can we make them i32 and save some space? -message CursorUpdate { - string username = 1; - int64 buffer = 2; - int64 col = 3; - int64 row = 4; -} - -message WorkspaceRequest { - string sessionKey = 1; -} - -message BufferRequest { - string sessionKey = 1; - string path = 2; -} - -message WorkspaceResponse { - bool accepted = 1; -} - -message BufferList { - repeated string path = 1; -} - -message UsersList { - repeated string name = 1; -} - diff --git a/src/client/cli/main.rs b/src/client/cli/main.rs new file mode 100644 index 0000000..8bf722d --- /dev/null +++ b/src/client/cli/main.rs @@ -0,0 +1,46 @@ +use clap::Parser; +use library::proto::{buffer_client::BufferClient, BufferPayload}; +use tokio_stream::StreamExt; + +#[derive(Parser, Debug)] +struct CliArgs { + /// path of buffer to create + path: String, + + /// initial content for buffer + #[arg(short, long)] + content: Option, + + /// attach instead of creating a new buffer + #[arg(long, default_value_t = false)] + attach: bool, + + /// host to connect to + #[arg(long, default_value = "http://[::1]:50051")] + host: String, +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = CliArgs::parse(); + + let mut client = BufferClient::connect(args.host).await?; + + let request = BufferPayload { + path: args.path, + content: args.content, + }; + + if !args.attach { + client.create(request.clone()).await.unwrap(); + } + + let mut stream = client.attach(request).await.unwrap().into_inner(); + + while let Some(item) = stream.next().await { + println!("> {:?}", item); + } + + Ok(()) +} + diff --git a/src/client/dispatcher.rs b/src/client/dispatcher.rs deleted file mode 100644 index a56aa73..0000000 --- a/src/client/dispatcher.rs +++ /dev/null @@ -1,96 +0,0 @@ -pub mod proto { - tonic::include_proto!("session"); - tonic::include_proto!("workspace"); - tonic::include_proto!("buffer"); -} -use std::sync::Arc; -use tracing::error; - -use tokio::sync::{mpsc, Mutex}; -use tokio_stream::StreamExt; -use tokio_stream::wrappers::ReceiverStream; - -use proto::{ - workspace_client::WorkspaceClient, - session_client::SessionClient, - buffer_client::BufferClient, - WorkspaceBuilderRequest, JoinRequest, SessionResponse, CursorUpdate -}; -use tonic::{transport::Channel, Status, Request, Response}; - -#[derive(Clone)] -pub struct Dispatcher { - name: String, - dp: Arc>, // TODO use channels and don't lock -} - -struct DispatcherWorker { - // TODO do I need all three? Did I design the server badly? - session: SessionClient, - workspace: WorkspaceClient, - _buffers: BufferClient, -} - -impl Dispatcher { - pub async fn connect(addr:String) -> Result { - let (s, w, b) = tokio::join!( - SessionClient::connect(addr.clone()), - WorkspaceClient::connect(addr.clone()), - BufferClient::connect(addr.clone()), - ); - Ok( - Dispatcher { - name: format!("User#{}", rand::random::()), - dp: Arc::new( - Mutex::new( - DispatcherWorker { session: s?, workspace: w?, _buffers: b? } - ) - ) - } - ) - } - - pub async fn create_workspace(&self, name:String) -> Result, Status> { - self.dp.lock().await.session.create_workspace( - Request::new(WorkspaceBuilderRequest { name }) - ).await - } - - pub async fn join_workspace(&self, session_id:String) -> Result<(), Status> { - let mut req = Request::new(JoinRequest { name: self.name.clone() }); - req.metadata_mut().append("workspace", session_id.parse().unwrap()); - let mut stream = self.dp.lock().await.workspace.join(req).await?.into_inner(); - - let _worker = tokio::spawn(async move { - while let Some(pkt) = stream.next().await { - match pkt { - Ok(_event) => { - // TODO do something with events when they will mean something! - }, - Err(e) => error!("Error receiving event | {}", e), - } - } - }); - - Ok(()) - } - - pub async fn start_cursor_worker(&self, session_id:String, feed:mpsc::Receiver) -> Result, Status> { - let mut in_stream = Request::new(ReceiverStream::new(feed)); - in_stream.metadata_mut().append("workspace", session_id.parse().unwrap()); - - let mut stream = self.dp.lock().await.workspace.subscribe(in_stream).await?.into_inner(); - let (tx, rx) = mpsc::channel(50); - - let _worker = tokio::spawn(async move { - while let Some(pkt) = stream.next().await { - match pkt { - Ok(update) => tx.send(update).await.unwrap(), // TODO how to handle an error here? - Err(e) => error!("Error receiving cursor update | {}", e), - } - } - }); - - Ok(rx) - } -} diff --git a/src/client/main.rs b/src/client/main.rs deleted file mode 100644 index 37eae85..0000000 --- a/src/client/main.rs +++ /dev/null @@ -1,16 +0,0 @@ - -mod nvim; -pub mod dispatcher; - -use dispatcher::Dispatcher; - -#[tokio::main] -async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> { - - let dispatcher = Dispatcher::connect("http://[::1]:50051".into()).await.unwrap(); - - #[cfg(feature = "nvim")] - crate::nvim::run_nvim_client(dispatcher).await?; - - Ok(()) -} diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs new file mode 100644 index 0000000..f71ec8b --- /dev/null +++ b/src/client/nvim/main.rs @@ -0,0 +1,91 @@ +//! A basic example. Mainly for use in a test, but also shows off some basic +//! functionality. +use std::{env, error::Error, fs}; + +use rmpv::Value; + +use tokio::io::Stdout; + +use nvim_rs::{ + compat::tokio::Compat, create::tokio as create, rpc::IntoVal, Handler, Neovim, +}; +use tonic::async_trait; + +#[derive(Clone)] +struct NeovimHandler { + +} + +#[async_trait] +impl Handler for NeovimHandler { + type Writer = Compat; + + async fn handle_request( + &self, + name: String, + args: Vec, + nvim: Neovim>, + ) -> Result { + match name.as_ref() { + "ping" => Ok(Value::from("pong")), + _ => unimplemented!(), + } + } + + async fn handle_notify( + &self, + name: String, + args: Vec, + nvim: Neovim>, + ) { + } +} + +#[tokio::main] +async fn main() { + let handler: NeovimHandler = NeovimHandler {}; + let (nvim, io_handler) = create::new_parent(handler).await; + let curbuf = nvim.get_current_buf().await.unwrap(); + + let mut envargs = env::args(); + let _ = envargs.next(); + let testfile = envargs.next().unwrap(); + + fs::write(testfile, &format!("{:?}", curbuf.into_val())).unwrap(); + + // Any error should probably be logged, as stderr is not visible to users. + match io_handler.await { + Err(joinerr) => eprintln!("Error joining IO loop: '{}'", joinerr), + Ok(Err(err)) => { + if !err.is_reader_error() { + // One last try, since there wasn't an error with writing to the + // stream + nvim + .err_writeln(&format!("Error: '{}'", err)) + .await + .unwrap_or_else(|e| { + // We could inspect this error to see what was happening, and + // maybe retry, but at this point it's probably best + // to assume the worst and print a friendly and + // supportive message to our users + eprintln!("Well, dang... '{}'", e); + }); + } + + if !err.is_channel_closed() { + // Closed channel usually means neovim quit itself, or this plugin was + // told to quit by closing the channel, so it's not always an error + // condition. + eprintln!("Error: '{}'", err); + + let mut source = err.source(); + + while let Some(e) = source { + eprintln!("Caused by: '{}'", e); + source = e.source(); + } + } + } + Ok(Ok(())) => {} + } +} diff --git a/src/client/nvim/mod.rs b/src/client/nvim/mod.rs deleted file mode 100644 index 5db4a62..0000000 --- a/src/client/nvim/mod.rs +++ /dev/null @@ -1,133 +0,0 @@ -use std::sync::Arc; - -use rmpv::Value; - -use tokio::io::Stdout; - -use nvim_rs::{compat::tokio::Compat, Handler, Neovim}; -use nvim_rs::create::tokio::new_parent; -use tokio::sync::{mpsc, Mutex}; - -use crate::dispatcher::{Dispatcher, proto::CursorUpdate}; - -#[derive(Clone)] -pub struct NeovimHandler { - dispatcher: Dispatcher, - sink: Arc>>>, -} - -impl NeovimHandler { - pub fn new(dispatcher: Dispatcher) -> Self { - NeovimHandler { - dispatcher, - sink: Arc::new(Mutex::new(None)), - } - } -} - -#[tonic::async_trait] -impl Handler for NeovimHandler { - type Writer = Compat; - - async fn handle_request( - &self, - name: String, - args: Vec, - neovim: Neovim>, - ) -> Result { - match name.as_ref() { - "ping" => Ok(Value::from("pong")), - "create" => { - if args.len() < 1 { - return Err(Value::from("[!] no session key")); - } - let res = self.dispatcher.create_workspace(args[0].to_string()) - .await - .map_err(|e| Value::from(e.to_string()))? - .into_inner(); - - Ok(res.session_key.into()) - }, - "join" => { - if args.len() < 1 { - return Err(Value::from("[!] no session key")); - } - - self.dispatcher.join_workspace( - args[0].as_str().unwrap().to_string(), // TODO throw err if it's not a string? - ).await.map_err(|e| Value::from(e.to_string()))?; - - Ok("OK".into()) - }, - "cursor-start" => { - if args.len() < 1 { - return Err(Value::from("[!] no session key")); - } - let (tx, stream) = mpsc::channel(50); - let mut rx = self.dispatcher.start_cursor_worker( - args[0].as_str().unwrap().to_string(), stream - ).await.map_err(|e| Value::from(e.to_string()))?; - let sink = self.sink.clone(); - sink.lock().await.replace(tx); - let _worker = tokio::spawn(async move { - let mut col : i64; - let mut row : i64; - let ns = neovim.create_namespace("Cursor").await.unwrap(); - while let Some(update) = rx.recv().await { - neovim.exec_lua(format!("print('{:?}')", update).as_str(), vec![]).await.unwrap(); - let buf = neovim.get_current_buf().await.unwrap(); - buf.clear_namespace(ns, 0, -1).await.unwrap(); - row = update.row as i64; - col = update.col as i64; - buf.add_highlight(ns, "ErrorMsg", row-1, col-1, col).await.unwrap(); - } - sink.lock().await.take(); - }); - Ok("OK".into()) - }, - _ => { - eprintln!("[!] unexpected call"); - Ok(Value::from("")) - }, - } - } - - async fn handle_notify( - &self, - name: String, - args: Vec, - _neovim: Neovim>, - ) { - match name.as_ref() { - "insert" => {}, - "cursor" => { - if args.len() >= 3 { - if let Some(sink) = self.sink.lock().await.as_ref() { - sink.send(CursorUpdate { - buffer: args[0].as_i64().unwrap(), - row: args[1].as_i64().unwrap(), - col: args[2].as_i64().unwrap(), - username: "root".into() - }).await.unwrap(); - } - } - }, - "tick" => eprintln!("tock"), - _ => eprintln!("[!] unexpected notify",) - } - } -} - -pub async fn run_nvim_client(dispatcher: Dispatcher) -> Result<(), Box> { - let handler: NeovimHandler = NeovimHandler::new(dispatcher); - let (_nvim, io_handler) = new_parent(handler).await; - - // Any error should probably be logged, as stderr is not visible to users. - match io_handler.await { - Err(err) => eprintln!("Error joining IO loop: {:?}", err), - Ok(Err(err)) => eprintln!("Process ended with error: {:?}", err), - Ok(Ok(())) => eprintln!("Finished"), - } - - Ok(()) -} diff --git a/src/lib/events.rs b/src/lib/events.rs deleted file mode 100644 index 2d01ad8..0000000 --- a/src/lib/events.rs +++ /dev/null @@ -1,100 +0,0 @@ -use std::fmt::Display; -use crate::user::User; - -#[derive(Debug, Clone)] -pub enum Event { - UserJoin { user: User }, - UserLeave { name: String }, - BufferNew { path: String }, - BufferDelete { path: String }, -} - -impl Display for Event { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Self::UserJoin { user } => write!(f, "UserJoin(user:{})", user), - Self::UserLeave { name } => write!(f, "UserLeave(user:{})", name), - Self::BufferNew { path } => write!(f, "BufferNew(path:{})", path), - Self::BufferDelete { path } => write!(f, "BufferDelete(path:{})", path), - } - } -} - -// pub type Event = Box; -// -// pub trait EventInterface { -// fn class(&self) -> EventClass; -// fn unwrap(e: Event) -> Option where Self: Sized; -// -// fn wrap(self) -> Event { -// Box::new(self) -// } -// } -// -// -// // User joining workspace -// -// pub struct UserJoinEvent { -// user: User, -// } -// -// impl EventInterface for UserJoinEvent { -// fn class(&self) -> EventClass { EventClass::UserJoin } -// fn unwrap(e: Event) -> Option where Self: Sized { -// if matches!(e.class(), EventClass::UserJoin) { -// return Some(*e); -// } -// None -// } -// } -// -// -// // User leaving workspace -// -// pub struct UserLeaveEvent { -// name: String, -// } -// -// impl EventInterface for UserLeaveEvent { -// fn class(&self) -> EventClass { EventClass::UserLeave } -// } -// -// -// // Cursor movement -// -// pub struct CursorEvent { -// user: String, -// cursor: UserCursor, -// } -// -// impl EventInterface for CursorEvent { -// fn class(&self) -> EventClass { EventClass::Cursor } -// } -// -// impl CursorEvent { -// pub fn new(user:String, cursor: UserCursor) -> Self { -// CursorEvent { user, cursor } -// } -// } -// -// -// // Buffer added -// -// pub struct BufferNewEvent { -// path: String, -// } -// -// impl EventInterface for BufferNewEvent { -// fn class(&self) -> EventClass { EventClass::BufferNew } -// } -// -// -// // Buffer deleted -// -// pub struct BufferDeleteEvent { -// path: String, -// } -// -// impl EventInterface for BufferDeleteEvent { -// fn class(&self) -> EventClass { EventClass::BufferDelete } -// } diff --git a/src/lib/lib.rs b/src/lib/lib.rs index 84e0b84..b7446bb 100644 --- a/src/lib/lib.rs +++ b/src/lib/lib.rs @@ -1,2 +1,4 @@ -pub mod events; -pub mod user; +pub mod proto; + +pub use tonic; +pub use tokio; diff --git a/src/lib/proto.rs b/src/lib/proto.rs new file mode 100644 index 0000000..4e168ef --- /dev/null +++ b/src/lib/proto.rs @@ -0,0 +1 @@ +tonic::include_proto!("buffer"); diff --git a/src/lib/user.rs b/src/lib/user.rs deleted file mode 100644 index 0e80f53..0000000 --- a/src/lib/user.rs +++ /dev/null @@ -1,27 +0,0 @@ -use std::fmt::Display; - -#[derive(Debug, Clone)] -pub struct UserCursor{ - pub buffer: i64, - pub x: i64, - pub y: i64 -} - -impl Display for UserCursor { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Cursor(buffer:{}, x:{}, y:{})", self.buffer, self.x, self.y) - } -} - - -#[derive(Debug, Clone)] -pub struct User { - pub name: String, - pub cursor: UserCursor, -} - -impl Display for User { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "User(name:{}, cursor:{})", self.name, self.cursor) - } -} diff --git a/src/server/actor/buffer.rs b/src/server/actor/buffer.rs deleted file mode 100644 index 9e28b6d..0000000 --- a/src/server/actor/buffer.rs +++ /dev/null @@ -1,67 +0,0 @@ -use operational_transform::OperationSeq; -use tokio::sync::{broadcast, mpsc, watch}; -use tracing::error; - -use library::events::Event; - -#[derive(Debug, Clone)] -/// A view of a buffer, with references to access value and send operations -pub struct BufferView { - pub name: String, - pub content: watch::Receiver, - op_tx: mpsc::Sender, -} - -impl BufferView { - pub async fn op(&self, op: OperationSeq) -> Result<(), mpsc::error::SendError> { - self.op_tx.send(op).await - } -} - -#[derive(Debug)] -pub struct Buffer { - view: BufferView, - run: watch::Sender, -} - -impl Drop for Buffer { - fn drop(&mut self) { - self.run.send(false).unwrap_or_else(|e| { - error!("Could not stop Buffer worker task: {:?}", e); - }); - } -} - -impl Buffer { - pub fn new(name: String, _bus: broadcast::Sender) -> Self { - let (op_tx, mut op_rx) = mpsc::channel(32); - let (stop_tx, stop_rx) = watch::channel(true); - let (content_tx, content_rx) = watch::channel(String::new()); - - let b = Buffer { - run: stop_tx, - view: BufferView { - name: name.clone(), - op_tx, - content: content_rx, - }, - }; - - tokio::spawn(async move { - let mut content = String::new(); - while stop_rx.borrow().to_owned() { - // TODO handle these errors!! - let op = op_rx.recv().await.unwrap(); - content = op.apply(content.as_str()).unwrap(); - // bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed - content_tx.send(content.clone()).unwrap(); - } - }); - - return b; - } - - pub fn view(&self) -> BufferView { - return self.view.clone(); - } -} diff --git a/src/server/actor/mod.rs b/src/server/actor/mod.rs deleted file mode 100644 index 42110ea..0000000 --- a/src/server/actor/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod buffer; -pub mod workspace; -pub mod state; diff --git a/src/server/actor/state.rs b/src/server/actor/state.rs deleted file mode 100644 index a4b8dae..0000000 --- a/src/server/actor/state.rs +++ /dev/null @@ -1,106 +0,0 @@ - -use std::{collections::HashMap, sync::Arc}; -use tokio::sync::{mpsc, watch}; -use tracing::error; -use uuid::Uuid; - -use crate::actor::workspace::Workspace; - - -#[derive(Debug)] -enum WorkspaceAction { - ADD { - key: Uuid, - w: Box, - }, - REMOVE { - key: Uuid - }, -} - -#[derive(Debug, Clone)] -pub struct WorkspacesView { - watch: watch::Receiver>>, - op: mpsc::Sender, -} - -impl WorkspacesView { - pub fn borrow(&self) -> watch::Ref>> { - self.watch.borrow() - } - - pub async fn add(&mut self, w: Workspace) { - self.op.send(WorkspaceAction::ADD { key: w.id, w: Box::new(w) }).await.unwrap(); - } - - pub async fn remove(&mut self, key: Uuid) { - self.op.send(WorkspaceAction::REMOVE { key }).await.unwrap(); - } -} - -#[derive(Debug)] -pub struct StateManager { - pub workspaces: WorkspacesView, - pub run: watch::Receiver, - run_tx: watch::Sender, -} - -impl Drop for StateManager { - fn drop(&mut self) { - self.run_tx.send(false).unwrap_or_else(|e| { - error!("Could not stop StateManager worker: {:?}", e); - }) - } -} - -impl StateManager { - pub fn new() -> Self { - let (tx, rx) = mpsc::channel(32); // TODO quantify backpressure - let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new()); - let (run_tx, run_rx) = watch::channel(true); - - let s = StateManager { - workspaces: WorkspacesView { watch: workspaces_rx, op: tx }, - run_tx, run: run_rx, - }; - - s.workspaces_worker(rx, workspaces_tx); - - return s; - } - - fn workspaces_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>>) { - let run = self.run.clone(); - tokio::spawn(async move { - let mut store = HashMap::new(); - - while run.borrow().to_owned() { - if let Some(event) = rx.recv().await { - match event { - WorkspaceAction::ADD { key, w } => { - store.insert(key, Arc::new(*w)); // TODO put in hashmap - }, - WorkspaceAction::REMOVE { key } => { - store.remove(&key); - }, - } - tx.send(store.clone()).unwrap(); - } else { - break - } - } - }); - } - - pub fn view(&self) -> WorkspacesView { - return self.workspaces.clone(); - } - - /// get a workspace Arc directly, without passing by the WorkspacesView - pub fn get(&self, key: &Uuid) -> Option> { - if let Some(w) = self.workspaces.borrow().get(key) { - return Some(w.clone()); - } - return None; - } -} diff --git a/src/server/actor/workspace.rs b/src/server/actor/workspace.rs deleted file mode 100644 index df5a0a8..0000000 --- a/src/server/actor/workspace.rs +++ /dev/null @@ -1,228 +0,0 @@ -use std::collections::HashMap; - -use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; -use tracing::{warn, info}; - -use library::{events::Event, user::{User, UserCursor}}; - -use crate::service::workspace::proto::CursorUpdate; - -use super::buffer::{BufferView, Buffer}; - -#[derive(Debug, Clone)] -pub struct UsersView { - watch: watch::Receiver>, - op: mpsc::Sender, -} - -impl UsersView { // TODO don't unwrap everything! - pub fn borrow(&self) -> Ref> { - return self.watch.borrow(); - } - - pub async fn add(&mut self, user: User) { - self.op.send(UserAction::ADD{ user }).await.unwrap(); - } - - pub async fn remove(&mut self, name: String) { - self.op.send(UserAction::REMOVE{ name }).await.unwrap(); - } - - pub async fn update(&mut self, user_name: String, cursor: UserCursor) { - self.op.send(UserAction::CURSOR { name: user_name, cursor }).await.unwrap(); - } -} - -#[derive(Debug, Clone)] -pub struct BuffersTreeView { - watch: watch::Receiver>, - op: mpsc::Sender, -} - -impl BuffersTreeView { - pub fn borrow(&self) -> Ref> { - return self.watch.borrow(); - } - - pub async fn add(&mut self, buffer: Buffer) { - self.op.send(BufferAction::ADD { buffer }).await.unwrap(); - } - - pub async fn remove(&mut self, path: String) { - self.op.send(BufferAction::REMOVE { path }).await.unwrap(); - } -} - -pub struct WorkspaceView { - rx: broadcast::Receiver, - pub users: UsersView, - pub buffers: BuffersTreeView, -} - -impl WorkspaceView { - pub async fn event(&mut self) -> Result { - self.rx.recv().await - } -} - -// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk -#[derive(Debug)] -pub struct Workspace { - pub id: uuid::Uuid, - pub name: String, - pub bus: broadcast::Sender, - pub cursors: broadcast::Sender, - - pub buffers: BuffersTreeView, - pub users: UsersView, - - run_tx: watch::Sender, - run_rx: watch::Receiver, -} - -impl Drop for Workspace { - fn drop(&mut self) { - self.run_tx.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e)); - } -} - -impl Workspace { - pub fn new(name: String) -> Self { - let (op_buf_tx, op_buf_rx) = mpsc::channel::(32); - let (op_usr_tx, op_usr_rx) = mpsc::channel::(32); - let (run_tx, run_rx) = watch::channel::(true); - let (buffer_tx, buffer_rx) = watch::channel::>(HashMap::new()); - let (users_tx, users_rx) = watch::channel(HashMap::new()); - let (broadcast_tx, _broadcast_rx) = broadcast::channel::(32); - let (cursors_tx, _cursors_rx) = broadcast::channel::(32); - - let w = Workspace { - id: uuid::Uuid::new_v4(), - name, - bus: broadcast_tx, - cursors: cursors_tx, - buffers: BuffersTreeView{ op: op_buf_tx, watch: buffer_rx }, - users: UsersView{ op: op_usr_tx, watch: users_rx }, - run_tx, - run_rx, - }; - - w.users_worker(op_usr_rx, users_tx); // spawn worker to handle users - w.buffers_worker(op_buf_rx, buffer_tx); // spawn worker to handle buffers - // - info!("new workspace created: {}[{}]", w.name, w.id); - - return w; - } - - fn buffers_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>) { - let bus = self.bus.clone(); - let run = self.run_rx.clone(); - tokio::spawn(async move { - let mut buffers : HashMap = HashMap::new(); - - while run.borrow().to_owned() { - // TODO handle these errors!! - let action = rx.recv().await.unwrap(); - match action { - BufferAction::ADD { buffer } => { - let view = buffer.view(); - buffers.insert(view.name.clone(), buffer); - bus.send(Event::BufferNew { path: view.name }).unwrap(); - } - BufferAction::REMOVE { path } => { - buffers.remove(&path); - bus.send(Event::BufferDelete { path }).unwrap(); - } - } - tx.send( - buffers.iter() - .map(|(k, v)| (k.clone(), v.view())) - .collect() - ).unwrap(); - } - }); - } - - fn users_worker(&self, mut rx: mpsc::Receiver, tx: watch::Sender>) { - let bus = self.bus.clone(); - let cursors_tx = self.cursors.clone(); - let run = self.run_rx.clone(); - tokio::spawn(async move { - let mut cursors_rx = cursors_tx.subscribe(); - let mut users : HashMap = HashMap::new(); - - while run.borrow().to_owned() { - tokio::select!{ - action = rx.recv() => { - match action.unwrap() { - UserAction::ADD { user } => { - users.insert(user.name.clone(), user.clone()); - bus.send(Event::UserJoin { user }).unwrap(); - }, - UserAction::REMOVE { name } => { - if let None = users.remove(&name) { - continue; // don't update channel since this was a no-op - } else { - bus.send(Event::UserLeave { name }).unwrap(); - } - }, - UserAction::CURSOR { name, cursor } => { - if let Some(user) = users.get_mut(&name) { - user.cursor = cursor.clone(); - } else { - continue; // don't update channel since this was a no-op - } - }, - }; - }, - cursor = cursors_rx.recv() => { - let cursor = cursor.unwrap(); - if let Some(user) = users.get_mut(&cursor.username) { - user.cursor = UserCursor { buffer: cursor.buffer, x:cursor.col, y:cursor.row }; - } - } - } - - tx.send( - users.iter() - .map(|(k, u)| (k.clone(), u.clone())) - .collect() - ).unwrap(); - } - }); - } - - pub fn view(&self) -> WorkspaceView { - WorkspaceView { - rx: self.bus.subscribe(), - users: self.users.clone(), - buffers: self.buffers.clone(), - } - } -} - -#[derive(Debug)] -enum UserAction { - ADD { - user: User, - }, - REMOVE { - name: String, - }, - CURSOR { - name: String, - cursor: UserCursor, - }, -} - -#[derive(Debug)] -enum BufferAction { - ADD { - buffer: Buffer, - }, - REMOVE { - path: String, // TODO remove by id? - }, -} - diff --git a/src/server/buffer/actor.rs b/src/server/buffer/actor.rs new file mode 100644 index 0000000..d5b1d26 --- /dev/null +++ b/src/server/buffer/actor.rs @@ -0,0 +1,83 @@ +use tokio::sync::{mpsc, broadcast, watch}; +use tracing::error; +use md5::Digest; + +use operational_transform::OperationSeq; + +pub trait BufferStore { + fn get(&self, key: &T) -> Option<&BufferHandle>; + fn put(&mut self, key: T, handle: BufferHandle) -> Option; + + fn handle(&mut self, key: T, content: Option) { + let handle = BufferHandle::new(content); + self.put(key, handle); + } +} + +#[derive(Clone)] +pub struct BufferHandle { + pub edit: mpsc::Sender, + events: broadcast::Sender, + pub digest: watch::Receiver, +} + +impl BufferHandle { + fn new(init: Option) -> Self { + let init_val = init.unwrap_or("".into()); + let (edits_tx, edits_rx) = mpsc::channel(64); // TODO hardcoded size + let (events_tx, _events_rx) = broadcast::channel(64); // TODO hardcoded size + let (digest_tx, digest_rx) = watch::channel(md5::compute(&init_val)); + + let events_tx_clone = events_tx.clone(); + + tokio::spawn(async move { + let worker = BufferWorker { + content: init_val, + edits: edits_rx, + events: events_tx_clone, + digest: digest_tx, + }; + worker.work().await + }); + + BufferHandle { + edit: edits_tx, + events: events_tx, + digest: digest_rx, + } + } + + pub fn subscribe(&self) -> broadcast::Receiver { + self.events.subscribe() + + } +} + +struct BufferWorker { + content: String, + edits: mpsc::Receiver, + events: broadcast::Sender, + digest: watch::Sender, +} + +impl BufferWorker { + async fn work(mut self) { + loop { + match self.edits.recv().await { + None => break, + Some(v) => { + match v.apply(&self.content) { + Ok(res) => { + self.content = res; + self.digest.send(md5::compute(&self.content)).unwrap(); + if let Err(e) = self.events.send(v) { + error!("could not broadcast OpSeq: {}", e); + } + }, + Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.content, e), + } + }, + } + } + } +} diff --git a/src/server/buffer/mod.rs b/src/server/buffer/mod.rs new file mode 100644 index 0000000..5c38de9 --- /dev/null +++ b/src/server/buffer/mod.rs @@ -0,0 +1,2 @@ +pub mod actor; +pub mod service; diff --git a/src/server/buffer/service.rs b/src/server/buffer/service.rs new file mode 100644 index 0000000..4e8e9cb --- /dev/null +++ b/src/server/buffer/service.rs @@ -0,0 +1,105 @@ +use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; + +use operational_transform::OperationSeq; +use tokio::sync::mpsc; +use tonic::{Request, Response, Status}; + +use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? + +use library::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest}; +use tracing::info; + +use super::actor::{BufferHandle, BufferStore}; + +type OperationStream = Pin> + Send>>; + +struct BufferMap { + store: HashMap, +} + +impl From::> for BufferMap { + fn from(value: HashMap) -> Self { + BufferMap { store: value } + } +} + +impl BufferStore for BufferMap { + fn get(&self, key: &String) -> Option<&BufferHandle> { + self.store.get(key) + } + fn put(&mut self, key: String, handle: BufferHandle) -> Option { + self.store.insert(key, handle) + } +} + +pub struct BufferService { + map: Arc>, +} + +#[tonic::async_trait] +impl Buffer for BufferService { + type AttachStream = OperationStream; + + async fn attach(&self, req: Request) -> Result, Status> { + let request = req.into_inner(); + match self.map.read().unwrap().get(&request.path) { + Some(handle) => { + let (tx, rx) = mpsc::channel(128); + let mut sub = handle.subscribe(); + tokio::spawn(async move { + loop { + match sub.recv().await { + Ok(v) => { + let snd = RawOp { opseq: serde_json::to_string(&v).unwrap() }; + tx.send(Ok(snd)).await.unwrap(); + } + Err(_e) => break, + } + } + }); + let output_stream = ReceiverStream::new(rx); + info!("registered new subscriber on buffer"); + Ok(Response::new(Box::pin(output_stream))) + }, + None => Err(Status::not_found("path not found")), + } + } + + async fn edit(&self, req:Request) -> Result, Status> { + let request = req.into_inner(); + let tx = match self.map.read().unwrap().get(&request.path) { + Some(handle) => { + if format!("{:x}", *handle.digest.borrow()) != request.hash { + return Ok(Response::new(BufferResponse { accepted : false } )); + } + handle.edit.clone() + }, + None => return Err(Status::not_found("path not found")), + }; + let opseq : OperationSeq = serde_json::from_str(&request.opseq).unwrap(); + tx.send(opseq).await.unwrap(); + info!("sent edit to buffer"); + Ok(Response::new(BufferResponse { accepted: true })) + } + + async fn create(&self, req:Request) -> Result, Status> { + let request = req.into_inner(); + let _handle = self.map.write().unwrap().handle(request.path, request.content); + info!("created new buffer"); + let answ = BufferResponse { accepted: true }; + Ok(Response::new(answ)) + } + +} + +impl BufferService { + pub fn new() -> BufferService { + BufferService { + map: Arc::new(RwLock::new(HashMap::new().into())), + } + } + + pub fn server(self) -> BufferServer { + BufferServer::new(self) + } +} diff --git a/src/server/main.rs b/src/server/main.rs index e74133d..377e346 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -4,19 +4,13 @@ //! all clients and synching everyone's cursor. //! -pub mod actor; -pub mod service; - -use std::sync::Arc; +mod buffer; use tracing::info; use tonic::transport::Server; -use crate::{ - actor::state::StateManager, - service::{buffer::BufferService, workspace::WorkspaceService, session::SessionService}, -}; +use crate::buffer::service::BufferService; #[tokio::main] async fn main() -> Result<(), Box> { @@ -24,14 +18,10 @@ async fn main() -> Result<(), Box> { let addr = "[::1]:50051".parse()?; - let state = Arc::new(StateManager::new()); - info!("Starting server"); Server::builder() - .add_service(SessionService::new(state.clone()).server()) - .add_service(WorkspaceService::new(state.clone()).server()) - .add_service(BufferService::new(state.clone()).server()) + .add_service(BufferService::new().server()) .serve(addr) .await?; diff --git a/src/server/service/buffer.rs b/src/server/service/buffer.rs deleted file mode 100644 index 4d6f7a4..0000000 --- a/src/server/service/buffer.rs +++ /dev/null @@ -1,156 +0,0 @@ -use std::collections::VecDeque; -use std::{pin::Pin, sync::Arc}; - -use uuid::Uuid; - -use tokio_stream::wrappers::ReceiverStream; -use tracing::error; - -use operational_transform::OperationSeq; -use tonic::{Request, Response, Status}; - -pub mod proto { - tonic::include_proto!("buffer"); -} - -use library::events::Event; - -use tokio::sync::{broadcast, mpsc}; -use tokio_stream::{Stream, StreamExt}; // TODO example used this? - -use proto::buffer_server::{Buffer, BufferServer}; -use proto::Operation; - -use tonic::Streaming; -//use futures::{Stream, StreamExt}; - -use crate::actor::{buffer::BufferView, state::StateManager}; - -use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2! - -type OperationStream = Pin> + Send>>; - -pub struct BufferService { - state: Arc, -} - -fn op_seq(_o: &Operation) -> OperationSeq { - todo!() -} -fn _op_net(_o: &OperationSeq) -> Operation { - todo!() -} - -// async fn buffer_worker(tx: mpsc::Sender>, mut rx:Streaming, mut rx_core: mpsc::Receiver) { -async fn buffer_worker( - bv: BufferView, - mut client_rx: Streaming, - _tx_client: mpsc::Sender>, - mut rx_core: broadcast::Receiver, -) { - let mut queue: VecDeque = VecDeque::new(); - loop { - tokio::select! { - client_op = client_rx.next() => { - if let Some(result) = client_op { - match result { - Ok(op) => { - bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt! - queue.push_back(op); - }, - Err(status) => { - error!("error receiving op from client: {:?}", status); - break; - } - } - } - }, - - server_op = rx_core.recv() => { - if let Ok(_oop) = server_op { - let mut send_op = true; - for (i, _op) in queue.iter().enumerate() { - if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op)) - queue.remove(i); - send_op = false; - break; - } else { - // serv_op.transform(op); // TODO transform OpSeq ! - } - } - if send_op { - // tx_client.send(Ok(op_net(&oop.1))).await.unwrap(); - } - } - } - } - } -} - -#[tonic::async_trait] -impl Buffer for BufferService { - // type ServerStreamingEchoStream = ResponseStream; - type AttachStream = OperationStream; - - async fn attach( - &self, - req: Request>, - ) -> Result, Status> { - let session_id: String; - if let Some(sid) = req.metadata().get("session_id") { - session_id = sid.to_str().unwrap().to_string(); - } else { - return Err(Status::failed_precondition( - "Missing metadata key 'session_id'", - )); - } - - let path: String; - if let Some(p) = req.metadata().get("path") { - path = p.to_str().unwrap().to_string(); - } else { - return Err(Status::failed_precondition("Missing metadata key 'path'")); - } - // TODO make these above nicer? more concise? idk - - if let Some(workspace) = self.state.workspaces.borrow().get(&Uuid::parse_str(session_id.as_str()).unwrap()) { - let in_stream = req.into_inner(); - let (tx_og, rx) = mpsc::channel::>(128); - - let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone(); - let w = workspace.clone(); - tokio::spawn(async move { - buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; - }); - - // echo just write the same data that was received - let out_stream = ReceiverStream::new(rx); - - return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream)); - } else { - return Err(Status::not_found(format!( - "Norkspace with session_id {}", - session_id - ))); - } - } - - async fn push(&self, _req:Request) -> Result, Status> { - todo!() - } - - async fn pull(&self, _req:Request) -> Result, Status> { - todo!() - } - -} - -impl BufferService { - pub fn new(state: Arc) -> BufferService { - BufferService { state } - } - - pub fn server(self) -> BufferServer { - BufferServer::new(self) - } -} diff --git a/src/server/service/mod.rs b/src/server/service/mod.rs deleted file mode 100644 index a43d4c0..0000000 --- a/src/server/service/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod buffer; -pub mod session; -pub mod workspace; diff --git a/src/server/service/session.rs b/src/server/service/session.rs deleted file mode 100644 index c0aad35..0000000 --- a/src/server/service/session.rs +++ /dev/null @@ -1,59 +0,0 @@ -pub mod proto { - tonic::include_proto!("session"); -} - -use std::sync::Arc; - -use proto::{session_server::Session, WorkspaceBuilderRequest, SessionResponse}; -use tonic::{Request, Response, Status}; - - -use crate::actor::{ - state::StateManager, workspace::Workspace, // TODO fuck x2! -}; - -use self::proto::session_server::SessionServer; - -#[derive(Debug)] -pub struct SessionService { - state: Arc, -} - -#[tonic::async_trait] -impl Session for SessionService { - async fn create_workspace( - &self, - _req: Request, - ) -> Result, Status> { - // let name = req.extensions().get::().unwrap().id.clone(); - let w = Workspace::new("im lazy".into()); - let res = SessionResponse { accepted:true, session_key: w.id.to_string() }; - - self.state.view().add(w).await; - Ok(Response::new(res)) - } - - // async fn authenticate( - // &self, - // req: Request, - // ) -> Result, Status> { - // todo!() - // } - - // async fn list_workspaces( - // &self, - // req: Request, - // ) -> Result, Status> { - // todo!() - // } -} - -impl SessionService { - pub fn new(state: Arc) -> SessionService { - SessionService { state } - } - - pub fn server(self) -> SessionServer { - SessionServer::new(self) - } -} diff --git a/src/server/service/workspace.rs b/src/server/service/workspace.rs deleted file mode 100644 index b573bdf..0000000 --- a/src/server/service/workspace.rs +++ /dev/null @@ -1,258 +0,0 @@ -use std::{pin::Pin, sync::Arc}; - -use uuid::Uuid; -use tonic::codegen::InterceptedService; -use tonic::service::Interceptor; -use tracing::info; - -use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status, Streaming}; -use tokio::sync::{watch, mpsc}; - -pub mod proto { - tonic::include_proto!("workspace"); -} - -use library::user::User; - -use tokio_stream::{Stream, StreamExt}; // TODO example used this? - -use proto::workspace_server::{Workspace, WorkspaceServer}; -use proto::{BufferList, WorkspaceEvent, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest, CursorUpdate, JoinRequest}; - -use library::user::UserCursor; -use crate::actor::{buffer::Buffer, state::StateManager}; // TODO fuck x2! - -pub struct WorkspaceExtension { - pub id: String -} - -#[derive(Debug, Clone)] -pub struct WorkspaceInterceptor { - state: Arc, -} - -impl Interceptor for WorkspaceInterceptor { - fn call(&mut self, mut req: tonic::Request<()>) -> Result, Status> { - // Set an extension that can be retrieved by `say_hello` - let id; - - // TODO this is kinda spaghetti but I can't borrow immutably and mutably req inside this match - // tree... - match req.metadata().get("workspace") { - Some(value) => { - info!("Metadata: {:?}", value); - match value.to_str() { - Ok(w_id) => { - id = w_id.to_string(); - }, - Err(_) => return Err(Status::invalid_argument("Workspace key is not valid")), - } - }, - None => return Err(Status::unauthenticated("No workspace key included in request")) - } - - info!("checking request : {}", id); - - let uid = match Uuid::parse_str(id.as_str()) { - Ok(id) => id, - Err(e) => { return Err(Status::invalid_argument(format!("Invalid uuid : {}", e))); }, - }; - - if !self.state.workspaces.borrow().contains_key(&uid) { - return Err(Status::not_found(format!("Workspace '{}' could not be found", id))); - } - - req.extensions_mut().insert(WorkspaceExtension { id }); - Ok(req) - } -} - - -type EventStream = Pin> + Send>>; -type CursorUpdateStream = Pin> + Send>>; - -#[derive(Debug)] -pub struct WorkspaceService { - state: Arc, -} - -#[tonic::async_trait] -impl Workspace for WorkspaceService { - type JoinStream = EventStream; - type SubscribeStream = CursorUpdateStream; - - async fn join( - &self, - req: Request, - ) -> Result, Status> { - let session_id = Uuid::parse_str(req.extensions().get::().unwrap().id.as_str()).unwrap(); - let r = req.into_inner(); - let run = self.state.run.clone(); - let user_name = r.name.clone(); - match self.state.get(&session_id) { - Some(w) => { - let (tx, rx) = mpsc::channel::>(128); - tokio::spawn(async move { - let mut event_receiver = w.bus.subscribe(); - w.view().users.add( - User { - name: r.name.clone(), - cursor: UserCursor { buffer:0, x:0, y:0 } - } - ).await; - info!("User {} joined workspace {}", r.name, w.id); - while run.borrow().to_owned() { - let res = event_receiver.recv().await.unwrap(); - let broadcasting = WorkspaceEvent { id: 1, body: Some(res.to_string()) }; // TODO actually process packet - tx.send(Ok(broadcasting)).await.unwrap(); - } - w.view().users.remove(user_name).await; - }); - return Ok(Response::new(Box::pin(ReceiverStream::new(rx)))); - }, - None => Err(Status::not_found(format!( - "No active workspace with session_key '{}'", - session_id - ))) - } - } - - async fn subscribe( - &self, - req: tonic::Request>, - ) -> Result, Status> { - let s_id = Uuid::parse_str(req.extensions().get::().unwrap().id.as_str()).unwrap(); - let mut r = req.into_inner(); - match self.state.get(&s_id) { - Some(w) => { - let cursors_ref = w.cursors.clone(); - let (_stop_tx, stop_rx) = watch::channel(true); - let (tx, rx) = mpsc::channel::>(128); - tokio::spawn(async move { - let mut workspace_bus = cursors_ref.subscribe(); - while stop_rx.borrow().to_owned() { - tokio::select!{ - remote = workspace_bus.recv() => { - if let Ok(cur) = remote { - info!("Sending cursor update : {:?}", cur); - tx.send(Ok(cur)).await.unwrap(); - } - }, - local = r.next() => { - match local { - Some(request) => { - info!("Received cursor update : {:?}", request); - match request { - Ok(cur) => { - cursors_ref.send(cur).unwrap(); - }, - Err(_e) => {}, - } - }, - None => {}, - } - }, - } - } - }); - return Ok(Response::new(Box::pin(ReceiverStream::new(rx)))); - }, - None => Err(Status::not_found(format!( - "No active workspace with session_key '{}'", - s_id - ))) - } - } - - async fn buffers( - &self, - req: Request, - ) -> Result, Status> { - let r = req.into_inner(); - match self.state.get(&Uuid::parse_str(r.session_key.as_str()).unwrap()) { - Some(w) => { - let mut out = Vec::new(); - for (_k, v) in w.buffers.borrow().iter() { - out.push(v.name.clone()); - } - Ok(Response::new(BufferList { path: out })) - } - None => Err(Status::not_found(format!( - "No active workspace with session_key '{}'", - r.session_key - ))), - } - } - - async fn new_buffer( - &self, - req: Request, - ) -> Result, Status> { - let session_id = req.extensions().get::().unwrap().id.clone(); - let r = req.into_inner(); - if let Some(w) = self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) { - let mut view = w.view(); - let buf = Buffer::new(r.path, w.bus.clone()); - view.buffers.add(buf).await; - - Ok(Response::new(WorkspaceResponse { accepted: true })) - } else { - return Err(Status::not_found(format!( - "No active workspace with session_key '{}'", - r.session_key - ))); - } - } - - async fn remove_buffer( - &self, - req: Request, - ) -> Result, Status> { - let session_id = req.extensions().get::().unwrap().id.clone(); - let r = req.into_inner(); - match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) { - Some(w) => { - w.view().buffers.remove(r.path).await; - Ok(Response::new(WorkspaceResponse { accepted: true })) - } - None => Err(Status::not_found(format!( - "No active workspace with session_key '{}'", - r.session_key - ))), - } - } - - async fn list_users( - &self, - req: Request, - ) -> Result, Status> { - let session_id = req.extensions().get::().unwrap().id.clone(); - let r = req.into_inner(); - match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) { - Some(w) => { - let mut out = Vec::new(); - for (_k, v) in w.users.borrow().iter() { - out.push(v.name.clone()); - } - Ok(Response::new(UsersList { name: out })) - }, - None => Err(Status::not_found(format!( - "No active workspace with session_key '{}'", - r.session_key - ))), - } - } - -} - -impl WorkspaceService { - pub fn new(state: Arc) -> WorkspaceService { - WorkspaceService { state } - } - - pub fn server(self) -> InterceptedService, WorkspaceInterceptor> { - let state = self.state.clone(); - WorkspaceServer::with_interceptor(self, WorkspaceInterceptor { state }) - } -}