feat: controllers now implement stop

Co-authored-by: zaaarf <me@zaaarf.foo>
This commit is contained in:
əlemi 2024-08-08 00:28:15 +02:00
parent cd8f7cd5c5
commit 6e9727128d
Signed by: alemi
GPG key ID: A4895B84D311642C
5 changed files with 60 additions and 72 deletions

View file

@ -54,4 +54,13 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
/// attempt to receive a value without blocking, return None if nothing is available /// attempt to receive a value without blocking, return None if nothing is available
fn try_recv(&self) -> Result<Option<T>>; fn try_recv(&self) -> Result<Option<T>>;
/// stop underlying worker
///
/// note that this will mean no more values can be received nor sent,
/// but existing controllers will still be accessible until all are dropped
///
/// returns true if stop signal was sent, false if channel is closed
/// (likely if worker is already stopped)
fn stop(&self) -> bool;
} }

View file

@ -24,36 +24,9 @@ use super::tools::InternallyMutable;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "python", pyo3::pyclass)]
#[cfg_attr(feature = "js", napi_derive::napi)] #[cfg_attr(feature = "js", napi_derive::napi)]
pub struct BufferController(Arc<BufferControllerInner>); pub struct BufferController(pub(crate) Arc<BufferControllerInner>);
#[derive(Debug)]
struct BufferControllerInner {
name: String,
content: watch::Receiver<String>,
seen: InternallyMutable<String>, // internal buffer previous state
operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
_stop: Arc<StopOnDrop>, // just exist
}
impl BufferController { impl BufferController {
pub(crate) fn new(
name: String,
content: watch::Receiver<String>,
operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
Self(Arc::new(BufferControllerInner {
name,
content,
operations,
poller,
seen: StatusCheck::default(),
_stop: Arc::new(StopOnDrop(stop)),
}))
}
/// unique identifier of buffer /// unique identifier of buffer
pub fn name(&self) -> &str { pub fn name(&self) -> &str {
&self.0.name &self.0.name
@ -61,20 +34,37 @@ impl BufferController {
/// return buffer whole content, updating internal buffer previous state /// return buffer whole content, updating internal buffer previous state
pub fn content(&self) -> String { pub fn content(&self) -> String {
self.0.seen.update(self.0.content.borrow().clone()); self.0.seen.set(self.0.content.borrow().clone());
self.0.content.borrow().clone() self.0.content.borrow().clone()
} }
} }
#[derive(Debug)] #[derive(Debug)]
struct StopOnDrop(mpsc::UnboundedSender<()>); pub(crate) struct BufferControllerInner {
name: String,
content: watch::Receiver<String>,
seen: InternallyMutable<String>, // internal buffer previous state
operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stopper: mpsc::UnboundedSender<()>, // just exist
}
impl Drop for StopOnDrop { impl BufferControllerInner {
fn drop(&mut self) { pub(crate) fn new(
self.0 name: String,
.send(()) content: watch::Receiver<String>,
.unwrap_or_warn("could not send stop message to worker"); operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
Self {
name,
content,
operations,
poller,
seen: InternallyMutable::default(), seen: InternallyMutable::default(),
stopper: stop,
}
} }
} }
@ -123,5 +113,7 @@ impl Controller<TextChange> for BufferController {
Ok(self.0.operations.send(op)?) Ok(self.0.operations.send(op)?)
} }
fn stop(&self) -> bool {
self.0.stopper.send(()).is_ok()
} }
} }

View file

@ -1,5 +1,6 @@
use std::collections::hash_map::DefaultHasher; use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
use std::sync::Arc;
use tokio::sync::{watch, mpsc, oneshot}; use tokio::sync::{watch, mpsc, oneshot};
use tonic::{async_trait, Streaming}; use tonic::{async_trait, Streaming};
@ -12,25 +13,17 @@ use crate::api::controller::ControllerWorker;
use crate::api::TextChange; use crate::api::TextChange;
use codemp_proto::buffer::{BufferEvent, Operation}; use codemp_proto::buffer::{BufferEvent, Operation};
use super::controller::BufferController; use super::controller::{BufferController, BufferControllerInner};
pub(crate) struct BufferWorker { pub(crate) struct BufferWorker {
_user_id: Uuid, _user_id: Uuid,
name: String,
buffer: Woot, buffer: Woot,
content: watch::Sender<String>, content: watch::Sender<String>,
operations: mpsc::UnboundedReceiver<TextChange>, operations: mpsc::UnboundedReceiver<TextChange>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>, poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>, pollers: Vec<oneshot::Sender<()>>,
handles: ClonableHandlesForController,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
} controller: Arc<BufferControllerInner>,
struct ClonableHandlesForController {
operations: mpsc::UnboundedSender<TextChange>,
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>,
content: watch::Receiver<String>,
} }
impl BufferWorker { impl BufferWorker {
@ -42,21 +35,22 @@ impl BufferWorker {
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
user_id.hash(&mut hasher); user_id.hash(&mut hasher);
let site_id = hasher.finish() as usize; let site_id = hasher.finish() as usize;
let controller = BufferControllerInner::new(
path.to_string(),
txt_rx,
op_tx,
poller_tx,
end_tx,
);
BufferWorker { BufferWorker {
_user_id: user_id, _user_id: user_id,
name: path.to_string(),
buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging!
content: txt_tx, content: txt_tx,
operations: op_rx, operations: op_rx,
poller: poller_rx, poller: poller_rx,
pollers: Vec::new(), pollers: Vec::new(),
handles: ClonableHandlesForController {
operations: op_tx,
poller: poller_tx,
stop: end_tx,
content: txt_rx,
},
stop: end_rx, stop: end_rx,
controller: Arc::new(controller),
} }
} }
} }
@ -68,13 +62,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
type Rx = Streaming<BufferEvent>; type Rx = Streaming<BufferEvent>;
fn subscribe(&self) -> BufferController { fn subscribe(&self) -> BufferController {
BufferController::new( BufferController(self.controller.clone())
self.name.clone(),
self.handles.content.clone(),
self.handles.operations.clone(),
self.handles.poller.clone(),
self.handles.stop.clone(),
)
} }
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {

View file

@ -12,10 +12,7 @@ use tokio::sync::{
}; };
use tonic::async_trait; use tonic::async_trait;
use crate::{ use crate::api::{Controller, Cursor};
api::{Controller, Cursor},
errors::IgnorableError,
};
use codemp_proto::cursor::{CursorEvent, CursorPosition}; use codemp_proto::cursor::{CursorEvent, CursorPosition};
/// the cursor controller implementation /// the cursor controller implementation
/// ///
@ -41,15 +38,6 @@ struct CursorControllerInner {
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
} }
impl Drop for CursorController {
fn drop(&mut self) {
self.0
.stop
.send(())
.unwrap_or_warn("could not stop cursor actor")
}
}
impl CursorController { impl CursorController {
pub(crate) fn new( pub(crate) fn new(
op: mpsc::UnboundedSender<CursorPosition>, op: mpsc::UnboundedSender<CursorPosition>,
@ -114,4 +102,8 @@ impl Controller<Cursor> for CursorController {
async fn poll(&self) -> crate::Result<()> { async fn poll(&self) -> crate::Result<()> {
Ok(self.0.last_op.lock().await.changed().await?) Ok(self.0.last_op.lock().await.changed().await?)
} }
fn stop(&self) -> bool {
self.0.stop.send(()).is_ok()
}
} }

View file

@ -96,6 +96,9 @@ impl Workspace {
} }
WorkspaceEventInner::Delete(FileDelete { path }) => { WorkspaceEventInner::Delete(FileDelete { path }) => {
inner.filetree.remove(&path); inner.filetree.remove(&path);
if let Some((_name, controller)) = inner.buffers.remove(&path) {
controller.stop();
}
} }
} }
}, },
@ -226,6 +229,10 @@ impl Workspace {
})) }))
.await?; .await?;
if let Some((_name, controller)) = self.0.buffers.remove(path) {
controller.stop();
}
self.0.filetree.remove(path); self.0.filetree.remove(path);
Ok(()) Ok(())