chore: finished reimplementing features modularly

now everything that worked in 0.2 seems to work again, and should
actually be better. plus, merging differences is done properly and thus
should be way more reliable
This commit is contained in:
əlemi 2023-04-19 04:18:22 +02:00
parent b8aa7d5fce
commit 3609dbfa84
5 changed files with 158 additions and 87 deletions

View file

@ -49,13 +49,13 @@ local function hook_callbacks(path, buffer)
{ {
callback = function(args) callback = function(args)
local cursor = vim.api.nvim_win_get_cursor(0) local cursor = vim.api.nvim_win_get_cursor(0)
pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors
if cursor[1] == last_line then if cursor[1] == last_line then
return return
end end
last_line = cursor[1] last_line = cursor[1]
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false) local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors
pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors
end, end,
buffer = buffer, buffer = buffer,
group = codemp_autocmds, group = codemp_autocmds,
@ -111,6 +111,7 @@ vim.api.nvim_create_user_command('Connect',
-- print(vim.fn.join(data, "\n")) -- print(vim.fn.join(data, "\n"))
end, end,
stderr_buffered = false, stderr_buffered = false,
env = { RUST_BACKTRACE = 1 }
} }
) )
if M.jobid <= 0 then if M.jobid <= 0 then

View file

@ -1,7 +1,7 @@
use std::sync::Arc;
use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; use std::{net::TcpStream, sync::Mutex, collections::BTreeMap};
use codemp::cursor::CursorController; use codemp::operation::{OperationController, OperationFactory};
use codemp::operation::OperationController;
use codemp::{client::CodempClient, operation::OperationProcessor}; use codemp::{client::CodempClient, operation::OperationProcessor};
use codemp::proto::buffer_client::BufferClient; use codemp::proto::buffer_client::BufferClient;
use rmpv::Value; use rmpv::Value;
@ -16,8 +16,7 @@ use tracing::{error, warn, debug, info};
#[derive(Clone)] #[derive(Clone)]
struct NeovimHandler { struct NeovimHandler {
client: CodempClient, client: CodempClient,
factories: BTreeMap<String, OperationController>, factories: Arc<Mutex<BTreeMap<String, Arc<OperationController>>>>,
cursor: Option<CursorController>,
} }
fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> { fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> {
@ -36,6 +35,12 @@ fn default_zero_number(args: &Vec<Value>, index: usize) -> i64 {
nullable_optional_number(args, index).unwrap_or(0) nullable_optional_number(args, index).unwrap_or(0)
} }
impl NeovimHandler {
fn buffer_controller(&self, path: &String) -> Option<Arc<OperationController>> {
Some(self.factories.lock().unwrap().get(path)?.clone())
}
}
#[tonic::async_trait] #[tonic::async_trait]
impl Handler for NeovimHandler { impl Handler for NeovimHandler {
type Writer = Compat<Stdout>; type Writer = Compat<Stdout>;
@ -72,16 +77,18 @@ impl Handler for NeovimHandler {
} }
let path = default_empty_str(&args, 0); let path = default_empty_str(&args, 0);
let txt = default_empty_str(&args, 1); let txt = default_empty_str(&args, 1);
let pos = default_zero_number(&args, 2) as u64; let mut pos = default_zero_number(&args, 2) as i64;
let mut c = self.client.clone();
match c.insert(path, txt, pos).await { if pos <= 0 { pos = 0 } // TODO wtf vim??
Ok(res) => {
match res { match self.buffer_controller(&path) {
true => Ok(Value::Nil), None => Err(Value::from("no controller for given path")),
false => Err(Value::from("rejected")), Some(controller) => {
} match controller.apply(controller.insert(&txt, pos as u64)).await {
},
Err(e) => Err(Value::from(format!("could not send insert: {}", e))), Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
Ok(_res) => Ok(Value::Nil),
}
}
} }
}, },
@ -93,13 +100,12 @@ impl Handler for NeovimHandler {
let pos = default_zero_number(&args, 1) as u64; let pos = default_zero_number(&args, 1) as u64;
let count = default_zero_number(&args, 2) as u64; let count = default_zero_number(&args, 2) as u64;
let mut c = self.client.clone(); match self.buffer_controller(&path) {
match c.delete(path, pos, count).await { None => Err(Value::from("no controller for given path")),
Ok(res) => match res { Some(controller) => match controller.apply(controller.delete(pos, count)).await {
true => Ok(Value::Nil), Err(e) => Err(Value::from(format!("could not send delete: {}", e))),
false => Err(Value::from("rejected")), Ok(_res) => Ok(Value::Nil),
}, }
Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
} }
}, },
@ -110,13 +116,12 @@ impl Handler for NeovimHandler {
let path = default_empty_str(&args, 0); let path = default_empty_str(&args, 0);
let txt = default_empty_str(&args, 1); let txt = default_empty_str(&args, 1);
let mut c = self.client.clone(); match self.buffer_controller(&path) {
match c.replace(path, txt).await { None => Err(Value::from("no controller for given path")),
Ok(res) => match res { Some(controller) => match controller.apply(controller.replace(&txt)).await {
true => Ok(Value::Nil),
false => Err(Value::from("rejected")),
},
Err(e) => Err(Value::from(format!("could not send replace: {}", e))), Err(e) => Err(Value::from(format!("could not send replace: {}", e))),
Ok(_res) => Ok(Value::Nil),
}
} }
}, },
@ -132,63 +137,72 @@ impl Handler for NeovimHandler {
let mut c = self.client.clone(); let mut c = self.client.clone();
let buf = buffer.clone(); match c.attach(path.clone()).await {
match c.attach(path, move |x| {
let lines : Vec<String> = x.split("\n").map(|x| x.to_string()).collect();
let b = buf.clone();
tokio::spawn(async move {
if let Err(e) = b.set_lines(0, -1, false, lines).await {
error!("could not update buffer: {}", e);
}
});
}).await {
Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))),
Ok(content) => { Ok(controller) => {
let lines : Vec<String> = content.split("\n").map(|x| x.to_string()).collect(); let _controller = controller.clone();
let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect();
match buffer.set_lines(0, -1, false, lines).await {
Err(e) => Err(Value::from(format!("could not sync buffer: {}", e))),
Ok(()) => {
tokio::spawn(async move {
loop {
_controller.wait().await;
let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect();
if let Err(e) = buffer.set_lines(0, -1, false, lines).await { if let Err(e) = buffer.set_lines(0, -1, false, lines).await {
error!("could not update buffer: {}", e); error!("could not update buffer: {}", e);
} }
}
});
self.factories.lock().unwrap().insert(path, controller);
Ok(Value::Nil) Ok(Value::Nil)
}
}
}, },
} }
}, },
"detach" => { "detach" => {
if args.len() < 1 { Err(Value::from("unimplemented! try with :q!"))
return Err(Value::from("no path given")); // if args.len() < 1 {
} // return Err(Value::from("no path given"));
let path = default_empty_str(&args, 0); // }
let mut c = self.client.clone(); // let path = default_empty_str(&args, 0);
c.detach(path); // let mut c = self.client.clone();
Ok(Value::Nil) // c.detach(path);
// Ok(Value::Nil)
}, },
"listen" => { "listen" => {
if args.len() < 1 {
return Err(Value::from("no path given"));
}
let path = default_empty_str(&args, 0);
let mut c = self.client.clone();
let ns = nvim.create_namespace("Cursor").await let ns = nvim.create_namespace("Cursor").await
.map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?;
let buf = nvim.get_current_buf().await let buf = nvim.get_current_buf().await
.map_err(|e| Value::from(format!("could not get current buf: {}", e)))?; .map_err(|e| Value::from(format!("could not get current buf: {}", e)))?;
match c.listen(path, move |cur| { let mut c = self.client.clone();
let _b = buf.clone(); match c.listen().await {
Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))),
Ok(cursor) => {
let mut sub = cursor.sub();
debug!("spawning cursor processing worker");
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = _b.clear_namespace(ns, 0, -1).await { loop {
match sub.recv().await {
Err(e) => return error!("error receiving cursor update from controller: {}", e),
Ok((_usr, cur)) => {
if let Err(e) = buf.clear_namespace(ns, 0, -1).await {
error!("could not clear previous cursor highlight: {}", e); error!("could not clear previous cursor highlight: {}", e);
} }
if let Err(e) = _b.add_highlight(ns, "ErrorMsg", cur.row-1, cur.col, cur.col+1).await { if let Err(e) = buf.add_highlight(ns, "ErrorMsg", cur.start.row-1, cur.start.col, cur.start.col+1).await {
error!("could not create highlight for cursor: {}", e); error!("could not create highlight for cursor: {}", e);
} }
}
}
}
}); });
}).await { Ok(Value::Nil)
Ok(()) => Ok(Value::Nil), },
Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))),
} }
}, },
@ -202,8 +216,8 @@ impl Handler for NeovimHandler {
let mut c = self.client.clone(); let mut c = self.client.clone();
match c.cursor(path, row, col).await { match c.cursor(path, row, col).await {
Ok(()) => Ok(Value::Nil), Ok(_) => Ok(Value::Nil),
Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))), Err(e) => Err(Value:: from(format!("could not update cursor: {}", e))),
} }
}, },
@ -262,6 +276,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let handler: NeovimHandler = NeovimHandler { let handler: NeovimHandler = NeovimHandler {
client: client.into(), client: client.into(),
factories: Arc::new(Mutex::new(BTreeMap::new())),
}; };
let (_nvim, io_handler) = create::new_parent(handler).await; let (_nvim, io_handler) = create::new_parent(handler).await;

View file

@ -8,7 +8,7 @@ use uuid::Uuid;
use crate::{ use crate::{
cursor::{CursorController, CursorStorage}, cursor::{CursorController, CursorStorage},
operation::{OperationController, OperationProcessor}, operation::{OperationController, OperationProcessor},
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest}, proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov},
}; };
#[derive(Clone)] #[derive(Clone)]
@ -26,7 +26,6 @@ impl From::<BufferClient<Channel>> for CodempClient {
impl CodempClient { impl CodempClient {
pub fn new(id: String, client: BufferClient<Channel>) -> Self { pub fn new(id: String, client: BufferClient<Channel>) -> Self {
CodempClient { id, client } CodempClient { id, client }
} }
pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> { pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
@ -91,7 +90,9 @@ impl CodempClient {
Err(e) => break error!("error deserializing opseq: {}", e), Err(e) => break error!("error deserializing opseq: {}", e),
Ok(v) => match _factory.process(v).await { Ok(v) => match _factory.process(v).await {
Err(e) => break error!("could not apply operation from server: {}", e), Err(e) => break error!("could not apply operation from server: {}", e),
Ok(_txt) => { } Ok(_txt) => {
// send event containing where the change happened
}
} }
}, },
} }
@ -125,4 +126,13 @@ impl CodempClient {
Ok(factory) Ok(factory)
} }
pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<bool, Status> {
let req = CursorMov {
path, row, col,
user: self.id.clone(),
};
let res = self.client.cursor(req).await?.into_inner();
Ok(res.accepted)
}
} }

View file

@ -1,5 +1,8 @@
use std::{collections::HashMap, sync::Mutex}; use std::{collections::HashMap, sync::Mutex};
use tokio::sync::broadcast;
use tracing::{info, error, debug, warn};
use crate::proto::CursorMov; use crate::proto::CursorMov;
/// Note that this differs from any hashmap in its put method: no &mut! /// Note that this differs from any hashmap in its put method: no &mut!
@ -16,10 +19,10 @@ pub trait CursorStorage {
} }
} }
#[derive(Copy, Clone)] #[derive(Copy, Clone, Debug, Default)]
pub struct Position { pub struct Position {
row: i64, pub row: i64,
col: i64, pub col: i64,
} }
impl From::<(i64, i64)> for Position { impl From::<(i64, i64)> for Position {
@ -28,24 +31,52 @@ impl From::<(i64, i64)> for Position {
} }
} }
#[derive(Clone)] #[derive(Clone, Debug, Default)]
pub struct Cursor { pub struct Cursor {
buffer: String, pub buffer: String,
start: Position, pub start: Position,
end: Position, pub end: Position,
} }
pub struct CursorController { pub struct CursorController {
users: Mutex<HashMap<String, Cursor>>, users: Mutex<HashMap<String, Cursor>>,
bus: broadcast::Sender<(String, Cursor)>,
_bus_keepalive: Mutex<broadcast::Receiver<(String, Cursor)>>,
} }
impl CursorController { impl CursorController {
pub fn new() -> Self { pub fn new() -> Self {
CursorController { users: Mutex::new(HashMap::new()) } let (tx, _rx) = broadcast::channel(64);
CursorController {
users: Mutex::new(HashMap::new()),
bus: tx,
_bus_keepalive: Mutex::new(_rx),
}
}
pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> {
self.bus.subscribe()
} }
} }
impl CursorStorage for CursorController { impl CursorStorage for CursorController {
fn update(&self, event: CursorMov) -> Option<Cursor> {
debug!("processing cursor event: {:?}", event);
let mut cur = self.get(&event.user).unwrap_or(Cursor::default());
cur.buffer = event.path;
cur.start = (event.row, event.col).into();
cur.end = (event.row, event.col).into();
self.put(event.user.clone(), cur.clone());
if let Err(e) = self.bus.send((event.user, cur.clone())) {
error!("could not broadcast cursor event: {}", e);
} else { // this is because once there are no receivers, nothing else can be sent
if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() {
warn!("could not consume event: {}", e);
}
}
Some(cur)
}
fn get(&self, id: &String) -> Option<Cursor> { fn get(&self, id: &String) -> Option<Cursor> {
Some(self.users.lock().unwrap().get(id)?.clone()) Some(self.users.lock().unwrap().get(id)?.clone())
} }

View file

@ -1,8 +1,7 @@
use std::{sync::{Mutex, Arc}, collections::VecDeque}; use std::{sync::Mutex, collections::VecDeque};
use operational_transform::{OperationSeq, OTError}; use operational_transform::{OperationSeq, OTError};
use tokio::sync::{watch, oneshot, mpsc}; use tokio::sync::watch;
use tracing::error;
use crate::operation::factory::OperationFactory; use crate::operation::factory::OperationFactory;
@ -14,6 +13,7 @@ pub trait OperationProcessor : OperationFactory{
async fn poll(&self) -> Option<OperationSeq>; async fn poll(&self) -> Option<OperationSeq>;
async fn ack(&self) -> Option<OperationSeq>; async fn ack(&self) -> Option<OperationSeq>;
async fn wait(&self);
} }
@ -22,16 +22,21 @@ pub struct OperationController {
queue: Mutex<VecDeque<OperationSeq>>, queue: Mutex<VecDeque<OperationSeq>>,
last: Mutex<watch::Receiver<OperationSeq>>, last: Mutex<watch::Receiver<OperationSeq>>,
notifier: watch::Sender<OperationSeq>, notifier: watch::Sender<OperationSeq>,
changed: Mutex<watch::Receiver<()>>,
changed_notifier: watch::Sender<()>,
} }
impl OperationController { impl OperationController {
pub fn new(content: String) -> Self { pub fn new(content: String) -> Self {
let (tx, rx) = watch::channel(OperationSeq::default()); let (tx, rx) = watch::channel(OperationSeq::default());
let (done, wait) = watch::channel(());
OperationController { OperationController {
text: Mutex::new(content), text: Mutex::new(content),
queue: Mutex::new(VecDeque::new()), queue: Mutex::new(VecDeque::new()),
last: Mutex::new(rx), last: Mutex::new(rx),
notifier: tx, notifier: tx,
changed: Mutex::new(wait),
changed_notifier: done,
} }
} }
} }
@ -49,10 +54,16 @@ impl OperationProcessor for OperationController {
let res = op.apply(&txt)?; let res = op.apply(&txt)?;
*self.text.lock().unwrap() = res.clone(); *self.text.lock().unwrap() = res.clone();
self.queue.lock().unwrap().push_back(op.clone()); self.queue.lock().unwrap().push_back(op.clone());
self.notifier.send(op).unwrap(); self.notifier.send(op);
Ok(res) Ok(res)
} }
async fn wait(&self) {
let mut blocker = self.changed.lock().unwrap().clone();
blocker.changed().await;
blocker.changed().await;
}
async fn process(&self, mut op: OperationSeq) -> Result<String, OTError> { async fn process(&self, mut op: OperationSeq) -> Result<String, OTError> {
let mut queue = self.queue.lock().unwrap(); let mut queue = self.queue.lock().unwrap();
for el in queue.iter_mut() { for el in queue.iter_mut() {
@ -61,6 +72,7 @@ impl OperationProcessor for OperationController {
let txt = self.content(); let txt = self.content();
let res = op.apply(&txt)?; let res = op.apply(&txt)?;
*self.text.lock().unwrap() = res.clone(); *self.text.lock().unwrap() = res.clone();
self.changed_notifier.send(());
Ok(res) Ok(res)
} }
@ -68,7 +80,9 @@ impl OperationProcessor for OperationController {
let len = self.queue.lock().unwrap().len(); let len = self.queue.lock().unwrap().len();
if len <= 0 { if len <= 0 {
let mut recv = self.last.lock().unwrap().clone(); let mut recv = self.last.lock().unwrap().clone();
recv.changed().await.unwrap(); // TODO this is not 100% reliable
recv.changed().await; // acknowledge current state
recv.changed().await; // wait for a change in state
} }
Some(self.queue.lock().unwrap().get(0)?.clone()) Some(self.queue.lock().unwrap().get(0)?.clone())
} }