diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua index 3be8f8e..55e0a1d 100644 --- a/src/client/nvim/codemp.lua +++ b/src/client/nvim/codemp.lua @@ -49,13 +49,13 @@ local function hook_callbacks(path, buffer) { callback = function(args) local cursor = vim.api.nvim_win_get_cursor(0) + pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors if cursor[1] == last_line then return end last_line = cursor[1] local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false) pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors - pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors end, buffer = buffer, group = codemp_autocmds, @@ -111,6 +111,7 @@ vim.api.nvim_create_user_command('Connect', -- print(vim.fn.join(data, "\n")) end, stderr_buffered = false, + env = { RUST_BACKTRACE = 1 } } ) if M.jobid <= 0 then diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 0b6662e..f7e9125 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -1,6 +1,8 @@ -use std::{net::TcpStream, sync::Mutex}; +use std::sync::Arc; +use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; -use codemp::client::CodempClient; +use codemp::operation::{OperationController, OperationFactory}; +use codemp::{client::CodempClient, operation::OperationProcessor}; use codemp::proto::buffer_client::BufferClient; use rmpv::Value; @@ -9,12 +11,12 @@ use tokio::io::Stdout; use clap::Parser; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; -use tonic::async_trait; use tracing::{error, warn, debug, info}; #[derive(Clone)] struct NeovimHandler { client: CodempClient, + factories: Arc>>>, } fn nullable_optional_str(args: &Vec, index: usize) -> Option { @@ -33,7 +35,13 @@ fn default_zero_number(args: &Vec, index: usize) -> i64 { nullable_optional_number(args, index).unwrap_or(0) } -#[async_trait] +impl NeovimHandler { + fn buffer_controller(&self, path: &String) -> Option> { + Some(self.factories.lock().unwrap().get(path)?.clone()) + } +} + +#[tonic::async_trait] impl Handler for NeovimHandler { type Writer = Compat; @@ -69,16 +77,18 @@ impl Handler for NeovimHandler { } let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let pos = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.insert(path, txt, pos).await { - Ok(res) => { - match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), + let mut pos = default_zero_number(&args, 2) as i64; + + if pos <= 0 { pos = 0 } // TODO wtf vim?? + + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => { + match controller.apply(controller.insert(&txt, pos as u64)).await { + Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + Ok(_res) => Ok(Value::Nil), } - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + } } }, @@ -90,13 +100,12 @@ impl Handler for NeovimHandler { let pos = default_zero_number(&args, 1) as u64; let count = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.delete(path, pos, count).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.delete(pos, count)).await { + Err(e) => Err(Value::from(format!("could not send delete: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -107,13 +116,12 @@ impl Handler for NeovimHandler { let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let mut c = self.client.clone(); - match c.replace(path, txt).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.replace(&txt)).await { + Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -129,63 +137,72 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); - let buf = buffer.clone(); - match c.attach(path, move |x| { - let lines : Vec = x.split("\n").map(|x| x.to_string()).collect(); - let b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = b.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); - } - }); - }).await { + match c.attach(path.clone()).await { Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), - Ok(content) => { - let lines : Vec = content.split("\n").map(|x| x.to_string()).collect(); - if let Err(e) = buffer.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); + Ok(controller) => { + let _controller = controller.clone(); + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + match buffer.set_lines(0, -1, false, lines).await { + Err(e) => Err(Value::from(format!("could not sync buffer: {}", e))), + Ok(()) => { + tokio::spawn(async move { + loop { + _controller.wait().await; + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + if let Err(e) = buffer.set_lines(0, -1, false, lines).await { + error!("could not update buffer: {}", e); + } + } + }); + self.factories.lock().unwrap().insert(path, controller); + Ok(Value::Nil) + } } - Ok(Value::Nil) }, } }, "detach" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - c.detach(path); - Ok(Value::Nil) + Err(Value::from("unimplemented! try with :q!")) + // if args.len() < 1 { + // return Err(Value::from("no path given")); + // } + // let path = default_empty_str(&args, 0); + // let mut c = self.client.clone(); + // c.detach(path); + // Ok(Value::Nil) }, "listen" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - let ns = nvim.create_namespace("Cursor").await .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; let buf = nvim.get_current_buf().await .map_err(|e| Value::from(format!("could not get current buf: {}", e)))?; - match c.listen(path, move |cur| { - let _b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = _b.clear_namespace(ns, 0, -1).await { - error!("could not clear previous cursor highlight: {}", e); - } - if let Err(e) = _b.add_highlight(ns, "ErrorMsg", cur.row-1, cur.col, cur.col+1).await { - error!("could not create highlight for cursor: {}", e); - } - }); - }).await { - Ok(()) => Ok(Value::Nil), + let mut c = self.client.clone(); + match c.listen().await { Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))), + Ok(cursor) => { + let mut sub = cursor.sub(); + debug!("spawning cursor processing worker"); + tokio::spawn(async move { + loop { + match sub.recv().await { + Err(e) => return error!("error receiving cursor update from controller: {}", e), + Ok((_usr, cur)) => { + if let Err(e) = buf.clear_namespace(ns, 0, -1).await { + error!("could not clear previous cursor highlight: {}", e); + } + if let Err(e) = buf.add_highlight(ns, "ErrorMsg", cur.start.row-1, cur.start.col, cur.start.col+1).await { + error!("could not create highlight for cursor: {}", e); + } + } + } + } + }); + Ok(Value::Nil) + }, } }, @@ -199,8 +216,8 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); match c.cursor(path, row, col).await { - Ok(()) => Ok(Value::Nil), - Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))), + Ok(_) => Ok(Value::Nil), + Err(e) => Err(Value:: from(format!("could not update cursor: {}", e))), } }, @@ -259,6 +276,7 @@ async fn main() -> Result<(), Box> { let handler: NeovimHandler = NeovimHandler { client: client.into(), + factories: Arc::new(Mutex::new(BTreeMap::new())), }; let (_nvim, io_handler) = create::new_parent(handler).await; diff --git a/src/lib/client.rs b/src/lib/client.rs index 6b9d179..f16ce15 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -1,194 +1,138 @@ -/// TODO better name for this file +use std::sync::Arc; -use std::{sync::{Arc, RwLock}, collections::BTreeMap}; -use tracing::{error, warn, info}; +use operational_transform::OperationSeq; +use tonic::{transport::Channel, Status}; +use tracing::{error, warn}; use uuid::Uuid; use crate::{ - opfactory::AsyncFactory, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, - tonic::{transport::Channel, Status, Streaming}, + cursor::{CursorController, CursorStorage}, + operation::{OperationController, OperationProcessor}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, }; -pub type FactoryStore = Arc>>>; - -impl From::> for CodempClient { - fn from(x: BufferClient) -> CodempClient { - CodempClient { - id: Uuid::new_v4(), - client:x, - factories: Arc::new(RwLock::new(BTreeMap::new())), - } - } -} - #[derive(Clone)] pub struct CodempClient { - id: Uuid, + id: String, client: BufferClient, - factories: FactoryStore, +} + +impl From::> for CodempClient { + fn from(value: BufferClient) -> Self { + CodempClient { id: Uuid::new_v4().to_string(), client: value } + } } impl CodempClient { - fn get_factory(&self, path: &String) -> Result, Status> { - match self.factories.read().unwrap().get(path) { - Some(f) => Ok(f.clone()), - None => Err(Status::not_found("no active buffer for given path")), - } - } - - pub fn add_factory(&self, path: String, factory:Arc) { - self.factories.write().unwrap().insert(path, factory); + pub fn new(id: String, client: BufferClient) -> Self { + CodempClient { id, client } } pub async fn create(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { - path: path.clone(), - content: content.clone(), - user: self.id.to_string(), + path, content, + user: self.id.clone(), }; - let res = self.client.create(req).await?.into_inner(); + let res = self.client.create(req).await?; - Ok(res.accepted) + Ok(res.into_inner().accepted) } - pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result { - let factory = self.get_factory(&path)?; - match factory.insert(txt, pos).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn delete(&mut self, path: String, pos: u64, count: u64) -> Result { - let factory = self.get_factory(&path)?; - match factory.delete(pos, count).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn replace(&mut self, path: String, txt: String) -> Result { - let factory = self.get_factory(&path)?; - match factory.replace(txt).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<(), Status> { - let req = CursorMov { - path, user: self.id.to_string(), - row, col, - }; - let _res = self.client.cursor(req).await?.into_inner(); - Ok(()) - } - - pub async fn listen(&mut self, path: String, callback: F) -> Result<(), Status> - where F : Fn(CursorMov) -> () + Send + 'static { + pub async fn listen(&mut self) -> Result, Status> { let req = BufferPayload { - path, + path: "".into(), content: None, - user: self.id.to_string(), + user: self.id.clone(), }; + let mut stream = self.client.listen(req).await?.into_inner(); + + let controller = Arc::new(CursorController::new()); + + let _controller = controller.clone(); tokio::spawn(async move { - // TODO catch some errors - while let Ok(Some(x)) = stream.message().await { - callback(x) + loop { + match stream.message().await { + Err(e) => break error!("error receiving cursor: {}", e), + Ok(None) => break, + Ok(Some(x)) => { _controller.update(x); }, + } } }); - Ok(()) + + Ok(controller) } - pub async fn attach(&mut self, path: String, callback: F) -> Result - where F : Fn(String) -> () + Send + 'static { - let content = self.sync(path.clone()).await?; - let factory = Arc::new(AsyncFactory::new(Some(content.clone()))); - self.add_factory(path.clone(), factory.clone()); + pub async fn attach(&mut self, path: String) -> Result, Status> { let req = BufferPayload { - path, + path: path.clone(), content: None, - user: self.id.to_string(), + user: self.id.clone(), }; - let stream = self.client.attach(req).await?.into_inner(); - tokio::spawn(async move { Self::worker(stream, factory, callback).await } ); - Ok(content) - } - pub fn detach(&mut self, path: String) { - self.factories.write().unwrap().remove(&path); - info!("|| detached from buffer"); - } + let content = self.client.sync(req.clone()) + .await? + .into_inner() + .content; - async fn sync(&mut self, path: String) -> Result { - let res = self.client.sync( - BufferPayload { - path, content: None, user: self.id.to_string(), + let mut stream = self.client.attach(req).await?.into_inner(); + + let factory = Arc::new(OperationController::new(content.unwrap_or("".into()))); + + let _factory = factory.clone(); + tokio::spawn(async move { + loop { + match stream.message().await { + Err(e) => break error!("error receiving update: {}", e), + Ok(None) => break, // clean exit + Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { + Err(e) => break error!("error deserializing opseq: {}", e), + Ok(v) => match _factory.process(v).await { + Err(e) => break error!("could not apply operation from server: {}", e), + Ok(_txt) => { + // send event containing where the change happened + } + } + }, + } } - ).await?; - Ok(res.into_inner().content.unwrap_or("".into())) + }); + + let mut _client = self.client.clone(); + let _uid = self.id.clone(); + let _factory = factory.clone(); + let _path = path.clone(); + tokio::spawn(async move { + while let Some(op) = _factory.poll().await { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path: _path.clone(), + user: _uid.clone(), + }; + match _client.edit(req).await { + Ok(res) => match res.into_inner().accepted { + true => { _factory.ack().await; }, + false => { + warn!("server rejected operation, retrying in 1s"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }, + Err(e) => error!("could not send edit: {}", e), + } + } + }); + + Ok(factory) } - async fn worker(mut stream: Streaming, factory: Arc, callback: F) - where F : Fn(String) -> () { - info!("|> buffer worker started"); - loop { - match stream.message().await { - Err(e) => break error!("error receiving change: {}", e), - Ok(v) => match v { - None => break warn!("stream closed"), - Some(operation) => match serde_json::from_str(&operation.opseq) { - Err(e) => break error!("could not deserialize opseq: {}", e), - Ok(op) => match factory.process(op).await { - Err(e) => break error!("desynched: {}", e), - Ok(x) => callback(x), - }, - } - }, - } - } - info!("[] buffer worker stopped"); + pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result { + let req = CursorMov { + path, row, col, + user: self.id.clone(), + }; + let res = self.client.cursor(req).await?.into_inner(); + Ok(res.accepted) } } diff --git a/src/lib/cursor.rs b/src/lib/cursor.rs new file mode 100644 index 0000000..9c3a2b4 --- /dev/null +++ b/src/lib/cursor.rs @@ -0,0 +1,87 @@ +use std::{collections::HashMap, sync::Mutex}; + +use tokio::sync::broadcast; +use tracing::{info, error, debug, warn}; + +use crate::proto::CursorMov; + +/// Note that this differs from any hashmap in its put method: no &mut! +pub trait CursorStorage { + fn get(&self, id: &String) -> Option; + fn put(&self, id: String, val: Cursor); + + fn update(&self, event: CursorMov) -> Option { + let mut cur = self.get(&event.user)?; + cur.buffer = event.path; + cur.start = (event.row, event.col).into(); + self.put(event.user, cur.clone()); + Some(cur) + } +} + +#[derive(Copy, Clone, Debug, Default)] +pub struct Position { + pub row: i64, + pub col: i64, +} + +impl From::<(i64, i64)> for Position { + fn from((row, col): (i64, i64)) -> Self { + Position { row, col } + } +} + +#[derive(Clone, Debug, Default)] +pub struct Cursor { + pub buffer: String, + pub start: Position, + pub end: Position, +} + +pub struct CursorController { + users: Mutex>, + bus: broadcast::Sender<(String, Cursor)>, + _bus_keepalive: Mutex>, +} + +impl CursorController { + pub fn new() -> Self { + let (tx, _rx) = broadcast::channel(64); + CursorController { + users: Mutex::new(HashMap::new()), + bus: tx, + _bus_keepalive: Mutex::new(_rx), + } + } + + pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> { + self.bus.subscribe() + } +} + +impl CursorStorage for CursorController { + fn update(&self, event: CursorMov) -> Option { + debug!("processing cursor event: {:?}", event); + let mut cur = self.get(&event.user).unwrap_or(Cursor::default()); + cur.buffer = event.path; + cur.start = (event.row, event.col).into(); + cur.end = (event.row, event.col).into(); + self.put(event.user.clone(), cur.clone()); + if let Err(e) = self.bus.send((event.user, cur.clone())) { + error!("could not broadcast cursor event: {}", e); + } else { // this is because once there are no receivers, nothing else can be sent + if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() { + warn!("could not consume event: {}", e); + } + } + Some(cur) + } + + fn get(&self, id: &String) -> Option { + Some(self.users.lock().unwrap().get(id)?.clone()) + } + + fn put(&self, id: String, val: Cursor) { + self.users.lock().unwrap().insert(id, val); + } +} diff --git a/src/lib/lib.rs b/src/lib/lib.rs index 82b6ce5..1ae3d88 100644 --- a/src/lib/lib.rs +++ b/src/lib/lib.rs @@ -1,6 +1,7 @@ pub mod proto; -pub mod opfactory; pub mod client; +pub mod operation; +pub mod cursor; pub use tonic; pub use tokio; diff --git a/src/lib/operation/factory.rs b/src/lib/operation/factory.rs new file mode 100644 index 0000000..b8fa014 --- /dev/null +++ b/src/lib/operation/factory.rs @@ -0,0 +1,53 @@ +use operational_transform::OperationSeq; +use similar::{TextDiff, ChangeTag}; + +pub trait OperationFactory { + fn content(&self) -> String; + + fn replace(&self, txt: &str) -> OperationSeq { + let mut out = OperationSeq::default(); + let content = self.content(); + if content == txt { + return out; // TODO this won't work, should we return a noop instead? + } + + let diff = TextDiff::from_chars(content.as_str(), txt); + + for change in diff.iter_all_changes() { + match change.tag() { + ChangeTag::Equal => out.retain(1), + ChangeTag::Delete => out.delete(1), + ChangeTag::Insert => out.insert(change.value()), + } + } + + out + } + + fn insert(&self, txt: &str, pos: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let total = self.content().len() as u64; + out.retain(pos); + out.insert(txt); + out.retain(total - pos); + out + } + + fn delete(&self, pos: u64, count: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let len = self.content().len() as u64; + out.retain(pos - count); + out.delete(count); + out.retain(len - pos); + out + } + + fn cancel(&self, pos: u64, count: u64) -> OperationSeq { + let mut out = OperationSeq::default(); + let len = self.content().len() as u64; + out.retain(pos); + out.delete(count); + out.retain(len - (pos+count)); + out + } +} diff --git a/src/lib/operation/mod.rs b/src/lib/operation/mod.rs new file mode 100644 index 0000000..8796b59 --- /dev/null +++ b/src/lib/operation/mod.rs @@ -0,0 +1,5 @@ +pub mod factory; +pub mod processor; + +pub use processor::{OperationController, OperationProcessor}; +pub use factory::OperationFactory; diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs new file mode 100644 index 0000000..a585b80 --- /dev/null +++ b/src/lib/operation/processor.rs @@ -0,0 +1,93 @@ +use std::{sync::Mutex, collections::VecDeque}; + +use operational_transform::{OperationSeq, OTError}; +use tokio::sync::watch; + +use crate::operation::factory::OperationFactory; + + +#[tonic::async_trait] +pub trait OperationProcessor : OperationFactory { + async fn apply(&self, op: OperationSeq) -> Result; + async fn process(&self, op: OperationSeq) -> Result; + + async fn poll(&self) -> Option; + async fn ack(&self) -> Option; + async fn wait(&self); +} + + +pub struct OperationController { + text: Mutex, + queue: Mutex>, + last: Mutex>, + notifier: watch::Sender, + changed: Mutex>, + changed_notifier: watch::Sender<()>, +} + +impl OperationController { + pub fn new(content: String) -> Self { + let (tx, rx) = watch::channel(OperationSeq::default()); + let (done, wait) = watch::channel(()); + OperationController { + text: Mutex::new(content), + queue: Mutex::new(VecDeque::new()), + last: Mutex::new(rx), + notifier: tx, + changed: Mutex::new(wait), + changed_notifier: done, + } + } +} + +impl OperationFactory for OperationController { + fn content(&self) -> String { + self.text.lock().unwrap().clone() + } +} + +#[tonic::async_trait] +impl OperationProcessor for OperationController { + async fn apply(&self, op: OperationSeq) -> Result { + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + self.queue.lock().unwrap().push_back(op.clone()); + self.notifier.send(op); + Ok(res) + } + + async fn wait(&self) { + let mut blocker = self.changed.lock().unwrap().clone(); + blocker.changed().await; + blocker.changed().await; + } + + async fn process(&self, mut op: OperationSeq) -> Result { + let mut queue = self.queue.lock().unwrap(); + for el in queue.iter_mut() { + (op, *el) = op.transform(el)?; + } + let txt = self.content(); + let res = op.apply(&txt)?; + *self.text.lock().unwrap() = res.clone(); + self.changed_notifier.send(()); + Ok(res) + } + + async fn poll(&self) -> Option { + let len = self.queue.lock().unwrap().len(); + if len <= 0 { + let mut recv = self.last.lock().unwrap().clone(); + // TODO this is not 100% reliable + recv.changed().await; // acknowledge current state + recv.changed().await; // wait for a change in state + } + Some(self.queue.lock().unwrap().get(0)?.clone()) + } + + async fn ack(&self) -> Option { + self.queue.lock().unwrap().pop_front() + } +} diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs deleted file mode 100644 index 320bd84..0000000 --- a/src/lib/opfactory.rs +++ /dev/null @@ -1,234 +0,0 @@ -use std::collections::VecDeque; - -use operational_transform::{OperationSeq, OTError}; -use similar::TextDiff; -use tokio::sync::{mpsc, watch, oneshot}; -use tracing::{error, warn}; - -#[derive(Clone)] -pub struct OperationFactory { - content: String, - queue: VecDeque, -} - -impl OperationFactory { - pub fn new(init: Option) -> Self { - OperationFactory { - content: init.unwrap_or(String::new()), - queue: VecDeque::new(), - } - } - - fn apply(&mut self, op: OperationSeq) -> Result { - if op.is_noop() { return Err(OTError) } - self.content = op.apply(&self.content)?; - self.queue.push_back(op.clone()); - Ok(op) - } - - // TODO remove the need for this - pub fn content(&self) -> String { - self.content.clone() - } - - pub fn check(&self, txt: &str) -> bool { - self.content == txt - } - - pub fn replace(&mut self, txt: &str) -> Result { - let mut out = OperationSeq::default(); - if self.content == txt { // TODO throw and error rather than wasting everyone's resources - return Err(OTError); // nothing to do - } - - let diff = TextDiff::from_chars(self.content.as_str(), txt); - - for change in diff.iter_all_changes() { - match change.tag() { - similar::ChangeTag::Equal => out.retain(1), - similar::ChangeTag::Delete => out.delete(1), - similar::ChangeTag::Insert => out.insert(change.value()), - } - } - - self.content = out.apply(&self.content)?; - Ok(out) - } - - pub fn insert(&mut self, txt: &str, pos: u64) -> Result { - let mut out = OperationSeq::default(); - let total = self.content.len() as u64; - out.retain(pos); - out.insert(txt); - out.retain(total - pos); - Ok(self.apply(out)?) - } - - pub fn delete(&mut self, pos: u64, count: u64) -> Result { - let mut out = OperationSeq::default(); - let len = self.content.len() as u64; - out.retain(pos - count); - out.delete(count); - out.retain(len - pos); - Ok(self.apply(out)?) - } - - pub fn cancel(&mut self, pos: u64, count: u64) -> Result { - let mut out = OperationSeq::default(); - let len = self.content.len() as u64; - out.retain(pos); - out.delete(count); - out.retain(len - (pos+count)); - Ok(self.apply(out)?) - } - - pub fn ack(&mut self, op: OperationSeq) -> Result<(), OTError> { // TODO use a different error? - // TODO is manually iterating from behind worth the manual search boilerplate? - for (i, o) in self.queue.iter().enumerate().rev() { - if o == &op { - self.queue.remove(i); - return Ok(()); - } - } - warn!("could not ack op {:?} from {:?}", op, self.queue); - Err(OTError) - } - - pub fn process(&mut self, mut op: OperationSeq) -> Result { - for o in self.queue.iter_mut() { - (op, *o) = op.transform(o)?; - } - self.content = op.apply(&self.content)?; - Ok(self.content.clone()) - } -} - - -pub struct AsyncFactory { - run: watch::Sender, - ops: mpsc::Sender, - #[allow(unused)] // TODO is this necessary? - content: watch::Receiver, -} - -impl Drop for AsyncFactory { - fn drop(&mut self) { - self.run.send(false).unwrap_or(()); - } -} - -impl AsyncFactory { - pub fn new(init: Option) -> Self { - let (run_tx, run_rx) = watch::channel(true); - let (ops_tx, ops_rx) = mpsc::channel(64); // TODO hardcoded size - let (txt_tx, txt_rx) = watch::channel("".into()); - - let worker = AsyncFactoryWorker { - factory: OperationFactory::new(init), - ops: ops_rx, - run: run_rx, - content: txt_tx, - }; - - tokio::spawn(async move { worker.work().await }); - - AsyncFactory { run: run_tx, ops: ops_tx, content: txt_rx } - } - - pub async fn insert(&self, txt: String, pos: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Insert(txt, pos), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn delete(&self, pos: u64, count: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Delete(pos, count), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn cancel(&self, pos: u64, count: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Cancel(pos, count), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn replace(&self, txt: String) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Replace(txt), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn process(&self, opseq: OperationSeq) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Process(opseq, tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn ack(&self, opseq: OperationSeq) -> Result<(), OTError> { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Ack(opseq, tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } -} - - -#[derive(Debug)] -enum OpMsg { - Exec(OpWrapper, oneshot::Sender>), - Process(OperationSeq, oneshot::Sender>), - Ack(OperationSeq, oneshot::Sender>), -} - -#[derive(Debug)] -enum OpWrapper { - Insert(String, u64), - Delete(u64, u64), - Cancel(u64, u64), - Replace(String), -} - -struct AsyncFactoryWorker { - factory: OperationFactory, - ops: mpsc::Receiver, - run: watch::Receiver, - content: watch::Sender -} - -impl AsyncFactoryWorker { - async fn work(mut self) { - while *self.run.borrow() { - tokio::select! { // periodically check run so that we stop cleanly - - recv = self.ops.recv() => { - match recv { - Some(msg) => { - match msg { - OpMsg::Exec(op, tx) => tx.send(self.exec(op)).unwrap_or(()), - OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()), - OpMsg::Ack(opseq, tx) => tx.send(self.factory.ack(opseq)).unwrap_or(()), - } - if let Err(e) = self.content.send(self.factory.content()) { - error!("error updating content: {}", e); - break; - } - }, - None => break, - } - }, - - _ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {}, - - }; - } - } - - fn exec(&mut self, op: OpWrapper) -> Result { - match op { - OpWrapper::Insert(txt, pos) => Ok(self.factory.insert(&txt, pos)?), - OpWrapper::Delete(pos, count) => Ok(self.factory.delete(pos, count)?), - OpWrapper::Cancel(pos, count) => Ok(self.factory.cancel(pos, count)?), - OpWrapper::Replace(txt) => Ok(self.factory.replace(&txt)?), - } - } -}