feat: cleaner way to detach and stop workers

actually the stopping channel doesn't fit super well inside the
OperationController itself since the tasks are handled above that
abstraction layer, but storing it inside makes my life incredibly
simpler so im gonna do that for now
This commit is contained in:
əlemi 2023-04-20 03:47:35 +02:00
parent 1bde0d414e
commit ebf25fee44
4 changed files with 49 additions and 15 deletions

View file

@ -69,7 +69,6 @@ local function unhook_callbacks(buffer)
vim.api.nvim_clear_autocmds({ group = codemp_autocmds, buffer = buffer }) vim.api.nvim_clear_autocmds({ group = codemp_autocmds, buffer = buffer })
vim.keymap.del('i', '<BS>', { buffer = buffer }) vim.keymap.del('i', '<BS>', { buffer = buffer })
vim.keymap.del('i', '<Del>', { buffer = buffer }) vim.keymap.del('i', '<Del>', { buffer = buffer })
vim.keymap.del('i', '<CR>', { buffer = buffer })
end end
local function auto_address(addr) local function auto_address(addr)
@ -162,8 +161,12 @@ vim.api.nvim_create_user_command('Join',
vim.api.nvim_create_user_command('Detach', vim.api.nvim_create_user_command('Detach',
function(args) function(args)
local bufnr = vim.api.nvim_get_current_buf() local bufnr = vim.api.nvim_get_current_buf()
if M.detach(args.fargs[1]) then
unhook_callbacks(bufnr) unhook_callbacks(bufnr)
M.detach(args.fargs[1]) print("[/] detached from buffer")
else
print("[!] error detaching from buffer")
end
end, end,
{ nargs=1 }) { nargs=1 })

View file

@ -147,6 +147,7 @@ impl Handler for NeovimHandler {
Ok(()) => { Ok(()) => {
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
if !_controller.run() { break }
let _span = _controller.wait().await; let _span = _controller.wait().await;
// TODO only change lines affected! // TODO only change lines affected!
let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect(); let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect();
@ -164,17 +165,26 @@ impl Handler for NeovimHandler {
}, },
"detach" => { "detach" => {
Err(Value::from("unimplemented! try with :q!")) if args.len() < 1 {
// if args.len() < 1 { return Err(Value::from("no path given"));
// return Err(Value::from("no path given")); }
// } let path = default_empty_str(&args, 0);
// let path = default_empty_str(&args, 0); match self.buffer_controller(&path) {
// let mut c = self.client.clone(); None => Err(Value::from("no controller for given path")),
// c.detach(path); Some(controller) => Ok(Value::from(controller.stop())),
// Ok(Value::Nil) }
}, },
"listen" => { "listen" => {
if args.len() < 1 {
return Err(Value::from("no path given"));
}
let path = default_empty_str(&args, 0);
let controller = match self.buffer_controller(&path) {
None => return Err(Value::from("no controller for given path")),
Some(c) => c,
};
let ns = nvim.create_namespace("Cursor").await let ns = nvim.create_namespace("Cursor").await
.map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?;
@ -189,6 +199,7 @@ impl Handler for NeovimHandler {
debug!("spawning cursor processing worker"); debug!("spawning cursor processing worker");
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
if !controller.run() { break }
match sub.recv().await { match sub.recv().await {
Err(e) => return error!("error receiving cursor update from controller: {}", e), Err(e) => return error!("error receiving cursor update from controller: {}", e),
Ok((_usr, cur)) => { Ok((_usr, cur)) => {

View file

@ -2,12 +2,12 @@ use std::sync::Arc;
use operational_transform::OperationSeq; use operational_transform::OperationSeq;
use tonic::{transport::Channel, Status}; use tonic::{transport::Channel, Status};
use tracing::{error, warn}; use tracing::{error, warn, info};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
cursor::{CursorController, CursorStorage}, cursor::{CursorController, CursorStorage},
operation::{OperationController, OperationProcessor}, operation::{OperationProcessor, OperationController},
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov},
}; };
@ -79,11 +79,12 @@ impl CodempClient {
let _factory = factory.clone(); let _factory = factory.clone();
tokio::spawn(async move { tokio::spawn(async move {
loop { loop {
if !_factory.run() { break }
match stream.message().await { match stream.message().await {
Err(e) => break error!("error receiving update: {}", e), Err(e) => break error!("error receiving update: {}", e),
Ok(None) => break, // clean exit Ok(None) => break, // clean exit
Ok(Some(x)) => match serde_json::from_str::<OperationSeq>(&x.opseq) { Ok(Some(x)) => match serde_json::from_str::<OperationSeq>(&x.opseq) {
Err(e) => break error!("error deserializing opseq: {}", e), Err(e) => error!("error deserializing opseq: {}", e),
Ok(v) => match _factory.process(v).await { Ok(v) => match _factory.process(v).await {
Err(e) => break error!("could not apply operation from server: {}", e), Err(e) => break error!("could not apply operation from server: {}", e),
Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here Ok(_range) => { } // user gets this range by awaiting wait() so we can drop it here
@ -99,6 +100,7 @@ impl CodempClient {
let _path = path.clone(); let _path = path.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(op) = _factory.poll().await { while let Some(op) = _factory.poll().await {
if !_factory.run() { break }
let req = OperationRequest { let req = OperationRequest {
hash: "".into(), hash: "".into(),
opseq: serde_json::to_string(&op).unwrap(), opseq: serde_json::to_string(&op).unwrap(),

View file

@ -2,7 +2,7 @@ use std::{sync::Mutex, collections::VecDeque, ops::Range};
use operational_transform::{OperationSeq, OTError}; use operational_transform::{OperationSeq, OTError};
use tokio::sync::watch; use tokio::sync::watch;
use tracing::warn; use tracing::{warn, error};
use super::{OperationFactory, OperationProcessor, op_effective_range}; use super::{OperationFactory, OperationProcessor, op_effective_range};
@ -14,12 +14,15 @@ pub struct OperationController {
notifier: watch::Sender<OperationSeq>, notifier: watch::Sender<OperationSeq>,
changed: Mutex<watch::Receiver<Range<u64>>>, changed: Mutex<watch::Receiver<Range<u64>>>,
changed_notifier: watch::Sender<Range<u64>>, changed_notifier: watch::Sender<Range<u64>>,
run: watch::Receiver<bool>,
stop: watch::Sender<bool>,
} }
impl OperationController { impl OperationController {
pub fn new(content: String) -> Self { pub fn new(content: String) -> Self {
let (tx, rx) = watch::channel(OperationSeq::default()); let (tx, rx) = watch::channel(OperationSeq::default());
let (done, wait) = watch::channel(0..0); let (done, wait) = watch::channel(0..0);
let (stop, run) = watch::channel(true);
OperationController { OperationController {
text: Mutex::new(content), text: Mutex::new(content),
queue: Mutex::new(VecDeque::new()), queue: Mutex::new(VecDeque::new()),
@ -27,6 +30,7 @@ impl OperationController {
notifier: tx, notifier: tx,
changed: Mutex::new(wait), changed: Mutex::new(wait),
changed_notifier: done, changed_notifier: done,
run, stop,
} }
} }
@ -53,6 +57,20 @@ impl OperationController {
pub async fn ack(&self) -> Option<OperationSeq> { pub async fn ack(&self) -> Option<OperationSeq> {
self.queue.lock().unwrap().pop_front() self.queue.lock().unwrap().pop_front()
} }
pub fn stop(&self) -> bool {
match self.stop.send(false) {
Ok(()) => true,
Err(e) => {
error!("could not send stop signal to workers: {}", e);
false
}
}
}
pub fn run(&self) -> bool {
*self.run.borrow()
}
} }
impl OperationFactory for OperationController { impl OperationFactory for OperationController {