diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua index 3be8f8e..55e0a1d 100644 --- a/src/client/nvim/codemp.lua +++ b/src/client/nvim/codemp.lua @@ -49,13 +49,13 @@ local function hook_callbacks(path, buffer) { callback = function(args) local cursor = vim.api.nvim_win_get_cursor(0) + pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors if cursor[1] == last_line then return end last_line = cursor[1] local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false) pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors - pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors end, buffer = buffer, group = codemp_autocmds, @@ -111,6 +111,7 @@ vim.api.nvim_create_user_command('Connect', -- print(vim.fn.join(data, "\n")) end, stderr_buffered = false, + env = { RUST_BACKTRACE = 1 } } ) if M.jobid <= 0 then diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index f5651a6..f7e9125 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -1,7 +1,7 @@ +use std::sync::Arc; use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; -use codemp::cursor::CursorController; -use codemp::operation::OperationController; +use codemp::operation::{OperationController, OperationFactory}; use codemp::{client::CodempClient, operation::OperationProcessor}; use codemp::proto::buffer_client::BufferClient; use rmpv::Value; @@ -16,8 +16,7 @@ use tracing::{error, warn, debug, info}; #[derive(Clone)] struct NeovimHandler { client: CodempClient, - factories: BTreeMap, - cursor: Option, + factories: Arc>>>, } fn nullable_optional_str(args: &Vec, index: usize) -> Option { @@ -36,6 +35,12 @@ fn default_zero_number(args: &Vec, index: usize) -> i64 { nullable_optional_number(args, index).unwrap_or(0) } +impl NeovimHandler { + fn buffer_controller(&self, path: &String) -> Option> { + Some(self.factories.lock().unwrap().get(path)?.clone()) + } +} + #[tonic::async_trait] impl Handler for NeovimHandler { type Writer = Compat; @@ -72,16 +77,18 @@ impl Handler for NeovimHandler { } let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let pos = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.insert(path, txt, pos).await { - Ok(res) => { - match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), + let mut pos = default_zero_number(&args, 2) as i64; + + if pos <= 0 { pos = 0 } // TODO wtf vim?? + + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => { + match controller.apply(controller.insert(&txt, pos as u64)).await { + Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + Ok(_res) => Ok(Value::Nil), } - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + } } }, @@ -93,13 +100,12 @@ impl Handler for NeovimHandler { let pos = default_zero_number(&args, 1) as u64; let count = default_zero_number(&args, 2) as u64; - let mut c = self.client.clone(); - match c.delete(path, pos, count).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.delete(pos, count)).await { + Err(e) => Err(Value::from(format!("could not send delete: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -110,13 +116,12 @@ impl Handler for NeovimHandler { let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let mut c = self.client.clone(); - match c.replace(path, txt).await { - Ok(res) => match res { - true => Ok(Value::Nil), - false => Err(Value::from("rejected")), - }, - Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => match controller.apply(controller.replace(&txt)).await { + Err(e) => Err(Value::from(format!("could not send replace: {}", e))), + Ok(_res) => Ok(Value::Nil), + } } }, @@ -132,63 +137,72 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); - let buf = buffer.clone(); - match c.attach(path, move |x| { - let lines : Vec = x.split("\n").map(|x| x.to_string()).collect(); - let b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = b.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); - } - }); - }).await { + match c.attach(path.clone()).await { Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), - Ok(content) => { - let lines : Vec = content.split("\n").map(|x| x.to_string()).collect(); - if let Err(e) = buffer.set_lines(0, -1, false, lines).await { - error!("could not update buffer: {}", e); + Ok(controller) => { + let _controller = controller.clone(); + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + match buffer.set_lines(0, -1, false, lines).await { + Err(e) => Err(Value::from(format!("could not sync buffer: {}", e))), + Ok(()) => { + tokio::spawn(async move { + loop { + _controller.wait().await; + let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); + if let Err(e) = buffer.set_lines(0, -1, false, lines).await { + error!("could not update buffer: {}", e); + } + } + }); + self.factories.lock().unwrap().insert(path, controller); + Ok(Value::Nil) + } } - Ok(Value::Nil) }, } }, "detach" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - c.detach(path); - Ok(Value::Nil) + Err(Value::from("unimplemented! try with :q!")) + // if args.len() < 1 { + // return Err(Value::from("no path given")); + // } + // let path = default_empty_str(&args, 0); + // let mut c = self.client.clone(); + // c.detach(path); + // Ok(Value::Nil) }, "listen" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - let mut c = self.client.clone(); - let ns = nvim.create_namespace("Cursor").await .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; let buf = nvim.get_current_buf().await .map_err(|e| Value::from(format!("could not get current buf: {}", e)))?; - match c.listen(path, move |cur| { - let _b = buf.clone(); - tokio::spawn(async move { - if let Err(e) = _b.clear_namespace(ns, 0, -1).await { - error!("could not clear previous cursor highlight: {}", e); - } - if let Err(e) = _b.add_highlight(ns, "ErrorMsg", cur.row-1, cur.col, cur.col+1).await { - error!("could not create highlight for cursor: {}", e); - } - }); - }).await { - Ok(()) => Ok(Value::Nil), + let mut c = self.client.clone(); + match c.listen().await { Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))), + Ok(cursor) => { + let mut sub = cursor.sub(); + debug!("spawning cursor processing worker"); + tokio::spawn(async move { + loop { + match sub.recv().await { + Err(e) => return error!("error receiving cursor update from controller: {}", e), + Ok((_usr, cur)) => { + if let Err(e) = buf.clear_namespace(ns, 0, -1).await { + error!("could not clear previous cursor highlight: {}", e); + } + if let Err(e) = buf.add_highlight(ns, "ErrorMsg", cur.start.row-1, cur.start.col, cur.start.col+1).await { + error!("could not create highlight for cursor: {}", e); + } + } + } + } + }); + Ok(Value::Nil) + }, } }, @@ -202,8 +216,8 @@ impl Handler for NeovimHandler { let mut c = self.client.clone(); match c.cursor(path, row, col).await { - Ok(()) => Ok(Value::Nil), - Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))), + Ok(_) => Ok(Value::Nil), + Err(e) => Err(Value:: from(format!("could not update cursor: {}", e))), } }, @@ -262,6 +276,7 @@ async fn main() -> Result<(), Box> { let handler: NeovimHandler = NeovimHandler { client: client.into(), + factories: Arc::new(Mutex::new(BTreeMap::new())), }; let (_nvim, io_handler) = create::new_parent(handler).await; diff --git a/src/lib/client.rs b/src/lib/client.rs index 0ab4938..f16ce15 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use crate::{ cursor::{CursorController, CursorStorage}, operation::{OperationController, OperationProcessor}, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, }; #[derive(Clone)] @@ -26,7 +26,6 @@ impl From::> for CodempClient { impl CodempClient { pub fn new(id: String, client: BufferClient) -> Self { CodempClient { id, client } - } pub async fn create(&mut self, path: String, content: Option) -> Result { @@ -91,7 +90,9 @@ impl CodempClient { Err(e) => break error!("error deserializing opseq: {}", e), Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), - Ok(_txt) => { } + Ok(_txt) => { + // send event containing where the change happened + } } }, } @@ -125,4 +126,13 @@ impl CodempClient { Ok(factory) } + + pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result { + let req = CursorMov { + path, row, col, + user: self.id.clone(), + }; + let res = self.client.cursor(req).await?.into_inner(); + Ok(res.accepted) + } } diff --git a/src/lib/cursor.rs b/src/lib/cursor.rs index c6ed07a..9c3a2b4 100644 --- a/src/lib/cursor.rs +++ b/src/lib/cursor.rs @@ -1,5 +1,8 @@ use std::{collections::HashMap, sync::Mutex}; +use tokio::sync::broadcast; +use tracing::{info, error, debug, warn}; + use crate::proto::CursorMov; /// Note that this differs from any hashmap in its put method: no &mut! @@ -16,10 +19,10 @@ pub trait CursorStorage { } } -#[derive(Copy, Clone)] +#[derive(Copy, Clone, Debug, Default)] pub struct Position { - row: i64, - col: i64, + pub row: i64, + pub col: i64, } impl From::<(i64, i64)> for Position { @@ -28,24 +31,52 @@ impl From::<(i64, i64)> for Position { } } -#[derive(Clone)] +#[derive(Clone, Debug, Default)] pub struct Cursor { - buffer: String, - start: Position, - end: Position, + pub buffer: String, + pub start: Position, + pub end: Position, } pub struct CursorController { users: Mutex>, + bus: broadcast::Sender<(String, Cursor)>, + _bus_keepalive: Mutex>, } impl CursorController { pub fn new() -> Self { - CursorController { users: Mutex::new(HashMap::new()) } + let (tx, _rx) = broadcast::channel(64); + CursorController { + users: Mutex::new(HashMap::new()), + bus: tx, + _bus_keepalive: Mutex::new(_rx), + } + } + + pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> { + self.bus.subscribe() } } impl CursorStorage for CursorController { + fn update(&self, event: CursorMov) -> Option { + debug!("processing cursor event: {:?}", event); + let mut cur = self.get(&event.user).unwrap_or(Cursor::default()); + cur.buffer = event.path; + cur.start = (event.row, event.col).into(); + cur.end = (event.row, event.col).into(); + self.put(event.user.clone(), cur.clone()); + if let Err(e) = self.bus.send((event.user, cur.clone())) { + error!("could not broadcast cursor event: {}", e); + } else { // this is because once there are no receivers, nothing else can be sent + if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() { + warn!("could not consume event: {}", e); + } + } + Some(cur) + } + fn get(&self, id: &String) -> Option { Some(self.users.lock().unwrap().get(id)?.clone()) } diff --git a/src/lib/operation/processor.rs b/src/lib/operation/processor.rs index a363cf8..a585b80 100644 --- a/src/lib/operation/processor.rs +++ b/src/lib/operation/processor.rs @@ -1,19 +1,19 @@ -use std::{sync::{Mutex, Arc}, collections::VecDeque}; +use std::{sync::Mutex, collections::VecDeque}; use operational_transform::{OperationSeq, OTError}; -use tokio::sync::{watch, oneshot, mpsc}; -use tracing::error; +use tokio::sync::watch; use crate::operation::factory::OperationFactory; #[tonic::async_trait] -pub trait OperationProcessor : OperationFactory{ +pub trait OperationProcessor : OperationFactory { async fn apply(&self, op: OperationSeq) -> Result; async fn process(&self, op: OperationSeq) -> Result; async fn poll(&self) -> Option; async fn ack(&self) -> Option; + async fn wait(&self); } @@ -22,16 +22,21 @@ pub struct OperationController { queue: Mutex>, last: Mutex>, notifier: watch::Sender, + changed: Mutex>, + changed_notifier: watch::Sender<()>, } impl OperationController { pub fn new(content: String) -> Self { let (tx, rx) = watch::channel(OperationSeq::default()); + let (done, wait) = watch::channel(()); OperationController { text: Mutex::new(content), queue: Mutex::new(VecDeque::new()), last: Mutex::new(rx), notifier: tx, + changed: Mutex::new(wait), + changed_notifier: done, } } } @@ -49,10 +54,16 @@ impl OperationProcessor for OperationController { let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); self.queue.lock().unwrap().push_back(op.clone()); - self.notifier.send(op).unwrap(); + self.notifier.send(op); Ok(res) } + async fn wait(&self) { + let mut blocker = self.changed.lock().unwrap().clone(); + blocker.changed().await; + blocker.changed().await; + } + async fn process(&self, mut op: OperationSeq) -> Result { let mut queue = self.queue.lock().unwrap(); for el in queue.iter_mut() { @@ -61,6 +72,7 @@ impl OperationProcessor for OperationController { let txt = self.content(); let res = op.apply(&txt)?; *self.text.lock().unwrap() = res.clone(); + self.changed_notifier.send(()); Ok(res) } @@ -68,7 +80,9 @@ impl OperationProcessor for OperationController { let len = self.queue.lock().unwrap().len(); if len <= 0 { let mut recv = self.last.lock().unwrap().clone(); - recv.changed().await.unwrap(); + // TODO this is not 100% reliable + recv.changed().await; // acknowledge current state + recv.changed().await; // wait for a change in state } Some(self.queue.lock().unwrap().get(0)?.clone()) }