diff --git a/client/nvim/Cargo.toml b/client/nvim/Cargo.toml index ea71678..59949b7 100644 --- a/client/nvim/Cargo.toml +++ b/client/nvim/Cargo.toml @@ -8,8 +8,6 @@ codemp = { path = "../.." } tracing = "0.1" tracing-subscriber = "0.3" uuid = { version = "1.3.1", features = ["v4"] } -operational-transform = { version = "0.6", features = ["serde"] } -tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] } serde = "1" serde_json = "1" rmpv = "1" diff --git a/client/nvim/src/main.rs b/client/nvim/src/main.rs index 2acc73b..bcae4a6 100644 --- a/client/nvim/src/main.rs +++ b/client/nvim/src/main.rs @@ -4,10 +4,9 @@ use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; use codemp::operation::{OperationController, OperationFactory, OperationProcessor}; use codemp::client::CodempClient; use codemp::proto::buffer_client::BufferClient; +use codemp::tokio; + use rmpv::Value; - - -use tokio::io::Stdout; use clap::Parser; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; @@ -43,13 +42,13 @@ impl NeovimHandler { #[async_trait::async_trait] impl Handler for NeovimHandler { - type Writer = Compat; + type Writer = Compat; async fn handle_request( &self, name: String, args: Vec, - nvim: Neovim>, + nvim: Neovim>, ) -> Result { debug!("processing '{}' - {:?}", name, args); match name.as_ref() { @@ -84,7 +83,7 @@ impl Handler for NeovimHandler { match self.buffer_controller(&path) { None => Err(Value::from("no controller for given path")), Some(controller) => { - match controller.apply(controller.insert(&txt, pos as u64)).await { + match controller.apply(controller.insert(&txt, pos as u64)) { Err(e) => Err(Value::from(format!("could not send insert: {}", e))), Ok(_res) => Ok(Value::Nil), } @@ -102,7 +101,7 @@ impl Handler for NeovimHandler { match self.buffer_controller(&path) { None => Err(Value::from("no controller for given path")), - Some(controller) => match controller.apply(controller.delete(pos, count)).await { + Some(controller) => match controller.apply(controller.delete(pos, count)) { Err(e) => Err(Value::from(format!("could not send delete: {}", e))), Ok(_res) => Ok(Value::Nil), } @@ -118,7 +117,7 @@ impl Handler for NeovimHandler { match self.buffer_controller(&path) { None => Err(Value::from("no controller for given path")), - Some(controller) => match controller.apply(controller.replace(&txt)).await { + Some(controller) => match controller.apply(controller.replace(&txt)) { Err(e) => Err(Value::from(format!("could not send replace: {}", e))), Ok(_res) => Ok(Value::Nil), } @@ -244,7 +243,7 @@ impl Handler for NeovimHandler { &self, _name: String, _args: Vec, - _nvim: Neovim>, + _nvim: Neovim>, ) { warn!("notify not handled"); } diff --git a/src/client.rs b/src/client.rs index a4ad8eb..f8d68d8 100644 --- a/src/client.rs +++ b/src/client.rs @@ -87,7 +87,7 @@ impl CodempClient { Ok(None) => break warn!("stream closed for buffer {}", _path), Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { Err(e) => error!("error deserializing opseq: {}", e), - Ok(v) => match _factory.process(v).await { + Ok(v) => match _factory.process(v) { Err(e) => break error!("could not apply operation from server: {}", e), Ok(_range) => { } // range is obtained awaiting wait(), need to pass the OpSeq itself } diff --git a/src/cursor.rs b/src/cursor.rs index 97ca69a..a7b8bdc 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -38,6 +38,7 @@ pub struct Cursor { pub end: Position, } +#[derive(Debug)] pub struct CursorController { users: Mutex>, bus: broadcast::Sender<(String, Cursor)>, diff --git a/src/lib.rs b/src/lib.rs index 4e12e2b..bc248e2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,4 +6,5 @@ pub mod errors; pub use tonic; pub use tokio; +pub use operational_transform as ot; diff --git a/src/operation/controller.rs b/src/operation/controller.rs index d2764c3..c6f11e5 100644 --- a/src/operation/controller.rs +++ b/src/operation/controller.rs @@ -8,6 +8,7 @@ use super::{OperationFactory, OperationProcessor, op_effective_range}; use crate::errors::IgnorableError; +#[derive(Debug)] pub struct OperationController { text: Mutex, queue: Mutex>, @@ -77,12 +78,20 @@ impl OperationController { *self.run.borrow() } - async fn operation(&self, op: &OperationSeq) -> Result, OTError> { + fn operation(&self, op: &OperationSeq) -> Result, OTError> { let txt = self.content(); let res = op.apply(&txt)?; *self.text.lock().unwrap() = res; Ok(op_effective_range(op)) } + + fn transform(&self, mut op: OperationSeq) -> Result { + let mut queue = self.queue.lock().unwrap(); + for el in queue.iter_mut() { + (op, *el) = op.transform(el)?; + } + Ok(op) + } } impl OperationFactory for OperationController { @@ -91,24 +100,18 @@ impl OperationFactory for OperationController { } } -#[tonic::async_trait] impl OperationProcessor for OperationController { - async fn apply(&self, op: OperationSeq) -> Result, OTError> { - let span = self.operation(&op).await?; + fn apply(&self, op: OperationSeq) -> Result, OTError> { + let span = self.operation(&op)?; self.queue.lock().unwrap().push_back(op.clone()); - self.notifier.send(op.clone()).unwrap_or_warn("notifying of applied change"); + self.notifier.send(op).unwrap_or_warn("notifying of applied change"); Ok(span) } - async fn process(&self, mut op: OperationSeq) -> Result, OTError> { - { - let mut queue = self.queue.lock().unwrap(); - for el in queue.iter_mut() { - (op, *el) = op.transform(el)?; - } - } - let span = self.operation(&op).await?; + fn process(&self, mut op: OperationSeq) -> Result, OTError> { + op = self.transform(op)?; + let span = self.operation(&op)?; self.changed_notifier.send(span.clone()).unwrap_or_warn("notifying of changed content"); Ok(span) } diff --git a/src/operation/processor.rs b/src/operation/processor.rs index a37dfa2..cda36b9 100644 --- a/src/operation/processor.rs +++ b/src/operation/processor.rs @@ -4,8 +4,7 @@ use operational_transform::{OperationSeq, OTError}; use crate::operation::factory::OperationFactory; -#[tonic::async_trait] pub trait OperationProcessor : OperationFactory { - async fn apply(&self, op: OperationSeq) -> Result, OTError>; - async fn process(&self, op: OperationSeq) -> Result, OTError>; + fn apply(&self, op: OperationSeq) -> Result, OTError>; + fn process(&self, op: OperationSeq) -> Result, OTError>; }