diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua index 3fc6fe4..15df869 100644 --- a/src/client/nvim/codemp.lua +++ b/src/client/nvim/codemp.lua @@ -69,7 +69,6 @@ local function unhook_callbacks(buffer) vim.api.nvim_clear_autocmds({ group = codemp_autocmds, buffer = buffer }) vim.keymap.del('i', '', { buffer = buffer }) vim.keymap.del('i', '', { buffer = buffer }) - vim.keymap.del('i', '', { buffer = buffer }) end local function auto_address(addr) @@ -162,8 +161,12 @@ vim.api.nvim_create_user_command('Join', vim.api.nvim_create_user_command('Detach', function(args) local bufnr = vim.api.nvim_get_current_buf() - unhook_callbacks(bufnr) - M.detach(args.fargs[1]) + if M.detach(args.fargs[1]) then + unhook_callbacks(bufnr) + print("[/] detached from buffer") + else + print("[!] error detaching from buffer") + end end, { nargs=1 }) diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 4760693..4043fc1 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -147,6 +147,7 @@ impl Handler for NeovimHandler { Ok(()) => { tokio::spawn(async move { loop { + if !_controller.run() { break } let _span = _controller.wait().await; // TODO only change lines affected! let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); @@ -164,17 +165,26 @@ impl Handler for NeovimHandler { }, "detach" => { - Err(Value::from("unimplemented! try with :q!")) - // if args.len() < 1 { - // return Err(Value::from("no path given")); - // } - // let path = default_empty_str(&args, 0); - // let mut c = self.client.clone(); - // c.detach(path); - // Ok(Value::Nil) + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = default_empty_str(&args, 0); + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => Ok(Value::from(controller.stop())), + } }, "listen" => { + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = default_empty_str(&args, 0); + let controller = match self.buffer_controller(&path) { + None => return Err(Value::from("no controller for given path")), + Some(c) => c, + }; + let ns = nvim.create_namespace("Cursor").await .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; @@ -189,6 +199,7 @@ impl Handler for NeovimHandler { debug!("spawning cursor processing worker"); tokio::spawn(async move { loop { + if !controller.run() { break } match sub.recv().await { Err(e) => return error!("error receiving cursor update from controller: {}", e), Ok((_usr, cur)) => { diff --git a/src/lib/client.rs b/src/lib/client.rs index 50d407d..53e79e1 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -2,12 +2,12 @@ use std::sync::Arc; use operational_transform::OperationSeq; use tonic::{transport::Channel, Status}; -use tracing::{error, warn}; +use tracing::{error, warn, info}; use uuid::Uuid; use crate::{ cursor::{CursorController, CursorStorage}, - operation::{OperationController, OperationProcessor}, + operation::{OperationProcessor, OperationController}, proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, }; @@ -79,11 +79,12 @@ impl CodempClient { let _factory = factory.clone(); tokio::spawn(async move { loop { + if !_factory.run() { break } match stream.message().await { Err(e) => break error!("error receiving update: {}", e), Ok(None) => break, // clean exit Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { - Err(e) => break error!("error deserializing opseq: {}", e), + Err(e) => error!("error deserializing opseq: {}", e), Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here @@ -99,6 +100,7 @@ impl CodempClient { let _path = path.clone(); tokio::spawn(async move { while let Some(op) = _factory.poll().await { + if !_factory.run() { break } let req = OperationRequest { hash: "".into(), opseq: serde_json::to_string(&op).unwrap(), diff --git a/src/lib/operation/controller.rs b/src/lib/operation/controller.rs index be36712..dbe2ce6 100644 --- a/src/lib/operation/controller.rs +++ b/src/lib/operation/controller.rs @@ -2,7 +2,7 @@ use std::{sync::Mutex, collections::VecDeque, ops::Range}; use operational_transform::{OperationSeq, OTError}; use tokio::sync::watch; -use tracing::warn; +use tracing::{warn, error}; use super::{OperationFactory, OperationProcessor, op_effective_range}; @@ -14,12 +14,15 @@ pub struct OperationController { notifier: watch::Sender, changed: Mutex>>, changed_notifier: watch::Sender>, + run: watch::Receiver, + stop: watch::Sender, } impl OperationController { pub fn new(content: String) -> Self { let (tx, rx) = watch::channel(OperationSeq::default()); let (done, wait) = watch::channel(0..0); + let (stop, run) = watch::channel(true); OperationController { text: Mutex::new(content), queue: Mutex::new(VecDeque::new()), @@ -27,6 +30,7 @@ impl OperationController { notifier: tx, changed: Mutex::new(wait), changed_notifier: done, + run, stop, } } @@ -53,6 +57,20 @@ impl OperationController { pub async fn ack(&self) -> Option { self.queue.lock().unwrap().pop_front() } + + pub fn stop(&self) -> bool { + match self.stop.send(false) { + Ok(()) => true, + Err(e) => { + error!("could not send stop signal to workers: {}", e); + false + } + } + } + + pub fn run(&self) -> bool { + *self.run.borrow() + } } impl OperationFactory for OperationController {