diff --git a/proto/buffer.proto b/proto/buffer.proto index 3a322f5..f6bc495 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -6,6 +6,8 @@ service Buffer { rpc Edit (OperationRequest) returns (BufferResponse); rpc Create (BufferPayload) returns (BufferResponse); rpc Sync (BufferPayload) returns (BufferResponse); + rpc Cursor (CursorMov) returns (BufferResponse); + rpc Listen (BufferPayload) returns (stream CursorMov); } message RawOp { @@ -13,6 +15,13 @@ message RawOp { string user = 2; } +message CursorMov { + string user = 1; + string path = 2; + int64 row = 3; + int64 col = 4; +} + message OperationRequest { string path = 1; string hash = 2; diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua index 63fb064..405f61d 100644 --- a/src/client/nvim/codemp.lua +++ b/src/client/nvim/codemp.lua @@ -4,8 +4,10 @@ local M = {} M.jobid = nil M.create = function(path, content) return vim.rpcrequest(M.jobid, "create", path, content) end M.insert = function(path, txt, pos) return vim.rpcrequest(M.jobid, "insert", path, txt, pos) end +M.cursor = function(path, row, col) return vim.rpcrequest(M.jobid, "cursor", path, row, col) end M.delete = function(path, pos, count) return vim.rpcrequest(M.jobid, "delete", path, pos, count) end M.attach = function(path) return vim.rpcrequest(M.jobid, "attach", path) end +M.listen = function(path) return vim.rpcrequest(M.jobid, "listen", path) end M.detach = function(path) return vim.rpcrequest(M.jobid, "detach", path) end local function cursor_offset() @@ -24,6 +26,17 @@ local function hook_callbacks(path, buffer) group = codemp_autocmds, } ) + vim.api.nvim_create_autocmd( + { "CursorMoved" }, + { + callback = function(_) + local cursor = vim.api.nvim_win_get_cursor(0) + M.cursor(path, cursor[1], cursor[2]) + end, + buffer = buffer, + group = codemp_autocmds, + } + ) vim.keymap.set('i', '', function() M.delete(path, cursor_offset(), 1) return '' end, {expr = true, buffer = buffer}) vim.keymap.set('i', '', function() M.delete(path, cursor_offset() + 1, 1) return '' end, {expr = true, buffer = buffer}) vim.keymap.set('i', '', function() M.insert(path, "\n", cursor_offset()) return ''end, {expr = true, buffer = buffer}) @@ -76,6 +89,7 @@ vim.api.nvim_create_user_command('Share', M.create(path, vim.fn.join(lines, "\n")) hook_callbacks(path, bufnr) M.attach(path) + M.listen(path) end, { nargs=1 }) @@ -85,6 +99,7 @@ vim.api.nvim_create_user_command('Join', local bufnr = vim.api.nvim_get_current_buf() hook_callbacks(path, bufnr) M.attach(path) + M.listen(path) end, { nargs=1 }) diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index ec649a1..8b46e49 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -25,11 +25,11 @@ fn default_empty_str(args: &Vec, index: usize) -> String { nullable_optional_str(args, index).unwrap_or("".into()) } -fn nullable_optional_number(args: &Vec, index: usize) -> Option { - Some(args.get(index)?.as_u64()?) +fn nullable_optional_number(args: &Vec, index: usize) -> Option { + Some(args.get(index)?.as_i64()?) } -fn default_zero_number(args: &Vec, index: usize) -> u64 { +fn default_zero_number(args: &Vec, index: usize) -> i64 { nullable_optional_number(args, index).unwrap_or(0) } @@ -69,7 +69,7 @@ impl Handler for NeovimHandler { } let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); - let pos = default_zero_number(&args, 2); + let pos = default_zero_number(&args, 2) as u64; let mut c = self.client.clone(); info!("correctly parsed arguments: {} - {} - {}", path, txt, pos); match c.insert(path, txt, pos).await { @@ -89,8 +89,8 @@ impl Handler for NeovimHandler { return Err(Value::from("not enough arguments")); } let path = default_empty_str(&args, 0); - let pos = default_zero_number(&args, 1); - let count = default_zero_number(&args, 2); + let pos = default_zero_number(&args, 1) as u64; + let count = default_zero_number(&args, 2) as u64; let mut c = self.client.clone(); match c.delete(path, pos, count).await { @@ -145,6 +145,53 @@ impl Handler for NeovimHandler { Ok(Value::Nil) }, + "listen" => { + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = default_empty_str(&args, 0); + let mut c = self.client.clone(); + + let ns = nvim.create_namespace("Cursor").await + .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; + + let buf = nvim.get_current_buf().await + .map_err(|e| Value::from(format!("could not get current buf: {}", e)))?; + + match c.listen(path, move |cur| { + let _b = buf.clone(); + tokio::spawn(async move { + if let Err(e) = _b.clear_namespace(ns, 0, -1).await { + error!("could not clear previous cursor highlight: {}", e); + } + if let Err(e) = _b.add_highlight( + ns, "ErrorMsg", + cur.row as i64 - 1, cur.col as i64 - 1, cur.col as i64 + ).await { + error!("could not create highlight for cursor: {}", e); + } + }); + }).await { + Ok(()) => Ok(Value::Nil), + Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))), + } + }, + + "cursor" => { + if args.len() < 3 { + return Err(Value::from("not enough args")); + } + let path = default_empty_str(&args, 0); + let row = default_zero_number(&args, 1); + let col = default_zero_number(&args, 2); + + let mut c = self.client.clone(); + match c.cursor(path, row, col).await { + Ok(()) => Ok(Value::Nil), + Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))), + } + }, + _ => Err(Value::from("unimplemented")), } } diff --git a/src/lib/client.rs b/src/lib/client.rs index fd1b308..64e5d07 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -6,7 +6,7 @@ use uuid::Uuid; use crate::{ opfactory::AsyncFactory, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, tonic::{transport::Channel, Status, Streaming}, }; @@ -89,6 +89,32 @@ impl CodempClient { } } + pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<(), Status> { + let req = CursorMov { + path, user: self.id.to_string(), + row, col, + }; + let _res = self.client.cursor(req).await?.into_inner(); + Ok(()) + } + + pub async fn listen(&mut self, path: String, callback: F) -> Result<(), Status> + where F : Fn(CursorMov) -> () + Send + 'static { + let req = BufferPayload { + path, + content: None, + user: self.id.to_string(), + }; + let mut stream = self.client.listen(req).await?.into_inner(); + tokio::spawn(async move { + // TODO catch some errors + while let Ok(Some(x)) = stream.message().await { + callback(x) + } + }); + Ok(()) + } + pub async fn attach(&mut self, path: String, callback: F) -> Result where F : Fn(String) -> () + Send + 'static { let content = self.sync(path.clone()).await?; diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs index 99ea1d9..1dd85a1 100644 --- a/src/lib/opfactory.rs +++ b/src/lib/opfactory.rs @@ -70,6 +70,7 @@ impl OperationFactory { pub struct AsyncFactory { run: watch::Sender, ops: mpsc::Sender, + #[allow(unused)] // TODO is this necessary? content: watch::Receiver, } @@ -123,9 +124,6 @@ impl AsyncFactory { } - - - #[derive(Debug)] enum OpMsg { Exec(OpWrapper, oneshot::Sender>), diff --git a/src/server/buffer/service.rs b/src/server/buffer/service.rs index 6d26dc0..c0da738 100644 --- a/src/server/buffer/service.rs +++ b/src/server/buffer/service.rs @@ -1,16 +1,17 @@ use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, broadcast}; use tonic::{Request, Response, Status}; use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? -use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest}; +use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest, CursorMov}; use tracing::info; use super::actor::{BufferHandle, BufferStore}; type OperationStream = Pin> + Send>>; +type CursorStream = Pin> + Send>>; struct BufferMap { store: HashMap, @@ -33,11 +34,22 @@ impl BufferStore for BufferMap { pub struct BufferService { map: Arc>, + cursor: broadcast::Sender, +} + +impl BufferService { + fn get_buffer(&self, path: &String) -> Result { + match self.map.read().unwrap().get(path) { + Some(buf) => Ok(buf.clone()), + None => Err(Status::not_found("no buffer for given path")), + } + } } #[tonic::async_trait] impl Buffer for BufferService { type AttachStream = OperationStream; + type ListenStream = CursorStream; async fn attach(&self, req: Request) -> Result, Status> { let request = req.into_inner(); @@ -65,6 +77,33 @@ impl Buffer for BufferService { } } + async fn listen(&self, req: Request) -> Result, Status> { + let mut sub = self.cursor.subscribe(); + let myself = req.into_inner().user; + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + loop { + match sub.recv().await { + Ok(v) => { + if v.user == myself { continue } + tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? + } + Err(_e) => break, + } + } + }); + let output_stream = ReceiverStream::new(rx); + info!("registered new subscriber to cursor updates"); + Ok(Response::new(Box::pin(output_stream))) + } + + async fn cursor(&self, req:Request) -> Result, Status> { + match self.cursor.send(req.into_inner()) { + Ok(_) => Ok(Response::new(BufferResponse { accepted: true, content: None})), + Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))), + } + } + async fn edit(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let tx = match self.map.read().unwrap().get(&request.path) { @@ -106,8 +145,10 @@ impl Buffer for BufferService { impl BufferService { pub fn new() -> BufferService { + let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity BufferService { map: Arc::new(RwLock::new(HashMap::new().into())), + cursor: cur_tx, } }