From 9bbd30a5f89673d9d98700af9c72682877580d3b Mon Sep 17 00:00:00 2001 From: alemidev Date: Tue, 18 Oct 2022 02:22:04 +0200 Subject: [PATCH] feat: super barebones synched cursor across clients --- plugin/codemp.vim | 38 +++++------ proto/workspace.proto | 8 ++- src/client/dispatcher.rs | 96 +++++++++++++++++++++++++++ src/client/main.rs | 12 ++-- src/client/nvim/mod.rs | 120 +++++++++++++++++----------------- src/server/actor/workspace.rs | 8 ++- src/server/service/session.rs | 7 +- 7 files changed, 193 insertions(+), 96 deletions(-) create mode 100644 src/client/dispatcher.rs diff --git a/plugin/codemp.vim b/plugin/codemp.vim index 46c853e..e601b00 100644 --- a/plugin/codemp.vim +++ b/plugin/codemp.vim @@ -10,7 +10,8 @@ if ! exists('s:jobid') let s:jobid = 0 endif -let s:bin = "/home/alemi/projects/codemp/target/debug/codemp-client" +" TODO I know I know... +let s:bin = "/home/alemi/projects/codemp/target/debug/client-neovim" function codemp#init() let result = s:StartJob() @@ -61,9 +62,7 @@ function s:ConfigureJob(jobid) autocmd VimLeavePre * :call s:StopJob() - autocmd InsertEnter * :call s:NotifyInsertEnter() - autocmd InsertLeave * :call s:NotifyInsertLeave() - + autocmd CursorMoved * :call codemp#cursor() augroup END endfunction @@ -73,31 +72,26 @@ function s:NotifyInsertEnter() endfunction function s:NotifyInsertLeave() - call rpcnotify(s:jobid, 'insert', 0) -endfunction - -function codemp#buffer() - call rpcrequest(s:jobid, "buffer") -endfunction - -function codemp#ping() - call rpcrequest(s:jobid, "ping") -endfunction - -function codemp#test() - call rpcrequest(s:jobid, "rpc") + call rpcnotify(s:jobid, 'insert', -1) endfunction function codemp#create(k) - call rpcrequest(s:jobid, "create", a:k) + let l:sid = rpcrequest(s:jobid, "create", a:k) + echo l:sid endfunction -function codemp#sync(k) - call rpcrequest(s:jobid, "sync", a:k) +function codemp#join(k) + let l:ret = rpcrequest(s:jobid, "join", a:k) + echo l:ret endfunction -function codemp#leave(k) - call rpcrequest(s:jobid, "leave", a:k) +function codemp#startcursor(k) + call rpcrequest(s:jobid, "cursor-start", a:k) +endfunction + +function codemp#cursor() + let l:position = getpos('.') + call rpcnotify(s:jobid, "cursor", 0, l:position[1], l:position[2]) endfunction function s:OnStderr(id, data, event) dict diff --git a/proto/workspace.proto b/proto/workspace.proto index e45a68e..ae17813 100644 --- a/proto/workspace.proto +++ b/proto/workspace.proto @@ -19,11 +19,13 @@ message WorkspaceEvent { optional string body = 2; } +// nvim-rs passes everything as i64, so having them as i64 in the packet itself is convenient +// TODO can we make them i32 and save some space? message CursorUpdate { string username = 1; - int32 buffer = 2; - int32 col = 3; - int32 row = 4; + int64 buffer = 2; + int64 col = 3; + int64 row = 4; } message WorkspaceRequest { diff --git a/src/client/dispatcher.rs b/src/client/dispatcher.rs new file mode 100644 index 0000000..a56aa73 --- /dev/null +++ b/src/client/dispatcher.rs @@ -0,0 +1,96 @@ +pub mod proto { + tonic::include_proto!("session"); + tonic::include_proto!("workspace"); + tonic::include_proto!("buffer"); +} +use std::sync::Arc; +use tracing::error; + +use tokio::sync::{mpsc, Mutex}; +use tokio_stream::StreamExt; +use tokio_stream::wrappers::ReceiverStream; + +use proto::{ + workspace_client::WorkspaceClient, + session_client::SessionClient, + buffer_client::BufferClient, + WorkspaceBuilderRequest, JoinRequest, SessionResponse, CursorUpdate +}; +use tonic::{transport::Channel, Status, Request, Response}; + +#[derive(Clone)] +pub struct Dispatcher { + name: String, + dp: Arc>, // TODO use channels and don't lock +} + +struct DispatcherWorker { + // TODO do I need all three? Did I design the server badly? + session: SessionClient, + workspace: WorkspaceClient, + _buffers: BufferClient, +} + +impl Dispatcher { + pub async fn connect(addr:String) -> Result { + let (s, w, b) = tokio::join!( + SessionClient::connect(addr.clone()), + WorkspaceClient::connect(addr.clone()), + BufferClient::connect(addr.clone()), + ); + Ok( + Dispatcher { + name: format!("User#{}", rand::random::()), + dp: Arc::new( + Mutex::new( + DispatcherWorker { session: s?, workspace: w?, _buffers: b? } + ) + ) + } + ) + } + + pub async fn create_workspace(&self, name:String) -> Result, Status> { + self.dp.lock().await.session.create_workspace( + Request::new(WorkspaceBuilderRequest { name }) + ).await + } + + pub async fn join_workspace(&self, session_id:String) -> Result<(), Status> { + let mut req = Request::new(JoinRequest { name: self.name.clone() }); + req.metadata_mut().append("workspace", session_id.parse().unwrap()); + let mut stream = self.dp.lock().await.workspace.join(req).await?.into_inner(); + + let _worker = tokio::spawn(async move { + while let Some(pkt) = stream.next().await { + match pkt { + Ok(_event) => { + // TODO do something with events when they will mean something! + }, + Err(e) => error!("Error receiving event | {}", e), + } + } + }); + + Ok(()) + } + + pub async fn start_cursor_worker(&self, session_id:String, feed:mpsc::Receiver) -> Result, Status> { + let mut in_stream = Request::new(ReceiverStream::new(feed)); + in_stream.metadata_mut().append("workspace", session_id.parse().unwrap()); + + let mut stream = self.dp.lock().await.workspace.subscribe(in_stream).await?.into_inner(); + let (tx, rx) = mpsc::channel(50); + + let _worker = tokio::spawn(async move { + while let Some(pkt) = stream.next().await { + match pkt { + Ok(update) => tx.send(update).await.unwrap(), // TODO how to handle an error here? + Err(e) => error!("Error receiving cursor update | {}", e), + } + } + }); + + Ok(rx) + } +} diff --git a/src/client/main.rs b/src/client/main.rs index e033cfe..37eae85 100644 --- a/src/client/main.rs +++ b/src/client/main.rs @@ -1,14 +1,16 @@ -mod nvim; -pub mod proto { tonic::include_proto!("workspace"); } -use proto::workspace_client::WorkspaceClient; +mod nvim; +pub mod dispatcher; + +use dispatcher::Dispatcher; #[tokio::main] async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> { - let client = WorkspaceClient::connect("http://[::1]:50051").await?; + + let dispatcher = Dispatcher::connect("http://[::1]:50051".into()).await.unwrap(); #[cfg(feature = "nvim")] - crate::nvim::run_nvim_client(client).await?; + crate::nvim::run_nvim_client(dispatcher).await?; Ok(()) } diff --git a/src/client/nvim/mod.rs b/src/client/nvim/mod.rs index 3458215..a176927 100644 --- a/src/client/nvim/mod.rs +++ b/src/client/nvim/mod.rs @@ -1,27 +1,26 @@ +use std::sync::Arc; + use rmpv::Value; use tokio::io::Stdout; use nvim_rs::{compat::tokio::Compat, Handler, Neovim}; use nvim_rs::create::tokio::new_parent; -use tonic::transport::Channel; +use tokio::sync::{mpsc, Mutex}; -use crate::proto::{WorkspaceRequest, workspace_client::WorkspaceClient}; +use crate::dispatcher::{Dispatcher, proto::CursorUpdate}; #[derive(Clone)] pub struct NeovimHandler { - go: bool, - client: WorkspaceClient, + dispatcher: Dispatcher, + sink: Arc>>>, } impl NeovimHandler { - pub fn new(client: WorkspaceClient) -> Self { - NeovimHandler { go: true, client } - } - - async fn live_edit_worker(&self) { - while self.go { - + pub fn new(dispatcher: Dispatcher) -> Self { + NeovimHandler { + dispatcher, + sink: Arc::new(Mutex::new(None)), } } } @@ -42,63 +41,50 @@ impl Handler for NeovimHandler { if args.len() < 1 { return Err(Value::from("[!] no session key")); } - let buf = neovim.get_current_buf().await.unwrap(); - let _content = buf.get_lines(0, buf.line_count().await.unwrap(), false).await.unwrap().join("\n"); - let _request = tonic::Request::new(WorkspaceRequest { - session_key: args[0].to_string(), - }); - let mut _c = self.client.clone(); - Err(Value::from("[!] Unimplemented")) - // let resp = c.create(request).await.unwrap().into_inner(); - // if resp.accepted { - // Ok(Value::from(resp.session_key)) - // } else { - // Err(Value::from("[!] rejected")) - // } + let res = self.dispatcher.create_workspace(args[0].to_string()) + .await + .map_err(|e| Value::from(e.to_string()))? + .into_inner(); + + Ok(res.session_key.into()) }, - "sync" => { + "join" => { if args.len() < 1 { return Err(Value::from("[!] no session key")); } - let _buf = neovim.get_current_buf().await.unwrap(); - let _request = tonic::Request::new(WorkspaceRequest { - session_key: args[0].to_string(), - }); - let mut _c = self.client.clone(); - Err(Value::from("[!] Unimplemented")) - // let resp = c.sync(request).await.unwrap().into_inner(); - // if let Some(content) = resp.content { - // buf.set_lines( - // 0, - // buf.line_count().await.unwrap(), - // false, - // content.split("\n").map(|s| s.to_string()).collect() - // ).await.unwrap(); - // Ok(Value::from("")) - // } else { - // Err(Value::from("[!] no content")) - // } + + self.dispatcher.join_workspace( + args[0].as_str().unwrap().to_string(), // TODO throw err if it's not a string? + ).await.map_err(|e| Value::from(e.to_string()))?; + + Ok("OK".into()) }, - "leave" => { + "cursor-start" => { if args.len() < 1 { return Err(Value::from("[!] no session key")); } - let _request = tonic::Request::new(WorkspaceRequest { - session_key: args[0].to_string(), + let (tx, stream) = mpsc::channel(50); + let mut rx = self.dispatcher.start_cursor_worker( + args[0].as_str().unwrap().to_string(), stream + ).await.map_err(|e| Value::from(e.to_string()))?; + let sink = self.sink.clone(); + sink.lock().await.replace(tx); + let _worker = tokio::spawn(async move { + let mut col : i64; + let mut row : i64 = 0; + let ns = neovim.create_namespace("Cursor").await.unwrap(); + while let Some(update) = rx.recv().await { + neovim.exec_lua(format!("print('{:?}')", update).as_str(), vec![]).await.unwrap(); + let buf = neovim.get_current_buf().await.unwrap(); + buf.clear_namespace(ns, 0, -1).await.unwrap(); + row = update.row as i64; + col = update.col as i64; + buf.add_highlight(ns, "ErrorMsg", row-1, col-1, col).await.unwrap(); + } + sink.lock().await.take(); }); - let mut _c = self.client.clone(); - Err(Value::from("[!] Unimplemented")) - // let resp = c.leave(request).await.unwrap().into_inner(); - // if resp.accepted { - // Ok(Value::from(format!("closed session #{}", resp.session_key))) - // } else { - // Err(Value::from("[!] could not close session")) - // } + Ok("OK".into()) }, - "go" => { - self.live_edit_worker().await; - Ok(Value::from("")) - } _ => { eprintln!("[!] unexpected call"); Ok(Value::from("")) @@ -109,19 +95,31 @@ impl Handler for NeovimHandler { async fn handle_notify( &self, name: String, - _args: Vec, + args: Vec, _neovim: Neovim>, ) { match name.as_ref() { "insert" => {}, + "cursor" => { + if args.len() >= 3 { + if let Some(sink) = self.sink.lock().await.as_ref() { + sink.send(CursorUpdate { + buffer: args[0].as_i64().unwrap(), + row: args[1].as_i64().unwrap(), + col: args[2].as_i64().unwrap(), + username: "root".into() + }).await.unwrap(); + } + } + }, "tick" => eprintln!("tock"), _ => eprintln!("[!] unexpected notify",) } } } -pub async fn run_nvim_client(c: WorkspaceClient) -> Result<(), Box> { - let handler: NeovimHandler = NeovimHandler::new(c); +pub async fn run_nvim_client(dispatcher: Dispatcher) -> Result<(), Box> { + let handler: NeovimHandler = NeovimHandler::new(dispatcher); let (_nvim, io_handler) = new_parent(handler).await; // Any error should probably be logged, as stderr is not visible to users. diff --git a/src/server/actor/workspace.rs b/src/server/actor/workspace.rs index 19c8500..c027139 100644 --- a/src/server/actor/workspace.rs +++ b/src/server/actor/workspace.rs @@ -1,11 +1,13 @@ use std::collections::HashMap; use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; -use tracing::warn; +use tracing::{warn, info}; use library::{events::Event, user::{User, UserCursor}}; -use super::{buffer::{BufferView, Buffer}, state::{User, UserCursor}}; +use crate::service::workspace::proto::CursorUpdate; + +use super::{buffer::{BufferView, Buffer}}; #[derive(Debug, Clone)] pub struct UsersView { @@ -107,6 +109,8 @@ impl Workspace { w.users_worker(op_usr_rx, users_tx); // spawn worker to handle users w.buffers_worker(op_buf_rx, buffer_tx); // spawn worker to handle buffers + // + info!("new workspace created: {}[{}]", w.name, w.id); return w; } diff --git a/src/server/service/session.rs b/src/server/service/session.rs index bf23f80..3b92d77 100644 --- a/src/server/service/session.rs +++ b/src/server/service/session.rs @@ -16,6 +16,8 @@ use self::proto::session_server::SessionServer; use super::workspace::WorkspaceExtension; +use uuid::Uuid; + #[derive(Debug)] pub struct SessionService { state: Arc, @@ -27,9 +29,8 @@ impl Session for SessionService { &self, req: Request, ) -> Result, Status> { - let name = req.extensions().get::().unwrap().id.clone(); - - let w = Workspace::new(name); + // let name = req.extensions().get::().unwrap().id.clone(); + let w = Workspace::new("im lazy".into()); let res = SessionResponse { accepted:true, session_key: w.id.to_string() }; self.state.view().add(w).await;