fix: remove unnecessary async, re-exported ot, refactor

This commit is contained in:
əlemi 2023-07-02 23:59:04 +02:00
parent d79886e613
commit 43634acc2c
7 changed files with 29 additions and 28 deletions

View file

@ -8,8 +8,6 @@ codemp = { path = "../.." }
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
uuid = { version = "1.3.1", features = ["v4"] } uuid = { version = "1.3.1", features = ["v4"] }
operational-transform = { version = "0.6", features = ["serde"] }
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] }
serde = "1" serde = "1"
serde_json = "1" serde_json = "1"
rmpv = "1" rmpv = "1"

View file

@ -4,10 +4,9 @@ use std::{net::TcpStream, sync::Mutex, collections::BTreeMap};
use codemp::operation::{OperationController, OperationFactory, OperationProcessor}; use codemp::operation::{OperationController, OperationFactory, OperationProcessor};
use codemp::client::CodempClient; use codemp::client::CodempClient;
use codemp::proto::buffer_client::BufferClient; use codemp::proto::buffer_client::BufferClient;
use codemp::tokio;
use rmpv::Value; use rmpv::Value;
use tokio::io::Stdout;
use clap::Parser; use clap::Parser;
use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim};
@ -43,13 +42,13 @@ impl NeovimHandler {
#[async_trait::async_trait] #[async_trait::async_trait]
impl Handler for NeovimHandler { impl Handler for NeovimHandler {
type Writer = Compat<Stdout>; type Writer = Compat<tokio::io::Stdout>;
async fn handle_request( async fn handle_request(
&self, &self,
name: String, name: String,
args: Vec<Value>, args: Vec<Value>,
nvim: Neovim<Compat<Stdout>>, nvim: Neovim<Compat<tokio::io::Stdout>>,
) -> Result<Value, Value> { ) -> Result<Value, Value> {
debug!("processing '{}' - {:?}", name, args); debug!("processing '{}' - {:?}", name, args);
match name.as_ref() { match name.as_ref() {
@ -84,7 +83,7 @@ impl Handler for NeovimHandler {
match self.buffer_controller(&path) { match self.buffer_controller(&path) {
None => Err(Value::from("no controller for given path")), None => Err(Value::from("no controller for given path")),
Some(controller) => { Some(controller) => {
match controller.apply(controller.insert(&txt, pos as u64)).await { match controller.apply(controller.insert(&txt, pos as u64)) {
Err(e) => Err(Value::from(format!("could not send insert: {}", e))), Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
Ok(_res) => Ok(Value::Nil), Ok(_res) => Ok(Value::Nil),
} }
@ -102,7 +101,7 @@ impl Handler for NeovimHandler {
match self.buffer_controller(&path) { match self.buffer_controller(&path) {
None => Err(Value::from("no controller for given path")), None => Err(Value::from("no controller for given path")),
Some(controller) => match controller.apply(controller.delete(pos, count)).await { Some(controller) => match controller.apply(controller.delete(pos, count)) {
Err(e) => Err(Value::from(format!("could not send delete: {}", e))), Err(e) => Err(Value::from(format!("could not send delete: {}", e))),
Ok(_res) => Ok(Value::Nil), Ok(_res) => Ok(Value::Nil),
} }
@ -118,7 +117,7 @@ impl Handler for NeovimHandler {
match self.buffer_controller(&path) { match self.buffer_controller(&path) {
None => Err(Value::from("no controller for given path")), None => Err(Value::from("no controller for given path")),
Some(controller) => match controller.apply(controller.replace(&txt)).await { Some(controller) => match controller.apply(controller.replace(&txt)) {
Err(e) => Err(Value::from(format!("could not send replace: {}", e))), Err(e) => Err(Value::from(format!("could not send replace: {}", e))),
Ok(_res) => Ok(Value::Nil), Ok(_res) => Ok(Value::Nil),
} }
@ -244,7 +243,7 @@ impl Handler for NeovimHandler {
&self, &self,
_name: String, _name: String,
_args: Vec<Value>, _args: Vec<Value>,
_nvim: Neovim<Compat<Stdout>>, _nvim: Neovim<Compat<tokio::io::Stdout>>,
) { ) {
warn!("notify not handled"); warn!("notify not handled");
} }

View file

@ -87,7 +87,7 @@ impl CodempClient {
Ok(None) => break warn!("stream closed for buffer {}", _path), Ok(None) => break warn!("stream closed for buffer {}", _path),
Ok(Some(x)) => match serde_json::from_str::<OperationSeq>(&x.opseq) { Ok(Some(x)) => match serde_json::from_str::<OperationSeq>(&x.opseq) {
Err(e) => error!("error deserializing opseq: {}", e), Err(e) => error!("error deserializing opseq: {}", e),
Ok(v) => match _factory.process(v).await { Ok(v) => match _factory.process(v) {
Err(e) => break error!("could not apply operation from server: {}", e), Err(e) => break error!("could not apply operation from server: {}", e),
Ok(_range) => { } // range is obtained awaiting wait(), need to pass the OpSeq itself Ok(_range) => { } // range is obtained awaiting wait(), need to pass the OpSeq itself
} }

View file

@ -38,6 +38,7 @@ pub struct Cursor {
pub end: Position, pub end: Position,
} }
#[derive(Debug)]
pub struct CursorController { pub struct CursorController {
users: Mutex<HashMap<String, Cursor>>, users: Mutex<HashMap<String, Cursor>>,
bus: broadcast::Sender<(String, Cursor)>, bus: broadcast::Sender<(String, Cursor)>,

View file

@ -6,4 +6,5 @@ pub mod errors;
pub use tonic; pub use tonic;
pub use tokio; pub use tokio;
pub use operational_transform as ot;

View file

@ -8,6 +8,7 @@ use super::{OperationFactory, OperationProcessor, op_effective_range};
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
#[derive(Debug)]
pub struct OperationController { pub struct OperationController {
text: Mutex<String>, text: Mutex<String>,
queue: Mutex<VecDeque<OperationSeq>>, queue: Mutex<VecDeque<OperationSeq>>,
@ -77,12 +78,20 @@ impl OperationController {
*self.run.borrow() *self.run.borrow()
} }
async fn operation(&self, op: &OperationSeq) -> Result<Range<u64>, OTError> { fn operation(&self, op: &OperationSeq) -> Result<Range<u64>, OTError> {
let txt = self.content(); let txt = self.content();
let res = op.apply(&txt)?; let res = op.apply(&txt)?;
*self.text.lock().unwrap() = res; *self.text.lock().unwrap() = res;
Ok(op_effective_range(op)) Ok(op_effective_range(op))
} }
fn transform(&self, mut op: OperationSeq) -> Result<OperationSeq, OTError> {
let mut queue = self.queue.lock().unwrap();
for el in queue.iter_mut() {
(op, *el) = op.transform(el)?;
}
Ok(op)
}
} }
impl OperationFactory for OperationController { impl OperationFactory for OperationController {
@ -91,24 +100,18 @@ impl OperationFactory for OperationController {
} }
} }
#[tonic::async_trait]
impl OperationProcessor for OperationController { impl OperationProcessor for OperationController {
async fn apply(&self, op: OperationSeq) -> Result<Range<u64>, OTError> { fn apply(&self, op: OperationSeq) -> Result<Range<u64>, OTError> {
let span = self.operation(&op).await?; let span = self.operation(&op)?;
self.queue.lock().unwrap().push_back(op.clone()); self.queue.lock().unwrap().push_back(op.clone());
self.notifier.send(op.clone()).unwrap_or_warn("notifying of applied change"); self.notifier.send(op).unwrap_or_warn("notifying of applied change");
Ok(span) Ok(span)
} }
async fn process(&self, mut op: OperationSeq) -> Result<Range<u64>, OTError> { fn process(&self, mut op: OperationSeq) -> Result<Range<u64>, OTError> {
{ op = self.transform(op)?;
let mut queue = self.queue.lock().unwrap(); let span = self.operation(&op)?;
for el in queue.iter_mut() {
(op, *el) = op.transform(el)?;
}
}
let span = self.operation(&op).await?;
self.changed_notifier.send(span.clone()).unwrap_or_warn("notifying of changed content"); self.changed_notifier.send(span.clone()).unwrap_or_warn("notifying of changed content");
Ok(span) Ok(span)
} }

View file

@ -4,8 +4,7 @@ use operational_transform::{OperationSeq, OTError};
use crate::operation::factory::OperationFactory; use crate::operation::factory::OperationFactory;
#[tonic::async_trait]
pub trait OperationProcessor : OperationFactory { pub trait OperationProcessor : OperationFactory {
async fn apply(&self, op: OperationSeq) -> Result<Range<u64>, OTError>; fn apply(&self, op: OperationSeq) -> Result<Range<u64>, OTError>;
async fn process(&self, op: OperationSeq) -> Result<Range<u64>, OTError>; fn process(&self, op: OperationSeq) -> Result<Range<u64>, OTError>;
} }