diff --git a/Cargo.toml b/Cargo.toml index 00370e3..0e6edc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ name = "codemp" # core tracing = "0.1" tonic = { version = "0.9", features = ["tls", "tls-roots"] } -prost = "0.11.8" +prost = { version = "0.11.8", optional = true } md5 = "0.7.0" uuid = { version = "1.3.1", features = ["v4"] } operational-transform = { version = "0.6", features = ["serde"] } @@ -23,6 +23,12 @@ serde = { version = "1", optional = false } serde_json = { version = "1", optional = false } tracing-subscriber = { version = "0.3", optional = true } similar = { version = "2.2", features = ["inline"] } +lazy_static = { version = "1.4", optional = true } [build-dependencies] tonic-build = "0.9" + +[features] +default = ["proto", "static"] +proto = ["dep:prost"] +static = ["dep:lazy_static"] diff --git a/build.rs b/build.rs index 838368e..8c727c0 100644 --- a/build.rs +++ b/build.rs @@ -1,4 +1,5 @@ fn main() -> Result<(), Box> { tonic_build::compile_protos("proto/buffer.proto")?; + tonic_build::compile_protos("proto/cursor.proto")?; Ok(()) } diff --git a/client/vscode/src/lib.rs b/client/vscode/src/lib.rs index 7ad69ef..4c2794f 100644 --- a/client/vscode/src/lib.rs +++ b/client/vscode/src/lib.rs @@ -3,9 +3,13 @@ use std::sync::Arc; use neon::prelude::*; use once_cell::sync::OnceCell; use codemp::{ - controller::{cursor::{CursorSubscriber, CursorControllerHandle}, buffer::{OperationControllerHandle, OperationControllerSubscriber}}, - client::CodempClient, - proto::buffer_client::BufferClient, factory::OperationFactory, + cursor::controller::{CursorSubscriber, CursorControllerHandle}, + buffer::{ + controller::{OperationControllerHandle, OperationControllerSubscriber}, + client::CodempClient, + factory::OperationFactory, + }, + proto::buffer_client::BufferClient, }; use codemp::tokio::{runtime::Runtime, sync::Mutex}; @@ -75,7 +79,7 @@ fn create_client(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); runtime(&mut cx)?.spawn(async move { - match rc.lock().await.create(path, content).await { + match rc.lock().await.create(&path, content.as_deref()).await { Ok(accepted) => deferred.settle_with(&channel, move |mut cx| Ok(cx.boolean(accepted))), Err(e) => deferred.settle_with(&channel, move |mut cx| cx.throw_error::>(e.to_string())), } @@ -123,7 +127,7 @@ fn attach_client(mut cx: FunctionContext) -> JsResult { let channel = cx.channel(); runtime(&mut cx)?.spawn(async move { - match rc.lock().await.attach(path).await { + match rc.lock().await.attach(&path).await { Ok(controller) => { deferred.settle_with(&channel, move |mut cx| { let obj = cx.empty_object(); @@ -183,7 +187,7 @@ fn callback_operation(mut cx: FunctionContext) -> JsResult { let boxed : Handle> = this.get(&mut cx, "boxed")?; let callback = Arc::new(cx.argument::(0)?.root(&mut cx)); - let mut rc = boxed.0.clone(); + let rc = boxed.0.clone(); let channel = cx.channel(); // TODO when garbage collecting OperationController stop this worker @@ -213,7 +217,7 @@ fn callback_cursor(mut cx: FunctionContext) -> JsResult { let boxed : Handle> = this.get(&mut cx, "boxed")?; let callback = Arc::new(cx.argument::(0)?.root(&mut cx)); - let mut rc = boxed.0.clone(); + let rc = boxed.0.clone(); let channel = cx.channel(); // TODO when garbage collecting OperationController stop this worker diff --git a/proto/buffer.proto b/proto/buffer.proto index 9b33b2e..b668a44 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -1,26 +1,16 @@ syntax = "proto3"; -package buffer; + +package codemp.buffer; service Buffer { rpc Attach (BufferPayload) returns (stream RawOp); - rpc Edit (OperationRequest) returns (BufferResponse); - rpc Create (BufferPayload) returns (BufferResponse); + rpc Edit (OperationRequest) returns (BufferEditResponse); + rpc Create (BufferPayload) returns (BufferCreateResponse); rpc Sync (BufferPayload) returns (BufferResponse); - rpc Moved (Cursor) returns (BufferResponse); - rpc Listen (BufferPayload) returns (stream Cursor); } -message Position { - int32 row = 1; - int32 col = 2; -} - -message Cursor { - string user = 1; - string buffer = 2; - Position start = 3; - Position end = 4; -} +message BufferCreateResponse {} +message BufferEditResponse {} message RawOp { string opseq = 1; @@ -41,6 +31,5 @@ message BufferPayload { } message BufferResponse { - bool accepted = 1; - optional string content = 2; + string content = 2; } diff --git a/proto/cursor.proto b/proto/cursor.proto new file mode 100644 index 0000000..5af4fa2 --- /dev/null +++ b/proto/cursor.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package codemp.cursor; + +service Cursor { + rpc Moved (CursorEvent) returns (MovedResponse); + rpc Listen (UserIdentity) returns (stream CursorEvent); +} + +message MovedResponse {} + +message RowCol { + int32 row = 1; + int32 col = 2; +} + +message CursorPosition { + string buffer = 1; + RowCol start = 2; + RowCol end = 3; +} + +message CursorEvent { + string user = 1; + CursorPosition position = 2; +} + +message UserIdentity { + string id = 1; +} diff --git a/server/src/buffer/service.rs b/server/src/buffer/service.rs index 5785eda..5745981 100644 --- a/server/src/buffer/service.rs +++ b/server/src/buffer/service.rs @@ -1,17 +1,16 @@ use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; -use tokio::sync::{mpsc, broadcast, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tonic::{Request, Response, Status}; use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? -use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest, Cursor}; +use codemp::proto::{buffer_server::Buffer, RawOp, BufferPayload, BufferResponse, OperationRequest, BufferEditResponse, BufferCreateResponse}; use tracing::info; use super::actor::{BufferHandle, BufferStore}; type OperationStream = Pin> + Send>>; -type CursorStream = Pin> + Send>>; struct BufferMap { store: HashMap, @@ -34,15 +33,12 @@ impl BufferStore for BufferMap { pub struct BufferService { map: Arc>, - cursor: broadcast::Sender, } -impl BufferService { - #[allow(unused)] - fn get_buffer(&self, path: &String) -> Result { - match self.map.read().unwrap().get(path) { - Some(buf) => Ok(buf.clone()), - None => Err(Status::not_found("no buffer for given path")), +impl Default for BufferService { + fn default() -> BufferService { + BufferService { + map: Arc::new(RwLock::new(HashMap::new().into())), } } } @@ -50,7 +46,6 @@ impl BufferService { #[tonic::async_trait] impl Buffer for BufferService { type AttachStream = OperationStream; - type ListenStream = CursorStream; async fn attach(&self, req: Request) -> Result, Status> { let request = req.into_inner(); @@ -73,29 +68,7 @@ impl Buffer for BufferService { } } - async fn listen(&self, req: Request) -> Result, Status> { - let mut sub = self.cursor.subscribe(); - let myself = req.into_inner().user; - let (tx, rx) = mpsc::channel(128); - tokio::spawn(async move { - while let Ok(v) = sub.recv().await { - if v.user == myself { continue } - tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? - } - }); - let output_stream = ReceiverStream::new(rx); - info!("registered new subscriber to cursor updates"); - Ok(Response::new(Box::pin(output_stream))) - } - - async fn moved(&self, req:Request) -> Result, Status> { - match self.cursor.send(req.into_inner()) { - Ok(_) => Ok(Response::new(BufferResponse { accepted: true, content: None})), - Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))), - } - } - - async fn edit(&self, req:Request) -> Result, Status> { + async fn edit(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let tx = match self.map.read().unwrap().get(&request.path) { Some(handle) => { @@ -110,19 +83,20 @@ impl Buffer for BufferService { let (ack, status) = oneshot::channel(); match tx.send((ack, request)).await { Err(e) => Err(Status::internal(format!("error sending edit to buffer actor: {}", e))), - Ok(()) => Ok(Response::new(BufferResponse { - accepted: status.await.unwrap_or(false), - content: None - })) + Ok(()) => { + match status.await { + Ok(_accepted) => Ok(Response::new(BufferEditResponse { })), + Err(e) => Err(Status::internal(format!("error receiving edit result: {}", e))), + } + } } } - async fn create(&self, req:Request) -> Result, Status> { + async fn create(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let _handle = self.map.write().unwrap().handle(request.path, request.content); info!("created new buffer"); - let answ = BufferResponse { accepted: true, content: None }; - Ok(Response::new(answ)) + Ok(Response::new(BufferCreateResponse { })) } async fn sync(&self, req: Request) -> Result, Status> { @@ -131,23 +105,9 @@ impl Buffer for BufferService { None => Err(Status::not_found("requested buffer does not exist")), Some(buf) => { info!("synching buffer"); - let answ = BufferResponse { accepted: true, content: Some(buf.content.borrow().clone()) }; + let answ = BufferResponse { content: buf.content.borrow().clone() }; Ok(Response::new(answ)) } } } } - -impl BufferService { - pub fn new() -> BufferService { - let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity - BufferService { - map: Arc::new(RwLock::new(HashMap::new().into())), - cursor: cur_tx, - } - } - - pub fn server(self) -> BufferServer { - BufferServer::new(self) - } -} diff --git a/server/src/cursor/mod.rs b/server/src/cursor/mod.rs new file mode 100644 index 0000000..1f278a4 --- /dev/null +++ b/server/src/cursor/mod.rs @@ -0,0 +1 @@ +pub mod service; diff --git a/server/src/cursor/service.rs b/server/src/cursor/service.rs new file mode 100644 index 0000000..b641c95 --- /dev/null +++ b/server/src/cursor/service.rs @@ -0,0 +1,52 @@ +use std::pin::Pin; + +use tokio::sync::{mpsc, broadcast}; +use tonic::{Request, Response, Status}; + +use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? + +use codemp::proto::{cursor_server::Cursor, UserIdentity, CursorPosition, MovedResponse}; +use tracing::info; + +type CursorStream = Pin> + Send>>; + +pub struct CursorService { + cursor: broadcast::Sender, +} + +#[tonic::async_trait] +impl Cursor for CursorService { + type ListenStream = CursorStream; + + async fn listen(&self, req: Request) -> Result, Status> { + let mut sub = self.cursor.subscribe(); + let myself = req.into_inner().id; + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + while let Ok(v) = sub.recv().await { + if v.user == myself { continue } + tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? + } + }); + let output_stream = ReceiverStream::new(rx); + info!("registered new subscriber to cursor updates"); + Ok(Response::new(Box::pin(output_stream))) + } + + async fn moved(&self, req:Request) -> Result, Status> { + match self.cursor.send(req.into_inner()) { + Ok(_) => Ok(Response::new(MovedResponse { })), + Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))), + } + } +} + +impl Default for CursorService { + fn default() -> Self { + let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity + // TODO don't drop receiver because sending event when there are no receivers throws an error + CursorService { + cursor: cur_tx, + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 13fe906..ec951e1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,12 +5,16 @@ //! use clap::Parser; +use codemp::proto::buffer_server::BufferServer; +use codemp::proto::cursor_server::CursorServer; use tracing::info; use tonic::transport::Server; mod buffer; +mod cursor; use crate::buffer::service::BufferService; +use crate::cursor::service::CursorService; #[derive(Parser, Debug)] struct CliArgs { @@ -37,7 +41,8 @@ async fn main() -> Result<(), Box> { info!("binding on {}", args.host); Server::builder() - .add_service(BufferService::new().server()) + .add_service(BufferServer::new(BufferService::default())) + .add_service(CursorServer::new(CursorService::default())) .serve(args.host.parse()?) .await?; diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs new file mode 100644 index 0000000..73e967e --- /dev/null +++ b/src/buffer/controller.rs @@ -0,0 +1,41 @@ +use operational_transform::OperationSeq; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tonic::async_trait; + +use crate::{Controller, CodempError}; +use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; + +use super::TextChange; + +pub struct BufferController { + content: watch::Receiver, + operations: mpsc::Sender, + stream: Mutex>, +} + +#[async_trait] +impl OperationFactory for BufferController { + fn content(&self) -> String { + self.content.borrow().clone() + } +} + +#[async_trait] +impl Controller for BufferController { + type Input = OperationSeq; + + async fn recv(&self) -> Result { + let op = self.stream.lock().await.recv().await?; + let after = self.content.borrow().clone(); + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + Ok(TextChange { span, content }) + } + + async fn send(&self, op: OperationSeq) -> Result<(), CodempError> { + Ok(self.operations.send(op).await?) + } +} diff --git a/src/factory.rs b/src/buffer/factory.rs similarity index 74% rename from src/factory.rs rename to src/buffer/factory.rs index b9c47c2..4c175e0 100644 --- a/src/factory.rs +++ b/src/buffer/factory.rs @@ -1,6 +1,25 @@ -use operational_transform::OperationSeq; +use std::ops::Range; + +use operational_transform::{OperationSeq, Operation}; use similar::{TextDiff, ChangeTag}; +pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } +pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } + +const fn count_noop(op: Option<&Operation>) -> u64 { + match op { + None => 0, + Some(Operation::Retain(n)) => *n, + Some(_) => 0, + } +} + +pub fn op_effective_range(op: &OperationSeq) -> Range { + let first = leading_noop(op.ops()); + let last = op.base_len() as u64 - tailing_noop(op.ops()); + first..last +} + pub trait OperationFactory { fn content(&self) -> String; diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs new file mode 100644 index 0000000..922e217 --- /dev/null +++ b/src/buffer/mod.rs @@ -0,0 +1,11 @@ +use std::ops::Range; + +pub(crate) mod worker; +pub mod controller; +pub mod factory; + + +pub struct TextChange { + pub span: Range, + pub content: String, +} diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs new file mode 100644 index 0000000..43bb008 --- /dev/null +++ b/src/buffer/worker.rs @@ -0,0 +1,114 @@ +use std::{sync::Arc, collections::VecDeque}; + +use operational_transform::OperationSeq; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tonic::transport::Channel; +use tonic::{async_trait, Streaming}; + +use crate::proto::{OperationRequest, RawOp}; +use crate::proto::buffer_client::BufferClient; +use crate::ControllerWorker; + +use super::TextChange; +use super::controller::BufferController; + + +pub(crate) struct BufferControllerWorker { + uid: String, + pub(crate) content: watch::Sender, + pub(crate) operations: mpsc::Receiver, + pub(crate) stream: Arc>, + pub(crate) queue: VecDeque, + receiver: watch::Receiver, + sender: mpsc::Sender, + buffer: String, + path: String, +} + +impl BufferControllerWorker { + pub fn new(uid: String, buffer: &str, path: &str) -> Self { + let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); + let (op_tx, op_rx) = mpsc::channel(64); + let (s_tx, _s_rx) = broadcast::channel(64); + BufferControllerWorker { + uid, + content: txt_tx, + operations: op_rx, + stream: Arc::new(s_tx), + receiver: txt_rx, + sender: op_tx, + queue: VecDeque::new(), + buffer: buffer.to_string(), + path: path.to_string(), + } + } +} + +#[async_trait] +impl ControllerWorker for BufferControllerWorker { + type Controller = BufferController; + type Tx = BufferClient; + type Rx = Streaming; + + fn subscribe(&self) -> BufferController { + BufferController { + content: self.receiver.clone(), + operations: self.sender.clone(), + stream: Mutex::new(self.stream.subscribe()), + } + } + + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + loop { + let op = tokio::select! { + Some(operation) = recv_opseq(&mut rx) => { + let mut out = operation; + for op in self.queue.iter_mut() { + (*op, out) = op.transform(&out).unwrap(); + } + self.stream.send(out.clone()).unwrap(); + out + }, + Some(op) = self.operations.recv() => { + self.queue.push_back(op.clone()); + op + }, + else => break + }; + self.buffer = op.apply(&self.buffer).unwrap(); + self.content.send(self.buffer.clone()).unwrap(); + + while let Some(op) = self.queue.get(0) { + if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break } + self.queue.pop_front(); + } + } + } +} + +async fn send_opseq(tx: &mut BufferClient, uid: String, path: String, op: OperationSeq) -> bool { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path, + user: uid, + }; + match tx.edit(req).await { + Ok(_) => true, + Err(e) => { + tracing::error!("error sending edit: {}", e); + false + } + } +} + +async fn recv_opseq(rx: &mut Streaming) -> Option { + match rx.message().await { + Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), + Ok(None) => None, + Err(e) => { + tracing::error!("could not receive edit from server: {}", e); + None + } + } +} diff --git a/src/client.rs b/src/client.rs index 69fae5d..9691ee2 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,143 +1,117 @@ -use operational_transform::OperationSeq; -use tonic::{transport::Channel, Status, Streaming, async_trait}; -use uuid::Uuid; +use std::{sync::Arc, collections::BTreeMap}; + +use tonic::transport::Channel; use crate::{ - controller::{ControllerWorker, - cursor::{CursorControllerHandle, CursorControllerWorker, CursorEditor}, - buffer::{OperationControllerHandle, OperationControllerEditor, OperationControllerWorker} + cursor::{worker::CursorControllerWorker, controller::CursorController}, + proto::{ + buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, }, - proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest, Cursor}, + CodempError, ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker}, }; -#[derive(Clone)] + pub struct CodempClient { id: String, - client: BufferClient, + client: ServiceClients, + workspace: Option, } -impl From::> for CodempClient { - fn from(value: BufferClient) -> Self { - CodempClient { id: Uuid::new_v4().to_string(), client: value } - } +struct ServiceClients { + buffer: BufferClient, + cursor: CursorClient, } +struct Workspace { + cursor: Arc, + buffers: BTreeMap>, +} + + impl CodempClient { - pub async fn new(dest: &str) -> Result { - Ok(BufferClient::connect(dest.to_string()).await?.into()) + pub async fn new(dst: &str) -> Result { + let buffer = BufferClient::connect(dst.to_string()).await?; + let cursor = CursorClient::connect(dst.to_string()).await?; + let id = uuid::Uuid::new_v4().to_string(); + + Ok(CodempClient { id, client: ServiceClients { buffer, cursor}, workspace: None }) } - pub fn id(&self) -> &str { &self.id } - - pub async fn create(&mut self, path: String, content: Option) -> Result { - let req = BufferPayload { - path, content, - user: self.id.clone(), - }; - - let res = self.client.create(req).await?; - - Ok(res.into_inner().accepted) + pub fn get_cursor(&self) -> Option> { + Some(self.workspace?.cursor.clone()) } - pub async fn listen(&mut self) -> Result { - let req = BufferPayload { - path: "".into(), - content: None, - user: self.id.clone(), - }; + pub fn get_buffer(&self, path: &str) -> Option> { + self.workspace?.buffers.get(path).cloned() + } - let stream = self.client.listen(req).await?.into_inner(); + pub async fn join(&mut self, _session: &str) -> Result, CodempError> { + // TODO there is no real workspace handling in codemp server so it behaves like one big global + // session. I'm still creating this to start laying out the proper use flow + let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); - let controller = CursorControllerWorker::new(self.id().to_string(), (self.clone(), stream)); - let handle = controller.subscribe(); + let controller = CursorControllerWorker::new(self.id.clone()); + let client = self.client.cursor.clone(); + + let handle = Arc::new(controller.subscribe()); tokio::spawn(async move { tracing::debug!("cursor worker started"); - controller.work().await; + controller.work(client, stream).await; tracing::debug!("cursor worker stopped"); }); + self.workspace = Some( + Workspace { + cursor: handle.clone(), + buffers: BTreeMap::new() + } + ); + Ok(handle) } - pub async fn attach(&mut self, path: String) -> Result { - let req = BufferPayload { - path: path.clone(), - content: None, - user: self.id.clone(), - }; + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), CodempError> { + if let Some(workspace) = &self.workspace { + self.client.buffer + .create(BufferPayload { + user: self.id.clone(), + path: path.to_string(), + content: content.map(|x| x.to_string()), + }).await?; - let content = self.client.sync(req.clone()) - .await? - .into_inner() - .content - .unwrap_or("".into()); - - let stream = self.client.attach(req).await?.into_inner(); - - let controller = OperationControllerWorker::new((self.clone(), stream), content, path); - let factory = controller.subscribe(); - - tokio::spawn(async move { - tracing::debug!("buffer worker started"); - controller.work().await; - tracing::debug!("buffer worker stopped"); - }); - - Ok(factory) - } -} - -#[async_trait] -impl OperationControllerEditor for (CodempClient, Streaming) { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool { - let req = OperationRequest { - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - path, - user: self.0.id().to_string(), - }; - match self.0.client.edit(req).await { - Ok(res) => res.into_inner().accepted, - Err(e) => { - tracing::error!("error sending edit: {}", e); - false - } + Ok(()) + } else { + Err(CodempError::InvalidState { msg: "join a workspace first".into() }) } } - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()), - Ok(None) => None, - Err(e) => { - tracing::error!("could not receive edit from server: {}", e); - None - } - } - } -} - -#[async_trait] -impl CursorEditor for (CodempClient, Streaming) { - async fn moved(&mut self, cursor: Cursor) -> bool { - match self.0.client.moved(cursor).await { - Ok(res) => res.into_inner().accepted, - Err(e) => { - tracing::error!("could not send cursor movement: {}", e); - false - } - } - } - - async fn recv(&mut self) -> Option { - match self.1.message().await { - Ok(cursor) => cursor, - Err(e) => { - tracing::error!("could not receive cursor update: {}", e); - None - } + pub async fn attach(&mut self, path: &str, content: Option<&str>) -> Result, CodempError> { + if let Some(workspace) = &mut self.workspace { + let mut client = self.client.buffer.clone(); + let req = BufferPayload { + path: path.to_string(), user: self.id.clone(), content: None + }; + + let content = client.sync(req.clone()).await?.into_inner().content; + + let stream = client.attach(req).await?.into_inner(); + + let controller = BufferControllerWorker::new(self.id.clone(), &content, path); + let handler = Arc::new(controller.subscribe()); + + let _path = path.to_string(); + tokio::spawn(async move { + tracing::debug!("buffer[{}] worker started", _path); + controller.work(client, stream).await; + tracing::debug!("buffer[{}] worker stopped", _path); + }); + + workspace.buffers.insert(path.to_string(), handler.clone()); + + Ok(handler) + } else { + Err(CodempError::InvalidState { msg: "join a workspace first".into() }) } } } diff --git a/src/controller/buffer.rs b/src/controller/buffer.rs deleted file mode 100644 index 11ca642..0000000 --- a/src/controller/buffer.rs +++ /dev/null @@ -1,139 +0,0 @@ -use std::{sync::Arc, collections::VecDeque, ops::Range}; - -use operational_transform::OperationSeq; -use tokio::sync::{watch, mpsc, broadcast}; -use tonic::async_trait; - -use super::{leading_noop, tailing_noop, ControllerWorker}; -use crate::errors::IgnorableError; -use crate::factory::OperationFactory; - -pub struct TextChange { - pub span: Range, - pub content: String, -} - -#[async_trait] -pub trait OperationControllerSubscriber { - async fn poll(&mut self) -> Option; - async fn apply(&self, op: OperationSeq); -} - -pub struct OperationControllerHandle { - content: watch::Receiver, - operations: mpsc::Sender, - original: Arc>, - stream: broadcast::Receiver, -} - -impl Clone for OperationControllerHandle { - fn clone(&self) -> Self { - OperationControllerHandle { - content: self.content.clone(), - operations: self.operations.clone(), - original: self.original.clone(), - stream: self.original.subscribe(), - } - } -} - -#[async_trait] -impl OperationFactory for OperationControllerHandle { - fn content(&self) -> String { - self.content.borrow().clone() - } -} - -#[async_trait] -impl OperationControllerSubscriber for OperationControllerHandle { - async fn poll(&mut self) -> Option { - let op = self.stream.recv().await.ok()?; - let after = self.content.borrow().clone(); - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - Some(TextChange { span, content }) - } - - async fn apply(&self, op: OperationSeq) { - self.operations.send(op).await - .unwrap_or_warn("could not apply+send operation") - } -} - -#[async_trait] -pub(crate) trait OperationControllerEditor { - async fn edit(&mut self, path: String, op: OperationSeq) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct OperationControllerWorker { - pub(crate) content: watch::Sender, - pub(crate) operations: mpsc::Receiver, - pub(crate) stream: Arc>, - pub(crate) queue: VecDeque, - receiver: watch::Receiver, - sender: mpsc::Sender, - client: C, - buffer: String, - path: String, -} - -#[async_trait] -impl ControllerWorker for OperationControllerWorker { - fn subscribe(&self) -> OperationControllerHandle { - OperationControllerHandle { - content: self.receiver.clone(), - operations: self.sender.clone(), - original: self.stream.clone(), - stream: self.stream.subscribe(), - } - } - - async fn work(mut self) { - loop { - let op = tokio::select! { - Some(operation) = self.client.recv() => { - let mut out = operation; - for op in self.queue.iter_mut() { - (*op, out) = op.transform(&out).unwrap(); - } - self.stream.send(out.clone()).unwrap(); - out - }, - Some(op) = self.operations.recv() => { - self.queue.push_back(op.clone()); - op - }, - else => break - }; - self.buffer = op.apply(&self.buffer).unwrap(); - self.content.send(self.buffer.clone()).unwrap(); - - while let Some(op) = self.queue.get(0) { - if !self.client.edit(self.path.clone(), op.clone()).await { break } - self.queue.pop_front(); - } - } - } - -} - -impl OperationControllerWorker { - pub fn new(client: C, buffer: String, path: String) -> Self { - let (txt_tx, txt_rx) = watch::channel(buffer.clone()); - let (op_tx, op_rx) = mpsc::channel(64); - let (s_tx, _s_rx) = broadcast::channel(64); - OperationControllerWorker { - content: txt_tx, - operations: op_rx, - stream: Arc::new(s_tx), - receiver: txt_rx, - sender: op_tx, - queue: VecDeque::new(), - client, buffer, path - } - } -} diff --git a/src/controller/cursor.rs b/src/controller/cursor.rs deleted file mode 100644 index 267e358..0000000 --- a/src/controller/cursor.rs +++ /dev/null @@ -1,104 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::{mpsc, broadcast}; -use tonic::async_trait; - -use crate::{proto::{Position, Cursor}, errors::IgnorableError, controller::ControllerWorker}; - -#[async_trait] -pub trait CursorSubscriber { - async fn send(&self, path: &str, start: Position, end: Position); - async fn poll(&mut self) -> Option; -} - -pub struct CursorControllerHandle { - uid: String, - op: mpsc::Sender, - stream: broadcast::Receiver, - original: Arc>, -} - -impl Clone for CursorControllerHandle { - fn clone(&self) -> Self { - Self { - uid: self.uid.clone(), - op: self.op.clone(), - stream: self.original.subscribe(), - original: self.original.clone() - } - } -} - -#[async_trait] -impl CursorSubscriber for CursorControllerHandle { - async fn send(&self, path: &str, start: Position, end: Position) { - self.op.send(Cursor { - user: self.uid.clone(), - buffer: path.to_string(), - start: Some(start), - end: Some(end), - }).await.unwrap_or_warn("could not send cursor op") - } - - async fn poll(&mut self) -> Option { - match self.stream.recv().await { - Ok(x) => Some(x), - Err(e) => { - tracing::warn!("could not poll for cursor: {}", e); - None - } - } - } -} - -#[async_trait] -pub(crate) trait CursorEditor { - async fn moved(&mut self, cursor: Cursor) -> bool; - async fn recv(&mut self) -> Option; -} - -pub(crate) struct CursorControllerWorker { - uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, - client: C, -} - -impl CursorControllerWorker { - pub(crate) fn new(uid: String, client: C) -> Self { - let (op_tx, op_rx) = mpsc::channel(64); - let (cur_tx, _cur_rx) = broadcast::channel(64); - CursorControllerWorker { - uid, client, - producer: op_tx, - op: op_rx, - channel: Arc::new(cur_tx), - } - } -} - -#[async_trait] -impl ControllerWorker for CursorControllerWorker { - fn subscribe(&self) -> CursorControllerHandle { - CursorControllerHandle { - uid: self.uid.clone(), - op: self.producer.clone(), - stream: self.channel.subscribe(), - original: self.channel.clone(), - } - } - - async fn work(mut self) { - loop { - tokio::select!{ - Some(cur) = self.client.recv() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { self.client.moved(op).await; }, - else => break, - } - } - } - -} - - diff --git a/src/controller/mod.rs b/src/controller/mod.rs deleted file mode 100644 index 86ca589..0000000 --- a/src/controller/mod.rs +++ /dev/null @@ -1,30 +0,0 @@ -pub mod buffer; -pub mod cursor; - -use std::ops::Range; - -use operational_transform::{Operation, OperationSeq}; -use tonic::async_trait; - -#[async_trait] -pub trait ControllerWorker { - fn subscribe(&self) -> T; - async fn work(self); -} - -pub const fn leading_noop(seq: &[Operation]) -> u64 { count_noop(seq.first()) } -pub const fn tailing_noop(seq: &[Operation]) -> u64 { count_noop(seq.last()) } - -const fn count_noop(op: Option<&Operation>) -> u64 { - match op { - None => 0, - Some(Operation::Retain(n)) => *n, - Some(_) => 0, - } -} - -pub fn op_effective_range(op: &OperationSeq) -> Range { - let first = leading_noop(op.ops()); - let last = op.base_len() as u64 - tailing_noop(op.ops()); - first..last -} diff --git a/src/cursor.rs b/src/cursor.rs deleted file mode 100644 index 4ee265e..0000000 --- a/src/cursor.rs +++ /dev/null @@ -1,23 +0,0 @@ -use crate::proto::{Position, Cursor}; - -impl From:: for (i32, i32) { - fn from(pos: Position) -> (i32, i32) { - (pos.row, pos.col) - } -} - -impl From::<(i32, i32)> for Position { - fn from((row, col): (i32, i32)) -> Self { - Position { row, col } - } -} - -impl Cursor { - pub fn start(&self) -> Position { - self.start.clone().unwrap_or((0, 0).into()) - } - - pub fn end(&self) -> Position { - self.end.clone().unwrap_or((0, 0).into()) - } -} diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs new file mode 100644 index 0000000..7e879a7 --- /dev/null +++ b/src/cursor/controller.rs @@ -0,0 +1,51 @@ +use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; +use tonic::async_trait; + +use crate::{proto::{CursorPosition, CursorEvent}, CodempError, Controller}; + +pub struct CursorController { + uid: String, + op: mpsc::Sender, + stream: Mutex>, +} + +#[async_trait] +impl Controller for CursorController { + type Input = CursorPosition; + + async fn send(&self, cursor: CursorPosition) -> Result<(), CodempError> { + Ok(self.op.send(CursorEvent { + user: self.uid.clone(), + position: Some(cursor), + }).await?) + } + + // TODO is this cancelable? so it can be used in tokio::select! + // TODO is the result type overkill? should be an option? + async fn recv(&self) -> Result { + let mut stream = self.stream.lock().await; + match stream.recv().await { + Ok(x) => Ok(x), + Err(RecvError::Closed) => Err(CodempError::Channel { send: false }), + Err(RecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Ok(stream.recv().await.expect("could not receive after lagging")) + } + } + } + + // fn try_poll(&self) -> Option> { + // match self.stream.try_lock() { + // Err(_) => None, + // Ok(mut x) => match x.try_recv() { + // Ok(x) => Some(Some(x)), + // Err(TryRecvError::Empty) => None, + // Err(TryRecvError::Closed) => Some(None), + // Err(TryRecvError::Lagged(n)) => { + // tracing::error!("cursor channel lagged behind, skipping {} events", n); + // Some(Some(x.try_recv().expect("could not receive after lagging"))) + // } + // } + // } + // } +} diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs new file mode 100644 index 0000000..c6fbdd9 --- /dev/null +++ b/src/cursor/mod.rs @@ -0,0 +1,26 @@ +pub(crate) mod worker; +pub mod controller; + +use crate::proto::{RowCol, CursorPosition}; + +impl From:: for (i32, i32) { + fn from(pos: RowCol) -> (i32, i32) { + (pos.row, pos.col) + } +} + +impl From::<(i32, i32)> for RowCol { + fn from((row, col): (i32, i32)) -> Self { + RowCol { row, col } + } +} + +impl CursorPosition { + pub fn start(&self) -> RowCol { + self.start.clone().unwrap_or((0, 0).into()) + } + + pub fn end(&self) -> RowCol { + self.end.clone().unwrap_or((0, 0).into()) + } +} diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs new file mode 100644 index 0000000..42fa694 --- /dev/null +++ b/src/cursor/worker.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use tokio::sync::{mpsc, broadcast::{self}, Mutex}; +use tonic::{Streaming, transport::Channel, async_trait}; + +use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker}; + +use super::controller::CursorController; + +pub(crate) struct CursorControllerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, +} + +impl CursorControllerWorker { + pub(crate) fn new(uid: String) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + Self { + uid, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } +} + +#[async_trait] +impl ControllerWorker for CursorControllerWorker { + type Controller = CursorController; + type Tx = CursorClient; + type Rx = Streaming; + + fn subscribe(&self) -> CursorController { + CursorController { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: Mutex::new(self.channel.subscribe()), + } + } + + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + loop { + tokio::select!{ + Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), + Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + else => break, + } + } + } +} + diff --git a/src/errors.rs b/src/errors.rs index b8dd254..6b923a7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,3 +1,7 @@ +use std::{error::Error, fmt::Display}; + +use tokio::sync::{mpsc, broadcast}; +use tonic::{Status, Code}; use tracing::warn; pub trait IgnorableError { @@ -13,3 +17,61 @@ where E : std::fmt::Display { } } } + +// TODO split this into specific errors for various parts of the library +#[derive(Debug)] +pub enum CodempError { + Transport { + status: Code, + message: String, + }, + Channel { + send: bool + }, + InvalidState { + msg: String, + }, + + // TODO filler error, remove later + Filler { + message: String, + }, +} + +impl Error for CodempError {} + +impl Display for CodempError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Transport { status, message } => write!(f, "Transport error: ({}) {}", status, message), + Self::Channel { send } => write!(f, "Channel error (send:{})", send), + _ => write!(f, "Unknown error"), + } + } +} + +impl From for CodempError { + fn from(status: Status) -> Self { + CodempError::Transport { status: status.code(), message: status.message().to_string() } + } +} + +impl From for CodempError { + fn from(err: tonic::transport::Error) -> Self { + CodempError::Transport { + status: Code::Unknown, message: format!("underlying transport error: {:?}", err) + } + } +} + +impl From> for CodempError { + fn from(_value: mpsc::error::SendError) -> Self { + CodempError::Channel { send: true } + } +} + +impl From for CodempError { + fn from(_value: broadcast::error::RecvError) -> Self { + CodempError::Channel { send: false } + } +} diff --git a/src/instance.rs b/src/instance.rs new file mode 100644 index 0000000..1421d29 --- /dev/null +++ b/src/instance.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::{ + buffer::controller::BufferController, + errors::CodempError, client::CodempClient, cursor::controller::CursorController, +}; + + +use tokio::runtime::Runtime; + +const CODEMP_DEFAULT_HOST : &str = "http://alemi.dev:50051"; + +lazy_static::lazy_static! { + static ref RUNTIME : Runtime = Runtime::new().expect("could not create tokio runtime"); + static ref INSTANCE : Instance = Instance::default(); +} + +pub struct Instance { + client: Mutex>, +} + +impl Default for Instance { + fn default() -> Self { + Instance { client: Mutex::new(None) } + } +} + +// TODO these methods repeat a lot of code but Mutex makes it hard to simplify + +impl Instance { + pub async fn connect(&self, addr: &str) -> Result<(), CodempError> { + *self.client.lock().await = Some(CodempClient::new(addr).await?); + Ok(()) + } + + pub async fn join(&self, session: &str) -> Result<(), CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .join(session) + .await?; + + Ok(()) + } + + pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .create(path, content) + .await?; + + Ok(()) + } + + pub async fn get_cursor(&self) -> Result, CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .get_cursor() + .ok_or(CodempError::InvalidState { msg: "join a workspace first".into() }) + } + + pub async fn get_buffer(&self, path: &str) -> Result, CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .get_buffer(path) + .ok_or(CodempError::InvalidState { msg: "join a workspace or create requested buffer first".into() }) + } +} diff --git a/src/lib.rs b/src/lib.rs index 2debfb8..0975600 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,39 @@ -pub mod proto; -pub mod client; -pub mod workspace; -pub mod controller; pub mod cursor; pub mod errors; -pub mod factory; +pub mod buffer; + +pub mod client; + +#[cfg(feature = "static")] +pub mod instance; pub use tonic; pub use tokio; pub use operational_transform as ot; +#[cfg(feature = "proto")] +#[allow(non_snake_case)] +pub mod proto { + tonic::include_proto!("codemp.buffer"); + tonic::include_proto!("codemp.cursor"); +} + +pub use errors::CodempError; + +#[tonic::async_trait] // TODO move this somewhere? +pub(crate) trait ControllerWorker { + type Controller : Controller; + type Tx; + type Rx; + + fn subscribe(&self) -> Self::Controller; + async fn work(self, tx: Self::Tx, rx: Self::Rx); +} + +#[tonic::async_trait] +pub trait Controller { + type Input; + + async fn send(&self, x: Self::Input) -> Result<(), CodempError>; + async fn recv(&self) -> Result; +} diff --git a/src/proto.rs b/src/proto.rs deleted file mode 100644 index 4e168ef..0000000 --- a/src/proto.rs +++ /dev/null @@ -1 +0,0 @@ -tonic::include_proto!("buffer"); diff --git a/src/workspace.rs b/src/workspace.rs deleted file mode 100644 index 56609da..0000000 --- a/src/workspace.rs +++ /dev/null @@ -1,57 +0,0 @@ -use std::{collections::BTreeMap, ops::Range}; - -use tokio::sync::RwLock; -use tonic::Status; - -use crate::{ - client::CodempClient, factory::OperationFactory, - controller::{buffer::{OperationControllerHandle, OperationControllerSubscriber}, - cursor::{CursorControllerHandle, CursorSubscriber}}, proto::Cursor, -}; - -pub struct Workspace { - client: CodempClient, - buffers: RwLock>, - cursor: RwLock, -} - -impl Workspace { - pub async fn new(mut client: CodempClient) -> Result { - Ok( - Workspace { - cursor: RwLock::new(client.listen().await?), - client, - buffers: RwLock::new(BTreeMap::new()), - } - ) - } - - pub async fn create(&self, path: &str, content: Option) -> Result { - self.client.clone().create(path.into(), content).await - } - - pub async fn attach(&self, path: String) -> Result<(), Status> { - self.buffers.write().await.insert( - path.clone(), - self.client.clone().attach(path).await? - ); - Ok(()) - } - - pub async fn diff(&self, path: &str, span: Range, text: &str) { - if let Some(controller) = self.buffers.read().await.get(path) { - if let Some(op) = controller.delta(span.start, text, span.end) { - controller.apply(op).await - } - } - } - - pub async fn send(&self, path: &str, start: (i32, i32), end: (i32, i32)) { - self.cursor.read().await.send(path, start.into(), end.into()).await - } - - pub async fn recv(&self) -> Option { - self.cursor.write().await.poll().await - - } -}