From ebf25fee44b3441a8b4864610886bcc24649efb3 Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 20 Apr 2023 03:47:35 +0200 Subject: [PATCH] feat: cleaner way to detach and stop workers actually the stopping channel doesn't fit super well inside the OperationController itself since the tasks are handled above that abstraction layer, but storing it inside makes my life incredibly simpler so im gonna do that for now --- src/client/nvim/codemp.lua | 9 ++++++--- src/client/nvim/main.rs | 27 +++++++++++++++++++-------- src/lib/client.rs | 8 +++++--- src/lib/operation/controller.rs | 20 +++++++++++++++++++- 4 files changed, 49 insertions(+), 15 deletions(-) diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua index 3fc6fe4..15df869 100644 --- a/src/client/nvim/codemp.lua +++ b/src/client/nvim/codemp.lua @@ -69,7 +69,6 @@ local function unhook_callbacks(buffer) vim.api.nvim_clear_autocmds({ group = codemp_autocmds, buffer = buffer }) vim.keymap.del('i', '', { buffer = buffer }) vim.keymap.del('i', '', { buffer = buffer }) - vim.keymap.del('i', '', { buffer = buffer }) end local function auto_address(addr) @@ -162,8 +161,12 @@ vim.api.nvim_create_user_command('Join', vim.api.nvim_create_user_command('Detach', function(args) local bufnr = vim.api.nvim_get_current_buf() - unhook_callbacks(bufnr) - M.detach(args.fargs[1]) + if M.detach(args.fargs[1]) then + unhook_callbacks(bufnr) + print("[/] detached from buffer") + else + print("[!] error detaching from buffer") + end end, { nargs=1 }) diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 4760693..4043fc1 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -147,6 +147,7 @@ impl Handler for NeovimHandler { Ok(()) => { tokio::spawn(async move { loop { + if !_controller.run() { break } let _span = _controller.wait().await; // TODO only change lines affected! let lines : Vec = _controller.content().split("\n").map(|x| x.to_string()).collect(); @@ -164,17 +165,26 @@ impl Handler for NeovimHandler { }, "detach" => { - Err(Value::from("unimplemented! try with :q!")) - // if args.len() < 1 { - // return Err(Value::from("no path given")); - // } - // let path = default_empty_str(&args, 0); - // let mut c = self.client.clone(); - // c.detach(path); - // Ok(Value::Nil) + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = default_empty_str(&args, 0); + match self.buffer_controller(&path) { + None => Err(Value::from("no controller for given path")), + Some(controller) => Ok(Value::from(controller.stop())), + } }, "listen" => { + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = default_empty_str(&args, 0); + let controller = match self.buffer_controller(&path) { + None => return Err(Value::from("no controller for given path")), + Some(c) => c, + }; + let ns = nvim.create_namespace("Cursor").await .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; @@ -189,6 +199,7 @@ impl Handler for NeovimHandler { debug!("spawning cursor processing worker"); tokio::spawn(async move { loop { + if !controller.run() { break } match sub.recv().await { Err(e) => return error!("error receiving cursor update from controller: {}", e), Ok((_usr, cur)) => { diff --git a/src/lib/client.rs b/src/lib/client.rs index 50d407d..53e79e1 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -2,12 +2,12 @@ use std::sync::Arc; use operational_transform::OperationSeq; use tonic::{transport::Channel, Status}; -use tracing::{error, warn}; +use tracing::{error, warn, info}; use uuid::Uuid; use crate::{ cursor::{CursorController, CursorStorage}, - operation::{OperationController, OperationProcessor}, + operation::{OperationProcessor, OperationController}, proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, }; @@ -79,11 +79,12 @@ impl CodempClient { let _factory = factory.clone(); tokio::spawn(async move { loop { + if !_factory.run() { break } match stream.message().await { Err(e) => break error!("error receiving update: {}", e), Ok(None) => break, // clean exit Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { - Err(e) => break error!("error deserializing opseq: {}", e), + Err(e) => error!("error deserializing opseq: {}", e), Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here @@ -99,6 +100,7 @@ impl CodempClient { let _path = path.clone(); tokio::spawn(async move { while let Some(op) = _factory.poll().await { + if !_factory.run() { break } let req = OperationRequest { hash: "".into(), opseq: serde_json::to_string(&op).unwrap(), diff --git a/src/lib/operation/controller.rs b/src/lib/operation/controller.rs index be36712..dbe2ce6 100644 --- a/src/lib/operation/controller.rs +++ b/src/lib/operation/controller.rs @@ -2,7 +2,7 @@ use std::{sync::Mutex, collections::VecDeque, ops::Range}; use operational_transform::{OperationSeq, OTError}; use tokio::sync::watch; -use tracing::warn; +use tracing::{warn, error}; use super::{OperationFactory, OperationProcessor, op_effective_range}; @@ -14,12 +14,15 @@ pub struct OperationController { notifier: watch::Sender, changed: Mutex>>, changed_notifier: watch::Sender>, + run: watch::Receiver, + stop: watch::Sender, } impl OperationController { pub fn new(content: String) -> Self { let (tx, rx) = watch::channel(OperationSeq::default()); let (done, wait) = watch::channel(0..0); + let (stop, run) = watch::channel(true); OperationController { text: Mutex::new(content), queue: Mutex::new(VecDeque::new()), @@ -27,6 +30,7 @@ impl OperationController { notifier: tx, changed: Mutex::new(wait), changed_notifier: done, + run, stop, } } @@ -53,6 +57,20 @@ impl OperationController { pub async fn ack(&self) -> Option { self.queue.lock().unwrap().pop_front() } + + pub fn stop(&self) -> bool { + match self.stop.send(false) { + Ok(()) => true, + Err(e) => { + error!("could not send stop signal to workers: {}", e); + false + } + } + } + + pub fn run(&self) -> bool { + *self.run.borrow() + } } impl OperationFactory for OperationController {