From 3609dbfa84a348df511c080803677d842294e722 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 19 Apr 2023 04:18:22 +0200 Subject: [PATCH] chore: finished reimplementing features modularly now everything that worked in 0.2 seems to work again, and should actually be better. plus, merging differences is done properly and thus should be way more reliable --- src/client/nvim/codemp.lua | 3 +- src/client/nvim/main.rs | 153 ++++++++++++++++++--------------- src/lib/client.rs | 16 +++- src/lib/cursor.rs | 47 ++++++++-- src/lib/operation/processor.rs | 26 ++++-- 5 files changed, 158 insertions(+), 87 deletions(-) diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua index 3be8f8e..55e0a1d 100644 --- a/src/client/nvim/codemp.lua +++ b/src/client/nvim/codemp.lua @@ -49,13 +49,13 @@ local function hook_callbacks(path, buffer) { callback = function(args) local cursor = vim.api.nvim_win_get_cursor(0) + pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors if cursor[1] == last_line then return end last_line = cursor[1] local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false) pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors - pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors end, buffer = buffer, group = codemp_autocmds, @@ -111,6 +111,7 @@ vim.api.nvim_create_user_command('Connect', -- print(vim.fn.join(data, "\n")) end, stderr_buffered = false, + env = { RUST_BACKTRACE = 1 } } ) if M.jobid <= 0 then diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index f5651a6..f7e9125 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -1,7 +1,7 @@ +use std::sync::Arc; use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; -use codemp::cursor::CursorController; -use codemp::operation::OperationController; +use codemp::operation::{OperationController, OperationFactory}; use codemp::{client::CodempClient, operation::OperationProcessor}; use codemp::proto::buffer_client::BufferClient; use rmpv::Value; @@ -16,8 +16,7 @@ use tracing::{error, warn, debug, info}; #[derive(Clone)] struct NeovimHandler { client: CodempClient, - factories: BTreeMap, - cursor: Option, + factories: Arc>>>, } fn nullable_optional_str(args: &Vec, index: usize) -> Option { @@ -36,6 +35,12 @@ fn default_zero_number(args: &Vec, index: usize) -> i64 { nullable_optional_number(args, index).unwrap_or(0) } +impl NeovimHandler { + fn buffer_controller(&self, path: &String) -> Option> { + Some(self.factories.lock().unwrap().get(path)?.clone()) + } +} + #[tonic::async_trait] impl Handler for NeovimHandler { type Writer = Compat; @@ -72,16 +77,18 @@ impl Handler for NeovimHandler { } let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let pos = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.insert(path, txt, pos).await { - Ok(res) => { - match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), + let mut pos = default_zero_number(&args, 2) as i64; + + if pos <= 0 { pos = 0 } // TODO wtf vim?? + + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => { + match controller.apply(controller.insert(&txt, pos as u64)).await { + Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + Ok(_res) => Ok(Value::Nil), } - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + } } }, @@ -93,13 +100,12 @@ impl Handler for NeovimHandler { let pos = default_zero_number(&args, 1) as u64; let count = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.delete(path, pos, count).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.delete(pos, count)).await { + Err(e) => Err(Value::from(format!("could not send delete: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -110,13 +116,12 @@ impl Handler for NeovimHandler { let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let mut c = self.client.clone(); - match c.replace(path, txt).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.replace(&txt)).await { + Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -132,63 +137,72 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); - let buf = buffer.clone(); - match c.attach(path, move |x| { - let lines : Vec = x.split("\n").map(|x| x.to_string()).collect(); - let b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = b.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); - } - }); - }).await { + match c.attach(path.clone()).await { Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), - Ok(content) => { - let lines : Vec = content.split("\n").map(|x| x.to_string()).collect(); - if let Err(e) = buffer.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); + Ok(controller) => { + let _controller = controller.clone(); + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + match buffer.set_lines(0, -1, false, lines).await { + Err(e) => Err(Value::from(format!("could not sync buffer: {}", e))), + Ok(()) => { + tokio::spawn(async move { + loop { + _controller.wait().await; + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + if let Err(e) = buffer.set_lines(0, -1, false, lines).await { + error!("could not update buffer: {}", e); + } + } + }); + self.factories.lock().unwrap().insert(path, controller); + Ok(Value::Nil) + } } - Ok(Value::Nil) }, } }, "detach" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - c.detach(path); - Ok(Value::Nil) + Err(Value::from("unimplemented! try with :q!")) + // if args.len() < 1 { + // return Err(Value::from("no path given")); + // } + // let path = default_empty_str(&args, 0); + // let mut c = self.client.clone(); + // c.detach(path); + // Ok(Value::Nil) }, "listen" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - let ns = nvim.create_namespace("Cursor").await .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; let buf = nvim.get_current_buf().await .map_err(|e| Value::from(format!("could not get current buf: {}", e)))?; - match c.listen(path, move |cur| { - let _b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = _b.clear_namespace(ns, 0, -1).await { - error!("could not clear previous cursor highlight: {}", e); - } - if let Err(e) = _b.add_highlight(ns, "ErrorMsg", cur.row-1, cur.col, cur.col+1).await { - error!("could not create highlight for cursor: {}", e); - } - }); - }).await { - Ok(()) => Ok(Value::Nil), + let mut c = self.client.clone(); + match c.listen().await { Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))), + Ok(cursor) => { + let mut sub = cursor.sub(); + debug!("spawning cursor processing worker"); + tokio::spawn(async move { + loop { + match sub.recv().await { + Err(e) => return error!("error receiving cursor update from controller: {}", e), + Ok((_usr, cur)) => { + if let Err(e) = buf.clear_namespace(ns, 0, -1).await { + error!("could not clear previous cursor highlight: {}", e); + } + if let Err(e) = buf.add_highlight(ns, "ErrorMsg", cur.start.row-1, cur.start.col, cur.start.col+1).await { + error!("could not create highlight for cursor: {}", e); + } + } + } + } + }); + Ok(Value::Nil) + }, } }, @@ -202,8 +216,8 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); match c.cursor(path, row, col).await { - Ok(()) => Ok(Value::Nil), - Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))), + Ok(_) => Ok(Value::Nil), + Err(e) => Err(Value:: from(format!("could not update cursor: {}", e))), } }, @@ -262,6 +276,7 @@ async fn main() -> Result<(), Box> { let handler: NeovimHandler = NeovimHandler { client: client.into(), + factories: Arc::new(Mutex::new(BTreeMap::new())), }; let (_nvim, io_handler) = create::new_parent(handler).await; diff --git a/src/lib/client.rs b/src/lib/client.rs index 0ab4938..f16ce15 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use crate::{ cursor::{CursorController, CursorStorage}, operation::{OperationController, OperationProcessor}, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, }; #[derive(Clone)] @@ -26,7 +26,6 @@ impl From::> for CodempClient { impl CodempClient { pub fn new(id: String, client: BufferClient) -> Self { CodempClient { id, client } - } pub async fn create(&mut self, path: String, content: Option) -> Result { @@ -91,7 +90,9 @@ impl CodempClient { Err(e) => break error!("error deserializing opseq: {}", e), Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), - Ok(_txt) => { } + Ok(_txt) => { + // send event containing where the change happened + } } }, } @@ -125,4 +126,13 @@ impl CodempClient { Ok(factory) } + + pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result { + let req = CursorMov { + path, row, col, + user: self.id.clone(), + }; + let res = self.client.cursor(req).await?.into_inner(); + Ok(res.accepted) + } } diff --git a/src/lib/cursor.rs b/src/lib/cursor.rs index c6ed07a..9c3a2b4 100644 --- a/src/lib/cursor.rs +++ b/src/lib/cursor.rs @@ -1,5 +1,8 @@ use std::{collections::HashMap, sync::Mutex}; +use tokio::sync::broadcast; +use tracing::{info, error, debug, warn}; + use crate::proto::CursorMov; /// Note that this differs from any hashmap in its put method: no &mut! @@ -16,10 +19,10 @@ pub trait CursorStorage { } } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug, Default)] pub struct Position { - row: i64, - col: i64, + pub row: i64, + pub col: i64, } impl From::<(i64, i64)> for Position { @@ -28,24 +31,52 @@ impl From::<(i64, i64)> for Position { } } -#[derive(Clone)] +#[derive(Clone, Debug, Default)] pub struct Cursor { - buffer: String, - start: Position, - end: Position, + pub buffer: String, + pub start: Position, + pub end: Position, } pub struct CursorController { users: Mutex>, + bus: broadcast::Sender<(String, Cursor)>, + _bus_keepalive: Mutex>, } impl CursorController { pub fn new() -> Self { - CursorController { users: Mutex::new(HashMap::new()) } + let (tx, _rx) = broadcast::channel(64); + CursorController { + users: Mutex::new(HashMap::new()), + bus: tx, + _bus_keepalive: Mutex::new(_rx), + } + } + + pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> { + self.bus.subscribe() } } impl CursorStorage for CursorController { + fn update(&self, event: CursorMov) -> Option { + debug!("processing cursor event: {:?}", event); + let mut cur = self.get(&event.user).unwrap_or(Cursor::default()); + cur.buffer = event.path; + cur.start = (event.row, event.col).into(); + cur.end = (event.row, event.col).into(); + self.put(event.user.clone(), cur.clone()); + if let Err(e) = self.bus.send((event.user, cur.clone())) { + error!("could not broadcast cursor event: {}", e); + } else { // this is because once there are no receivers, nothing else can be sent + if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() { + warn!("could not consume event: {}", e); + } + } + Some(cur) + } + fn get(&self, id: &String) -> Option { Some(self.users.lock().unwrap().get(id)?.clone()) } diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs index a363cf8..a585b80 100644 --- a/src/lib/operation/processor.rs +++ b/src/lib/operation/processor.rs @@ -1,19 +1,19 @@ -use std::{sync::{Mutex, Arc}, collections::VecDeque}; +use std::{sync::Mutex, collections::VecDeque}; use operational_transform::{OperationSeq, OTError}; -use tokio::sync::{watch, oneshot, mpsc}; -use tracing::error; +use tokio::sync::watch; use crate::operation::factory::OperationFactory; #[tonic::async_trait] -pub trait OperationProcessor : OperationFactory{ +pub trait OperationProcessor : OperationFactory { async fn apply(&self, op: OperationSeq) -> Result; async fn process(&self, op: OperationSeq) -> Result; async fn poll(&self) -> Option; async fn ack(&self) -> Option; + async fn wait(&self); } @@ -22,16 +22,21 @@ pub struct OperationController { queue: Mutex>, last: Mutex>, notifier: watch::Sender, + changed: Mutex>, + changed_notifier: watch::Sender<()>, } impl OperationController { pub fn new(content: String) -> Self { let (tx, rx) = watch::channel(OperationSeq::default()); + let (done, wait) = watch::channel(()); OperationController { text: Mutex::new(content), queue: Mutex::new(VecDeque::new()), last: Mutex::new(rx), notifier: tx, + changed: Mutex::new(wait), + changed_notifier: done, } } } @@ -49,10 +54,16 @@ impl OperationProcessor for OperationController { let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); self.queue.lock().unwrap().push_back(op.clone()); - self.notifier.send(op).unwrap(); + self.notifier.send(op); Ok(res) } + async fn wait(&self) { + let mut blocker = self.changed.lock().unwrap().clone(); + blocker.changed().await; + blocker.changed().await; + } + async fn process(&self, mut op: OperationSeq) -> Result { let mut queue = self.queue.lock().unwrap(); for el in queue.iter_mut() { @@ -61,6 +72,7 @@ impl OperationProcessor for OperationController { let txt = self.content(); let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); + self.changed_notifier.send(()); Ok(res) } @@ -68,7 +80,9 @@ impl OperationProcessor for OperationController { let len = self.queue.lock().unwrap().len(); if len <= 0 { let mut recv = self.last.lock().unwrap().clone(); - recv.changed().await.unwrap(); + // TODO this is not 100% reliable + recv.changed().await; // acknowledge current state + recv.changed().await; // wait for a change in state } Some(self.queue.lock().unwrap().get(0)?.clone()) }