feat: very crude cursor implementation

This commit is contained in:
əlemi 2023-04-12 03:29:42 +02:00
parent e471a6dbc9
commit 8ca5128ca9
6 changed files with 148 additions and 12 deletions

View file

@ -6,6 +6,8 @@ service Buffer {
rpc Edit (OperationRequest) returns (BufferResponse); rpc Edit (OperationRequest) returns (BufferResponse);
rpc Create (BufferPayload) returns (BufferResponse); rpc Create (BufferPayload) returns (BufferResponse);
rpc Sync (BufferPayload) returns (BufferResponse); rpc Sync (BufferPayload) returns (BufferResponse);
rpc Cursor (CursorMov) returns (BufferResponse);
rpc Listen (BufferPayload) returns (stream CursorMov);
} }
message RawOp { message RawOp {
@ -13,6 +15,13 @@ message RawOp {
string user = 2; string user = 2;
} }
message CursorMov {
string user = 1;
string path = 2;
int64 row = 3;
int64 col = 4;
}
message OperationRequest { message OperationRequest {
string path = 1; string path = 1;
string hash = 2; string hash = 2;

View file

@ -4,8 +4,10 @@ local M = {}
M.jobid = nil M.jobid = nil
M.create = function(path, content) return vim.rpcrequest(M.jobid, "create", path, content) end M.create = function(path, content) return vim.rpcrequest(M.jobid, "create", path, content) end
M.insert = function(path, txt, pos) return vim.rpcrequest(M.jobid, "insert", path, txt, pos) end M.insert = function(path, txt, pos) return vim.rpcrequest(M.jobid, "insert", path, txt, pos) end
M.cursor = function(path, row, col) return vim.rpcrequest(M.jobid, "cursor", path, row, col) end
M.delete = function(path, pos, count) return vim.rpcrequest(M.jobid, "delete", path, pos, count) end M.delete = function(path, pos, count) return vim.rpcrequest(M.jobid, "delete", path, pos, count) end
M.attach = function(path) return vim.rpcrequest(M.jobid, "attach", path) end M.attach = function(path) return vim.rpcrequest(M.jobid, "attach", path) end
M.listen = function(path) return vim.rpcrequest(M.jobid, "listen", path) end
M.detach = function(path) return vim.rpcrequest(M.jobid, "detach", path) end M.detach = function(path) return vim.rpcrequest(M.jobid, "detach", path) end
local function cursor_offset() local function cursor_offset()
@ -24,6 +26,17 @@ local function hook_callbacks(path, buffer)
group = codemp_autocmds, group = codemp_autocmds,
} }
) )
vim.api.nvim_create_autocmd(
{ "CursorMoved" },
{
callback = function(_)
local cursor = vim.api.nvim_win_get_cursor(0)
M.cursor(path, cursor[1], cursor[2])
end,
buffer = buffer,
group = codemp_autocmds,
}
)
vim.keymap.set('i', '<BS>', function() M.delete(path, cursor_offset(), 1) return '<BS>' end, {expr = true, buffer = buffer}) vim.keymap.set('i', '<BS>', function() M.delete(path, cursor_offset(), 1) return '<BS>' end, {expr = true, buffer = buffer})
vim.keymap.set('i', '<Del>', function() M.delete(path, cursor_offset() + 1, 1) return '<Del>' end, {expr = true, buffer = buffer}) vim.keymap.set('i', '<Del>', function() M.delete(path, cursor_offset() + 1, 1) return '<Del>' end, {expr = true, buffer = buffer})
vim.keymap.set('i', '<CR>', function() M.insert(path, "\n", cursor_offset()) return '<CR>'end, {expr = true, buffer = buffer}) vim.keymap.set('i', '<CR>', function() M.insert(path, "\n", cursor_offset()) return '<CR>'end, {expr = true, buffer = buffer})
@ -76,6 +89,7 @@ vim.api.nvim_create_user_command('Share',
M.create(path, vim.fn.join(lines, "\n")) M.create(path, vim.fn.join(lines, "\n"))
hook_callbacks(path, bufnr) hook_callbacks(path, bufnr)
M.attach(path) M.attach(path)
M.listen(path)
end, end,
{ nargs=1 }) { nargs=1 })
@ -85,6 +99,7 @@ vim.api.nvim_create_user_command('Join',
local bufnr = vim.api.nvim_get_current_buf() local bufnr = vim.api.nvim_get_current_buf()
hook_callbacks(path, bufnr) hook_callbacks(path, bufnr)
M.attach(path) M.attach(path)
M.listen(path)
end, end,
{ nargs=1 }) { nargs=1 })

View file

@ -25,11 +25,11 @@ fn default_empty_str(args: &Vec<Value>, index: usize) -> String {
nullable_optional_str(args, index).unwrap_or("".into()) nullable_optional_str(args, index).unwrap_or("".into())
} }
fn nullable_optional_number(args: &Vec<Value>, index: usize) -> Option<u64> { fn nullable_optional_number(args: &Vec<Value>, index: usize) -> Option<i64> {
Some(args.get(index)?.as_u64()?) Some(args.get(index)?.as_i64()?)
} }
fn default_zero_number(args: &Vec<Value>, index: usize) -> u64 { fn default_zero_number(args: &Vec<Value>, index: usize) -> i64 {
nullable_optional_number(args, index).unwrap_or(0) nullable_optional_number(args, index).unwrap_or(0)
} }
@ -69,7 +69,7 @@ impl Handler for NeovimHandler {
} }
let path = default_empty_str(&args, 0); let path = default_empty_str(&args, 0);
let txt = default_empty_str(&args, 1); let txt = default_empty_str(&args, 1);
let pos = default_zero_number(&args, 2); let pos = default_zero_number(&args, 2) as u64;
let mut c = self.client.clone(); let mut c = self.client.clone();
info!("correctly parsed arguments: {} - {} - {}", path, txt, pos); info!("correctly parsed arguments: {} - {} - {}", path, txt, pos);
match c.insert(path, txt, pos).await { match c.insert(path, txt, pos).await {
@ -89,8 +89,8 @@ impl Handler for NeovimHandler {
return Err(Value::from("not enough arguments")); return Err(Value::from("not enough arguments"));
} }
let path = default_empty_str(&args, 0); let path = default_empty_str(&args, 0);
let pos = default_zero_number(&args, 1); let pos = default_zero_number(&args, 1) as u64;
let count = default_zero_number(&args, 2); let count = default_zero_number(&args, 2) as u64;
let mut c = self.client.clone(); let mut c = self.client.clone();
match c.delete(path, pos, count).await { match c.delete(path, pos, count).await {
@ -145,6 +145,53 @@ impl Handler for NeovimHandler {
Ok(Value::Nil) Ok(Value::Nil)
}, },
"listen" => {
if args.len() < 1 {
return Err(Value::from("no path given"));
}
let path = default_empty_str(&args, 0);
let mut c = self.client.clone();
let ns = nvim.create_namespace("Cursor").await
.map_err(|e| Value::from(format!("could not create namespace: {}", e)))?;
let buf = nvim.get_current_buf().await
.map_err(|e| Value::from(format!("could not get current buf: {}", e)))?;
match c.listen(path, move |cur| {
let _b = buf.clone();
tokio::spawn(async move {
if let Err(e) = _b.clear_namespace(ns, 0, -1).await {
error!("could not clear previous cursor highlight: {}", e);
}
if let Err(e) = _b.add_highlight(
ns, "ErrorMsg",
cur.row as i64 - 1, cur.col as i64 - 1, cur.col as i64
).await {
error!("could not create highlight for cursor: {}", e);
}
});
}).await {
Ok(()) => Ok(Value::Nil),
Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))),
}
},
"cursor" => {
if args.len() < 3 {
return Err(Value::from("not enough args"));
}
let path = default_empty_str(&args, 0);
let row = default_zero_number(&args, 1);
let col = default_zero_number(&args, 2);
let mut c = self.client.clone();
match c.cursor(path, row, col).await {
Ok(()) => Ok(Value::Nil),
Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))),
}
},
_ => Err(Value::from("unimplemented")), _ => Err(Value::from("unimplemented")),
} }
} }

View file

@ -6,7 +6,7 @@ use uuid::Uuid;
use crate::{ use crate::{
opfactory::AsyncFactory, opfactory::AsyncFactory,
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp}, proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov},
tonic::{transport::Channel, Status, Streaming}, tonic::{transport::Channel, Status, Streaming},
}; };
@ -89,6 +89,32 @@ impl CodempClient {
} }
} }
pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<(), Status> {
let req = CursorMov {
path, user: self.id.to_string(),
row, col,
};
let _res = self.client.cursor(req).await?.into_inner();
Ok(())
}
pub async fn listen<F>(&mut self, path: String, callback: F) -> Result<(), Status>
where F : Fn(CursorMov) -> () + Send + 'static {
let req = BufferPayload {
path,
content: None,
user: self.id.to_string(),
};
let mut stream = self.client.listen(req).await?.into_inner();
tokio::spawn(async move {
// TODO catch some errors
while let Ok(Some(x)) = stream.message().await {
callback(x)
}
});
Ok(())
}
pub async fn attach<F>(&mut self, path: String, callback: F) -> Result<String, Status> pub async fn attach<F>(&mut self, path: String, callback: F) -> Result<String, Status>
where F : Fn(String) -> () + Send + 'static { where F : Fn(String) -> () + Send + 'static {
let content = self.sync(path.clone()).await?; let content = self.sync(path.clone()).await?;

View file

@ -70,6 +70,7 @@ impl OperationFactory {
pub struct AsyncFactory { pub struct AsyncFactory {
run: watch::Sender<bool>, run: watch::Sender<bool>,
ops: mpsc::Sender<OpMsg>, ops: mpsc::Sender<OpMsg>,
#[allow(unused)] // TODO is this necessary?
content: watch::Receiver<String>, content: watch::Receiver<String>,
} }
@ -123,9 +124,6 @@ impl AsyncFactory {
} }
#[derive(Debug)] #[derive(Debug)]
enum OpMsg { enum OpMsg {
Exec(OpWrapper, oneshot::Sender<Result<OperationSeq, OTError>>), Exec(OpWrapper, oneshot::Sender<Result<OperationSeq, OTError>>),

View file

@ -1,16 +1,17 @@
use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap};
use tokio::sync::mpsc; use tokio::sync::{mpsc, broadcast};
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status};
use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this?
use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest}; use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest, CursorMov};
use tracing::info; use tracing::info;
use super::actor::{BufferHandle, BufferStore}; use super::actor::{BufferHandle, BufferStore};
type OperationStream = Pin<Box<dyn Stream<Item = Result<RawOp, Status>> + Send>>; type OperationStream = Pin<Box<dyn Stream<Item = Result<RawOp, Status>> + Send>>;
type CursorStream = Pin<Box<dyn Stream<Item = Result<CursorMov, Status>> + Send>>;
struct BufferMap { struct BufferMap {
store: HashMap<String, BufferHandle>, store: HashMap<String, BufferHandle>,
@ -33,11 +34,22 @@ impl BufferStore<String> for BufferMap {
pub struct BufferService { pub struct BufferService {
map: Arc<RwLock<BufferMap>>, map: Arc<RwLock<BufferMap>>,
cursor: broadcast::Sender<CursorMov>,
}
impl BufferService {
fn get_buffer(&self, path: &String) -> Result<BufferHandle, Status> {
match self.map.read().unwrap().get(path) {
Some(buf) => Ok(buf.clone()),
None => Err(Status::not_found("no buffer for given path")),
}
}
} }
#[tonic::async_trait] #[tonic::async_trait]
impl Buffer for BufferService { impl Buffer for BufferService {
type AttachStream = OperationStream; type AttachStream = OperationStream;
type ListenStream = CursorStream;
async fn attach(&self, req: Request<BufferPayload>) -> Result<tonic::Response<OperationStream>, Status> { async fn attach(&self, req: Request<BufferPayload>) -> Result<tonic::Response<OperationStream>, Status> {
let request = req.into_inner(); let request = req.into_inner();
@ -65,6 +77,33 @@ impl Buffer for BufferService {
} }
} }
async fn listen(&self, req: Request<BufferPayload>) -> Result<tonic::Response<CursorStream>, Status> {
let mut sub = self.cursor.subscribe();
let myself = req.into_inner().user;
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
loop {
match sub.recv().await {
Ok(v) => {
if v.user == myself { continue }
tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel?
}
Err(_e) => break,
}
}
});
let output_stream = ReceiverStream::new(rx);
info!("registered new subscriber to cursor updates");
Ok(Response::new(Box::pin(output_stream)))
}
async fn cursor(&self, req:Request<CursorMov>) -> Result<Response<BufferResponse>, Status> {
match self.cursor.send(req.into_inner()) {
Ok(_) => Ok(Response::new(BufferResponse { accepted: true, content: None})),
Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))),
}
}
async fn edit(&self, req:Request<OperationRequest>) -> Result<Response<BufferResponse>, Status> { async fn edit(&self, req:Request<OperationRequest>) -> Result<Response<BufferResponse>, Status> {
let request = req.into_inner(); let request = req.into_inner();
let tx = match self.map.read().unwrap().get(&request.path) { let tx = match self.map.read().unwrap().get(&request.path) {
@ -106,8 +145,10 @@ impl Buffer for BufferService {
impl BufferService { impl BufferService {
pub fn new() -> BufferService { pub fn new() -> BufferService {
let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity
BufferService { BufferService {
map: Arc::new(RwLock::new(HashMap::new().into())), map: Arc::new(RwLock::new(HashMap::new().into())),
cursor: cur_tx,
} }
} }