feat: super barebones synched cursor across clients

This commit is contained in:
əlemi 2022-10-18 02:22:04 +02:00
parent 60e53b4a94
commit 9bbd30a5f8
7 changed files with 193 additions and 96 deletions

View file

@ -10,7 +10,8 @@ if ! exists('s:jobid')
let s:jobid = 0
endif
let s:bin = "/home/alemi/projects/codemp/target/debug/codemp-client"
" TODO I know I know...
let s:bin = "/home/alemi/projects/codemp/target/debug/client-neovim"
function codemp#init()
let result = s:StartJob()
@ -61,9 +62,7 @@ function s:ConfigureJob(jobid)
autocmd VimLeavePre * :call s:StopJob()
autocmd InsertEnter * :call s:NotifyInsertEnter()
autocmd InsertLeave * :call s:NotifyInsertLeave()
autocmd CursorMoved * :call codemp#cursor()
augroup END
endfunction
@ -73,31 +72,26 @@ function s:NotifyInsertEnter()
endfunction
function s:NotifyInsertLeave()
call rpcnotify(s:jobid, 'insert', 0)
endfunction
function codemp#buffer()
call rpcrequest(s:jobid, "buffer")
endfunction
function codemp#ping()
call rpcrequest(s:jobid, "ping")
endfunction
function codemp#test()
call rpcrequest(s:jobid, "rpc")
call rpcnotify(s:jobid, 'insert', -1)
endfunction
function codemp#create(k)
call rpcrequest(s:jobid, "create", a:k)
let l:sid = rpcrequest(s:jobid, "create", a:k)
echo l:sid
endfunction
function codemp#sync(k)
call rpcrequest(s:jobid, "sync", a:k)
function codemp#join(k)
let l:ret = rpcrequest(s:jobid, "join", a:k)
echo l:ret
endfunction
function codemp#leave(k)
call rpcrequest(s:jobid, "leave", a:k)
function codemp#startcursor(k)
call rpcrequest(s:jobid, "cursor-start", a:k)
endfunction
function codemp#cursor()
let l:position = getpos('.')
call rpcnotify(s:jobid, "cursor", 0, l:position[1], l:position[2])
endfunction
function s:OnStderr(id, data, event) dict

View file

@ -19,11 +19,13 @@ message WorkspaceEvent {
optional string body = 2;
}
// nvim-rs passes everything as i64, so having them as i64 in the packet itself is convenient
// TODO can we make them i32 and save some space?
message CursorUpdate {
string username = 1;
int32 buffer = 2;
int32 col = 3;
int32 row = 4;
int64 buffer = 2;
int64 col = 3;
int64 row = 4;
}
message WorkspaceRequest {

96
src/client/dispatcher.rs Normal file
View file

@ -0,0 +1,96 @@
pub mod proto {
tonic::include_proto!("session");
tonic::include_proto!("workspace");
tonic::include_proto!("buffer");
}
use std::sync::Arc;
use tracing::error;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use proto::{
workspace_client::WorkspaceClient,
session_client::SessionClient,
buffer_client::BufferClient,
WorkspaceBuilderRequest, JoinRequest, SessionResponse, CursorUpdate
};
use tonic::{transport::Channel, Status, Request, Response};
#[derive(Clone)]
pub struct Dispatcher {
name: String,
dp: Arc<Mutex<DispatcherWorker>>, // TODO use channels and don't lock
}
struct DispatcherWorker {
// TODO do I need all three? Did I design the server badly?
session: SessionClient<Channel>,
workspace: WorkspaceClient<Channel>,
_buffers: BufferClient<Channel>,
}
impl Dispatcher {
pub async fn connect(addr:String) -> Result<Dispatcher, tonic::transport::Error> {
let (s, w, b) = tokio::join!(
SessionClient::connect(addr.clone()),
WorkspaceClient::connect(addr.clone()),
BufferClient::connect(addr.clone()),
);
Ok(
Dispatcher {
name: format!("User#{}", rand::random::<u16>()),
dp: Arc::new(
Mutex::new(
DispatcherWorker { session: s?, workspace: w?, _buffers: b? }
)
)
}
)
}
pub async fn create_workspace(&self, name:String) -> Result<Response<SessionResponse>, Status> {
self.dp.lock().await.session.create_workspace(
Request::new(WorkspaceBuilderRequest { name })
).await
}
pub async fn join_workspace(&self, session_id:String) -> Result<(), Status> {
let mut req = Request::new(JoinRequest { name: self.name.clone() });
req.metadata_mut().append("workspace", session_id.parse().unwrap());
let mut stream = self.dp.lock().await.workspace.join(req).await?.into_inner();
let _worker = tokio::spawn(async move {
while let Some(pkt) = stream.next().await {
match pkt {
Ok(_event) => {
// TODO do something with events when they will mean something!
},
Err(e) => error!("Error receiving event | {}", e),
}
}
});
Ok(())
}
pub async fn start_cursor_worker(&self, session_id:String, feed:mpsc::Receiver<CursorUpdate>) -> Result<mpsc::Receiver<CursorUpdate>, Status> {
let mut in_stream = Request::new(ReceiverStream::new(feed));
in_stream.metadata_mut().append("workspace", session_id.parse().unwrap());
let mut stream = self.dp.lock().await.workspace.subscribe(in_stream).await?.into_inner();
let (tx, rx) = mpsc::channel(50);
let _worker = tokio::spawn(async move {
while let Some(pkt) = stream.next().await {
match pkt {
Ok(update) => tx.send(update).await.unwrap(), // TODO how to handle an error here?
Err(e) => error!("Error receiving cursor update | {}", e),
}
}
});
Ok(rx)
}
}

View file

@ -1,14 +1,16 @@
mod nvim;
pub mod proto { tonic::include_proto!("workspace"); }
use proto::workspace_client::WorkspaceClient;
mod nvim;
pub mod dispatcher;
use dispatcher::Dispatcher;
#[tokio::main]
async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> {
let client = WorkspaceClient::connect("http://[::1]:50051").await?;
let dispatcher = Dispatcher::connect("http://[::1]:50051".into()).await.unwrap();
#[cfg(feature = "nvim")]
crate::nvim::run_nvim_client(client).await?;
crate::nvim::run_nvim_client(dispatcher).await?;
Ok(())
}

View file

@ -1,27 +1,26 @@
use std::sync::Arc;
use rmpv::Value;
use tokio::io::Stdout;
use nvim_rs::{compat::tokio::Compat, Handler, Neovim};
use nvim_rs::create::tokio::new_parent;
use tonic::transport::Channel;
use tokio::sync::{mpsc, Mutex};
use crate::proto::{WorkspaceRequest, workspace_client::WorkspaceClient};
use crate::dispatcher::{Dispatcher, proto::CursorUpdate};
#[derive(Clone)]
pub struct NeovimHandler {
go: bool,
client: WorkspaceClient<Channel>,
dispatcher: Dispatcher,
sink: Arc<Mutex<Option<mpsc::Sender<CursorUpdate>>>>,
}
impl NeovimHandler {
pub fn new(client: WorkspaceClient<Channel>) -> Self {
NeovimHandler { go: true, client }
}
async fn live_edit_worker(&self) {
while self.go {
pub fn new(dispatcher: Dispatcher) -> Self {
NeovimHandler {
dispatcher,
sink: Arc::new(Mutex::new(None)),
}
}
}
@ -42,63 +41,50 @@ impl Handler for NeovimHandler {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
}
let res = self.dispatcher.create_workspace(args[0].to_string())
.await
.map_err(|e| Value::from(e.to_string()))?
.into_inner();
Ok(res.session_key.into())
},
"join" => {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
}
self.dispatcher.join_workspace(
args[0].as_str().unwrap().to_string(), // TODO throw err if it's not a string?
).await.map_err(|e| Value::from(e.to_string()))?;
Ok("OK".into())
},
"cursor-start" => {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
}
let (tx, stream) = mpsc::channel(50);
let mut rx = self.dispatcher.start_cursor_worker(
args[0].as_str().unwrap().to_string(), stream
).await.map_err(|e| Value::from(e.to_string()))?;
let sink = self.sink.clone();
sink.lock().await.replace(tx);
let _worker = tokio::spawn(async move {
let mut col : i64;
let mut row : i64 = 0;
let ns = neovim.create_namespace("Cursor").await.unwrap();
while let Some(update) = rx.recv().await {
neovim.exec_lua(format!("print('{:?}')", update).as_str(), vec![]).await.unwrap();
let buf = neovim.get_current_buf().await.unwrap();
let _content = buf.get_lines(0, buf.line_count().await.unwrap(), false).await.unwrap().join("\n");
let _request = tonic::Request::new(WorkspaceRequest {
session_key: args[0].to_string(),
});
let mut _c = self.client.clone();
Err(Value::from("[!] Unimplemented"))
// let resp = c.create(request).await.unwrap().into_inner();
// if resp.accepted {
// Ok(Value::from(resp.session_key))
// } else {
// Err(Value::from("[!] rejected"))
// }
},
"sync" => {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
buf.clear_namespace(ns, 0, -1).await.unwrap();
row = update.row as i64;
col = update.col as i64;
buf.add_highlight(ns, "ErrorMsg", row-1, col-1, col).await.unwrap();
}
let _buf = neovim.get_current_buf().await.unwrap();
let _request = tonic::Request::new(WorkspaceRequest {
session_key: args[0].to_string(),
sink.lock().await.take();
});
let mut _c = self.client.clone();
Err(Value::from("[!] Unimplemented"))
// let resp = c.sync(request).await.unwrap().into_inner();
// if let Some(content) = resp.content {
// buf.set_lines(
// 0,
// buf.line_count().await.unwrap(),
// false,
// content.split("\n").map(|s| s.to_string()).collect()
// ).await.unwrap();
// Ok(Value::from(""))
// } else {
// Err(Value::from("[!] no content"))
// }
Ok("OK".into())
},
"leave" => {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
}
let _request = tonic::Request::new(WorkspaceRequest {
session_key: args[0].to_string(),
});
let mut _c = self.client.clone();
Err(Value::from("[!] Unimplemented"))
// let resp = c.leave(request).await.unwrap().into_inner();
// if resp.accepted {
// Ok(Value::from(format!("closed session #{}", resp.session_key)))
// } else {
// Err(Value::from("[!] could not close session"))
// }
},
"go" => {
self.live_edit_worker().await;
Ok(Value::from(""))
}
_ => {
eprintln!("[!] unexpected call");
Ok(Value::from(""))
@ -109,19 +95,31 @@ impl Handler for NeovimHandler {
async fn handle_notify(
&self,
name: String,
_args: Vec<Value>,
args: Vec<Value>,
_neovim: Neovim<Compat<Stdout>>,
) {
match name.as_ref() {
"insert" => {},
"cursor" => {
if args.len() >= 3 {
if let Some(sink) = self.sink.lock().await.as_ref() {
sink.send(CursorUpdate {
buffer: args[0].as_i64().unwrap(),
row: args[1].as_i64().unwrap(),
col: args[2].as_i64().unwrap(),
username: "root".into()
}).await.unwrap();
}
}
},
"tick" => eprintln!("tock"),
_ => eprintln!("[!] unexpected notify",)
}
}
}
pub async fn run_nvim_client(c: WorkspaceClient<Channel>) -> Result<(), Box<dyn std::error::Error + 'static>> {
let handler: NeovimHandler = NeovimHandler::new(c);
pub async fn run_nvim_client(dispatcher: Dispatcher) -> Result<(), Box<dyn std::error::Error + 'static>> {
let handler: NeovimHandler = NeovimHandler::new(dispatcher);
let (_nvim, io_handler) = new_parent(handler).await;
// Any error should probably be logged, as stderr is not visible to users.

View file

@ -1,11 +1,13 @@
use std::collections::HashMap;
use tokio::sync::{broadcast, mpsc, watch::{self, Ref}};
use tracing::warn;
use tracing::{warn, info};
use library::{events::Event, user::{User, UserCursor}};
use super::{buffer::{BufferView, Buffer}, state::{User, UserCursor}};
use crate::service::workspace::proto::CursorUpdate;
use super::{buffer::{BufferView, Buffer}};
#[derive(Debug, Clone)]
pub struct UsersView {
@ -107,6 +109,8 @@ impl Workspace {
w.users_worker(op_usr_rx, users_tx); // spawn worker to handle users
w.buffers_worker(op_buf_rx, buffer_tx); // spawn worker to handle buffers
//
info!("new workspace created: {}[{}]", w.name, w.id);
return w;
}

View file

@ -16,6 +16,8 @@ use self::proto::session_server::SessionServer;
use super::workspace::WorkspaceExtension;
use uuid::Uuid;
#[derive(Debug)]
pub struct SessionService {
state: Arc<StateManager>,
@ -27,9 +29,8 @@ impl Session for SessionService {
&self,
req: Request<WorkspaceBuilderRequest>,
) -> Result<Response<SessionResponse>, Status> {
let name = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let w = Workspace::new(name);
// let name = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let w = Workspace::new("im lazy".into());
let res = SessionResponse { accepted:true, session_key: w.id.to_string() };
self.state.view().add(w).await;