mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-22 07:14:50 +01:00
fix: use Weak refs to prevent leaky cycles
This commit is contained in:
parent
03cb9a5acf
commit
4d418c814e
6 changed files with 120 additions and 183 deletions
|
@ -5,21 +5,13 @@
|
||||||
|
|
||||||
use crate::errors::ControllerResult;
|
use crate::errors::ControllerResult;
|
||||||
|
|
||||||
pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
|
|
||||||
type Controller : Controller<T>;
|
|
||||||
type Tx;
|
|
||||||
type Rx;
|
|
||||||
|
|
||||||
fn controller(&self) -> Self::Controller;
|
|
||||||
async fn work(self, tx: Self::Tx, rx: Self::Rx);
|
|
||||||
}
|
|
||||||
|
|
||||||
// note that we don't use thiserror's #[from] because we don't want the error structs to contain
|
// note that we don't use thiserror's #[from] because we don't want the error structs to contain
|
||||||
// these foreign types, and also we want these to be easily constructable
|
// these foreign types, and also we want these to be easily constructable
|
||||||
|
|
||||||
/// Asynchronous and thread-safe handle to a generic bidirectional stream.
|
/// Asynchronous and thread-safe handle to a generic bidirectional stream.
|
||||||
///
|
///
|
||||||
/// This generic trait is implemented by actors managing stream procedures.
|
/// This generic trait is implemented by actors managing stream procedures, and will generally
|
||||||
|
/// imply a background worker.
|
||||||
///
|
///
|
||||||
/// Events can be enqueued for dispatching without blocking with [`Controller::send`].
|
/// Events can be enqueued for dispatching without blocking with [`Controller::send`].
|
||||||
///
|
///
|
||||||
|
@ -27,6 +19,9 @@ pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
|
||||||
/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively,
|
/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively,
|
||||||
/// [`Controller::poll`] combined with [`Controller::try_recv`].
|
/// [`Controller::poll`] combined with [`Controller::try_recv`].
|
||||||
///
|
///
|
||||||
|
/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have
|
||||||
|
/// been dropped.
|
||||||
|
///
|
||||||
/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
|
/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
|
||||||
#[allow(async_fn_in_trait)]
|
#[allow(async_fn_in_trait)]
|
||||||
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
|
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
|
||||||
|
@ -57,15 +52,6 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
|
||||||
|
|
||||||
/// Attempt to receive a value, return None if nothing is currently available.
|
/// Attempt to receive a value, return None if nothing is currently available.
|
||||||
async fn try_recv(&self) -> ControllerResult<Option<T>>;
|
async fn try_recv(&self) -> ControllerResult<Option<T>>;
|
||||||
|
|
||||||
/// Stop underlying worker.
|
|
||||||
///
|
|
||||||
/// After this is called, nothing can be received or sent anymore; however, existing
|
|
||||||
/// controllers will still be accessible until all handles are dropped.
|
|
||||||
///
|
|
||||||
/// Returns true if the stop signal was successfully sent, false if channel was
|
|
||||||
/// closed (probably because worker had already been stopped).
|
|
||||||
fn stop(&self) -> bool;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,6 @@ pub(crate) struct BufferControllerInner {
|
||||||
pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>,
|
pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>,
|
||||||
pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>,
|
pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>,
|
||||||
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||||
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
|
|
||||||
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
|
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
|
||||||
pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
|
pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
|
||||||
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
|
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
|
||||||
|
@ -101,8 +100,4 @@ impl Controller<TextChange> for BufferController {
|
||||||
tracing::warn!("no active buffer worker to clear callback");
|
tracing::warn!("no active buffer worker to clear callback");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self) -> bool {
|
|
||||||
self.0.stopper.send(()).is_ok()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch};
|
||||||
use tonic::Streaming;
|
use tonic::Streaming;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::api::controller::{ControllerCallback, ControllerWorker};
|
use crate::api::controller::ControllerCallback;
|
||||||
use crate::api::TextChange;
|
use crate::api::TextChange;
|
||||||
use crate::ext::{IgnorableError, InternallyMutable};
|
use crate::ext::{IgnorableError, InternallyMutable};
|
||||||
|
|
||||||
|
@ -16,21 +16,21 @@ use super::controller::{BufferController, BufferControllerInner};
|
||||||
pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>);
|
pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>);
|
||||||
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
|
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
|
||||||
|
|
||||||
pub(crate) struct BufferWorker {
|
struct BufferWorker {
|
||||||
user_id: Uuid,
|
user_id: Uuid,
|
||||||
|
path: String,
|
||||||
latest_version: watch::Sender<diamond_types::LocalVersion>,
|
latest_version: watch::Sender<diamond_types::LocalVersion>,
|
||||||
ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>,
|
ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>,
|
||||||
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
||||||
pollers: Vec<oneshot::Sender<()>>,
|
pollers: Vec<oneshot::Sender<()>>,
|
||||||
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
|
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
|
||||||
delta_req: mpsc::Receiver<DeltaRequest>,
|
delta_req: mpsc::Receiver<DeltaRequest>,
|
||||||
stop: mpsc::UnboundedReceiver<()>,
|
controller: std::sync::Weak<BufferControllerInner>,
|
||||||
controller: BufferController,
|
|
||||||
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
|
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BufferWorker {
|
impl BufferController {
|
||||||
pub fn new(user_id: Uuid, path: &str) -> Self {
|
pub(crate) fn spawn(user_id: Uuid, path: &str, tx: mpsc::Sender<Operation>, rx: Streaming<BufferEvent>) -> Self {
|
||||||
let init = diamond_types::LocalVersion::default();
|
let init = diamond_types::LocalVersion::default();
|
||||||
|
|
||||||
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
|
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
|
||||||
|
@ -42,67 +42,63 @@ impl BufferWorker {
|
||||||
|
|
||||||
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
let controller = Arc::new(BufferControllerInner {
|
||||||
|
|
||||||
let controller = BufferControllerInner {
|
|
||||||
name: path.to_string(),
|
name: path.to_string(),
|
||||||
latest_version: latest_version_rx,
|
latest_version: latest_version_rx,
|
||||||
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
|
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
|
||||||
ops_in: opin_tx,
|
ops_in: opin_tx,
|
||||||
poller: poller_tx,
|
poller: poller_tx,
|
||||||
stopper: end_tx,
|
|
||||||
content_request: req_tx,
|
content_request: req_tx,
|
||||||
delta_request: recv_tx,
|
delta_request: recv_tx,
|
||||||
callback: cb_tx,
|
callback: cb_tx,
|
||||||
};
|
});
|
||||||
|
|
||||||
BufferWorker {
|
let weak = Arc::downgrade(&controller);
|
||||||
|
|
||||||
|
let worker = BufferWorker {
|
||||||
user_id,
|
user_id,
|
||||||
|
path: path.to_string(),
|
||||||
latest_version: latest_version_tx,
|
latest_version: latest_version_tx,
|
||||||
ops_in: opin_rx,
|
ops_in: opin_rx,
|
||||||
poller: poller_rx,
|
poller: poller_rx,
|
||||||
pollers: Vec::new(),
|
pollers: Vec::new(),
|
||||||
stop: end_rx,
|
controller: weak,
|
||||||
controller: BufferController(Arc::new(controller)),
|
|
||||||
content_checkout: req_rx,
|
content_checkout: req_rx,
|
||||||
delta_req: recv_rx,
|
delta_req: recv_rx,
|
||||||
callback: cb_rx,
|
callback: cb_rx,
|
||||||
}
|
};
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ControllerWorker<TextChange> for BufferWorker {
|
tokio::spawn(async move {
|
||||||
type Controller = BufferController;
|
BufferController::work(worker, tx, rx).await
|
||||||
type Tx = mpsc::Sender<Operation>;
|
});
|
||||||
type Rx = Streaming<BufferEvent>;
|
|
||||||
|
|
||||||
fn controller(&self) -> BufferController {
|
BufferController(controller)
|
||||||
self.controller.clone()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
async fn work(mut worker: BufferWorker, tx: mpsc::Sender<Operation>, mut rx: Streaming<BufferEvent>) {
|
||||||
let mut branch = diamond_types::list::Branch::new();
|
let mut branch = diamond_types::list::Branch::new();
|
||||||
let mut oplog = diamond_types::list::OpLog::new();
|
let mut oplog = diamond_types::list::OpLog::new();
|
||||||
let mut timer = Timer::new(10); // TODO configurable!!
|
let mut timer = Timer::new(10); // TODO configurable!!
|
||||||
|
tracing::debug!("controller worker started");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if worker.controller.upgrade().is_none() { break };
|
||||||
|
|
||||||
// block until one of these is ready
|
// block until one of these is ready
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
biased;
|
biased;
|
||||||
|
|
||||||
// received stop signal
|
|
||||||
_ = self.stop.recv() => break,
|
|
||||||
|
|
||||||
// received a new poller, add it to collection
|
// received a new poller, add it to collection
|
||||||
res = self.poller.recv() => match res {
|
res = worker.poller.recv() => match res {
|
||||||
None => break tracing::error!("poller channel closed"),
|
None => break tracing::error!("poller channel closed"),
|
||||||
Some(tx) => self.pollers.push(tx),
|
Some(tx) => worker.pollers.push(tx),
|
||||||
},
|
},
|
||||||
|
|
||||||
// received a text change from editor
|
// received a text change from editor
|
||||||
res = self.ops_in.recv() => match res {
|
res = worker.ops_in.recv() => match res {
|
||||||
None => break tracing::debug!("stopping: editor closed channel"),
|
None => break tracing::debug!("stopping: editor closed channel"),
|
||||||
Some((change, ack)) => {
|
Some((change, ack)) => {
|
||||||
let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string());
|
let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string());
|
||||||
let last_ver = oplog.local_version();
|
let last_ver = oplog.local_version();
|
||||||
// clip to buffer extents
|
// clip to buffer extents
|
||||||
let clip_end = std::cmp::min(branch.len(), change.end as usize);
|
let clip_end = std::cmp::min(branch.len(), change.end as usize);
|
||||||
|
@ -120,7 +116,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
if change.is_delete() || change.is_insert() {
|
if change.is_delete() || change.is_insert() {
|
||||||
tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await
|
tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await
|
||||||
.unwrap_or_warn("failed to send change!");
|
.unwrap_or_warn("failed to send change!");
|
||||||
self.latest_version.send(oplog.local_version())
|
worker.latest_version.send(oplog.local_version())
|
||||||
.unwrap_or_warn("failed to update latest version!");
|
.unwrap_or_warn("failed to update latest version!");
|
||||||
}
|
}
|
||||||
ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack");
|
ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack");
|
||||||
|
@ -129,18 +125,19 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
|
|
||||||
// received a message from server: add to oplog and update latest version (+unlock pollers)
|
// received a message from server: add to oplog and update latest version (+unlock pollers)
|
||||||
res = rx.message() => match res {
|
res = rx.message() => match res {
|
||||||
Err(_e) => break,
|
Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path),
|
||||||
Ok(None) => break,
|
Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path),
|
||||||
Ok(Some(change)) => {
|
Ok(Some(change)) => match worker.controller.upgrade() {
|
||||||
match oplog.decode_and_add(&change.op.data) {
|
None => break, // clean exit actually, just weird we caught it here
|
||||||
|
Some(controller) => match oplog.decode_and_add(&change.op.data) {
|
||||||
Ok(local_version) => {
|
Ok(local_version) => {
|
||||||
self.latest_version.send(local_version)
|
worker.latest_version.send(local_version)
|
||||||
.unwrap_or_warn("failed to update latest version!");
|
.unwrap_or_warn("failed to update latest version!");
|
||||||
for tx in self.pollers.drain(..) {
|
for tx in worker.pollers.drain(..) {
|
||||||
tx.send(()).unwrap_or_warn("could not wake up poller");
|
tx.send(()).unwrap_or_warn("could not wake up poller");
|
||||||
}
|
}
|
||||||
if let Some(cb) = self.callback.borrow().as_ref() {
|
if let Some(cb) = worker.callback.borrow().as_ref() {
|
||||||
cb.call(self.controller.clone()); // TODO should we run this on another task/thread?
|
cb.call(BufferController(controller)); // TODO should we run this on another task/thread?
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
||||||
|
@ -149,7 +146,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
},
|
},
|
||||||
|
|
||||||
// controller is ready to apply change and recv(), calculate it and send it back
|
// controller is ready to apply change and recv(), calculate it and send it back
|
||||||
res = self.delta_req.recv() => match res {
|
res = worker.delta_req.recv() => match res {
|
||||||
None => break tracing::error!("no more active controllers: can't send changes"),
|
None => break tracing::error!("no more active controllers: can't send changes"),
|
||||||
Some((last_ver, tx)) => {
|
Some((last_ver, tx)) => {
|
||||||
if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() {
|
if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() {
|
||||||
|
@ -194,7 +191,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
},
|
},
|
||||||
|
|
||||||
// received a request for full CRDT content
|
// received a request for full CRDT content
|
||||||
res = self.content_checkout.recv() => match res {
|
res = worker.content_checkout.recv() => match res {
|
||||||
None => break tracing::error!("no more active controllers: can't update content"),
|
None => break tracing::error!("no more active controllers: can't update content"),
|
||||||
Some(tx) => {
|
Some(tx) => {
|
||||||
branch.merge(&oplog, oplog.local_version_ref());
|
branch.merge(&oplog, oplog.local_version_ref());
|
||||||
|
@ -204,6 +201,8 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tracing::debug!("controller worker stopped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,7 +28,6 @@ pub(crate) struct CursorControllerInner {
|
||||||
pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
|
pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
|
||||||
pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||||
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
|
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
|
||||||
pub(crate) stop: mpsc::UnboundedSender<()>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
|
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
|
||||||
|
@ -81,8 +80,4 @@ impl Controller<Cursor> for CursorController {
|
||||||
tracing::warn!("no active cursor worker to clear callback");
|
tracing::warn!("no active cursor worker to clear callback");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn stop(&self) -> bool {
|
|
||||||
self.0.stop.send(()).is_ok()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,81 +4,74 @@ use tokio::sync::{mpsc, oneshot, watch};
|
||||||
use tonic::Streaming;
|
use tonic::Streaming;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor, User}, ext::IgnorableError};
|
use crate::{api::{controller::ControllerCallback, Cursor, User}, ext::IgnorableError};
|
||||||
use codemp_proto::cursor::{CursorPosition, CursorEvent};
|
use codemp_proto::cursor::{CursorPosition, CursorEvent};
|
||||||
|
|
||||||
use super::controller::{CursorController, CursorControllerInner};
|
use super::controller::{CursorController, CursorControllerInner};
|
||||||
|
|
||||||
pub(crate) struct CursorWorker {
|
struct CursorWorker {
|
||||||
op: mpsc::Receiver<CursorPosition>,
|
op: mpsc::Receiver<CursorPosition>,
|
||||||
map: Arc<dashmap::DashMap<Uuid, User>>,
|
map: Arc<dashmap::DashMap<Uuid, User>>,
|
||||||
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
|
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
|
||||||
poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
||||||
pollers: Vec<oneshot::Sender<()>>,
|
pollers: Vec<oneshot::Sender<()>>,
|
||||||
store: std::collections::VecDeque<Cursor>,
|
store: std::collections::VecDeque<Cursor>,
|
||||||
stop: mpsc::UnboundedReceiver<()>,
|
controller: std::sync::Weak<CursorControllerInner>,
|
||||||
controller: CursorController,
|
|
||||||
callback: watch::Receiver<Option<ControllerCallback<CursorController>>>,
|
callback: watch::Receiver<Option<ControllerCallback<CursorController>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CursorWorker {
|
impl CursorController {
|
||||||
pub fn new(user_map: Arc<dashmap::DashMap<Uuid, User>>) -> Self {
|
pub(crate) fn spawn(user_map: Arc<dashmap::DashMap<Uuid, User>>, tx: mpsc::Sender<CursorPosition>, rx: Streaming<CursorEvent>) -> Self {
|
||||||
// TODO we should tweak the channel buffer size to better propagate backpressure
|
// TODO we should tweak the channel buffer size to better propagate backpressure
|
||||||
let (op_tx, op_rx) = mpsc::channel(64);
|
let (op_tx, op_rx) = mpsc::channel(64);
|
||||||
let (stream_tx, stream_rx) = mpsc::channel(1);
|
let (stream_tx, stream_rx) = mpsc::channel(1);
|
||||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
|
||||||
let (cb_tx, cb_rx) = watch::channel(None);
|
let (cb_tx, cb_rx) = watch::channel(None);
|
||||||
let (poll_tx, poll_rx) = mpsc::unbounded_channel();
|
let (poll_tx, poll_rx) = mpsc::unbounded_channel();
|
||||||
let controller = CursorControllerInner {
|
let controller = Arc::new(CursorControllerInner {
|
||||||
op: op_tx,
|
op: op_tx,
|
||||||
stream: stream_tx,
|
stream: stream_tx,
|
||||||
stop: end_tx,
|
|
||||||
callback: cb_tx,
|
callback: cb_tx,
|
||||||
poll: poll_tx,
|
poll: poll_tx,
|
||||||
};
|
});
|
||||||
Self {
|
|
||||||
|
let weak = Arc::downgrade(&controller);
|
||||||
|
|
||||||
|
let worker = CursorWorker {
|
||||||
op: op_rx,
|
op: op_rx,
|
||||||
map: user_map,
|
map: user_map,
|
||||||
stream: stream_rx,
|
stream: stream_rx,
|
||||||
store: std::collections::VecDeque::default(),
|
store: std::collections::VecDeque::default(),
|
||||||
stop: end_rx,
|
controller: weak,
|
||||||
controller: CursorController(Arc::new(controller)),
|
|
||||||
callback: cb_rx,
|
callback: cb_rx,
|
||||||
poll: poll_rx,
|
poll: poll_rx,
|
||||||
pollers: Vec::new(),
|
pollers: Vec::new(),
|
||||||
}
|
};
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ControllerWorker<Cursor> for CursorWorker {
|
tokio::spawn(async move { CursorController::work(worker, tx, rx).await });
|
||||||
type Controller = CursorController;
|
|
||||||
type Tx = mpsc::Sender<CursorPosition>;
|
|
||||||
type Rx = Streaming<CursorEvent>;
|
|
||||||
|
|
||||||
fn controller(&self) -> CursorController {
|
CursorController(controller)
|
||||||
self.controller.clone()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
async fn work(mut worker: CursorWorker, tx: mpsc::Sender<CursorPosition>, mut rx: Streaming<CursorEvent>) {
|
||||||
loop {
|
loop {
|
||||||
tracing::debug!("cursor worker polling");
|
tracing::debug!("cursor worker polling");
|
||||||
|
if worker.controller.upgrade().is_none() { break }; // clean exit: all controllers dropped
|
||||||
tokio::select!{
|
tokio::select!{
|
||||||
biased;
|
biased;
|
||||||
|
|
||||||
// received stop signal
|
|
||||||
Some(()) = self.stop.recv() => { break; },
|
|
||||||
|
|
||||||
// new poller
|
// new poller
|
||||||
Some(poller) = self.poll.recv() => self.pollers.push(poller),
|
Some(poller) = worker.poll.recv() => worker.pollers.push(poller),
|
||||||
|
|
||||||
// client moved their cursor
|
// client moved their cursor
|
||||||
Some(op) = self.op.recv() => {
|
Some(op) = worker.op.recv() => {
|
||||||
tracing::debug!("received cursor from editor");
|
tracing::debug!("received cursor from editor");
|
||||||
tx.send(op).await.unwrap_or_warn("could not update cursor");
|
tx.send(op).await.unwrap_or_warn("could not update cursor");
|
||||||
},
|
},
|
||||||
|
|
||||||
// server sents us a cursor
|
// server sents us a cursor
|
||||||
Ok(Some(cur)) = rx.message() => {
|
Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() {
|
||||||
|
None => break, // clean exit, just weird that we got it here
|
||||||
|
Some(controller) => {
|
||||||
tracing::debug!("received cursor from server");
|
tracing::debug!("received cursor from server");
|
||||||
let mut cursor = Cursor {
|
let mut cursor = Cursor {
|
||||||
buffer: cur.position.buffer.path,
|
buffer: cur.position.buffer.path,
|
||||||
|
@ -87,21 +80,22 @@ impl ControllerWorker<Cursor> for CursorWorker {
|
||||||
user: None,
|
user: None,
|
||||||
};
|
};
|
||||||
let user_id = Uuid::from(cur.user);
|
let user_id = Uuid::from(cur.user);
|
||||||
if let Some(user) = self.map.get(&user_id) {
|
if let Some(user) = worker.map.get(&user_id) {
|
||||||
cursor.user = Some(user.name.clone());
|
cursor.user = Some(user.name.clone());
|
||||||
}
|
}
|
||||||
self.store.push_back(cursor);
|
worker.store.push_back(cursor);
|
||||||
for tx in self.pollers.drain(..) {
|
for tx in worker.pollers.drain(..) {
|
||||||
tx.send(()).unwrap_or_warn("poller dropped before unblocking");
|
tx.send(()).unwrap_or_warn("poller dropped before unblocking");
|
||||||
}
|
}
|
||||||
if let Some(cb) = self.callback.borrow().as_ref() {
|
if let Some(cb) = worker.callback.borrow().as_ref() {
|
||||||
tracing::debug!("running cursor callback");
|
tracing::debug!("running cursor callback");
|
||||||
cb.call(self.controller.clone()); // TODO should this run in its own task/thread?
|
cb.call(CursorController(controller)); // TODO should this run in its own task/thread?
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
},
|
||||||
|
|
||||||
// client wants to get next cursor event
|
// client wants to get next cursor event
|
||||||
Some(tx) = self.stream.recv() => tx.send(self.store.pop_front())
|
Some(tx) = worker.stream.recv() => tx.send(worker.store.pop_front())
|
||||||
.unwrap_or_warn("client gave up receiving"),
|
.unwrap_or_warn("client gave up receiving"),
|
||||||
|
|
||||||
else => break,
|
else => break,
|
||||||
|
|
|
@ -4,9 +4,8 @@
|
||||||
//! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems.
|
//! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems.
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
api::{controller::ControllerWorker, Controller, Event, User},
|
api::{Event, User},
|
||||||
buffer::{self, worker::BufferWorker},
|
buffer, cursor,
|
||||||
cursor::{self, worker::CursorWorker},
|
|
||||||
errors::{ConnectionResult, ControllerResult, RemoteResult},
|
errors::{ConnectionResult, ControllerResult, RemoteResult},
|
||||||
ext::InternallyMutable,
|
ext::InternallyMutable,
|
||||||
network::Services,
|
network::Services,
|
||||||
|
@ -49,9 +48,11 @@ struct WorkspaceInner {
|
||||||
user: User, // TODO back-reference to global user id... needed for buffer controllers
|
user: User, // TODO back-reference to global user id... needed for buffer controllers
|
||||||
cursor: cursor::Controller,
|
cursor: cursor::Controller,
|
||||||
buffers: DashMap<String, buffer::Controller>,
|
buffers: DashMap<String, buffer::Controller>,
|
||||||
|
services: Services,
|
||||||
|
// TODO these two are Arced so that the inner worker can hold them without holding the
|
||||||
|
// WorkspaceInner itself, otherwise its impossible to drop Workspace
|
||||||
filetree: DashSet<String>,
|
filetree: DashSet<String>,
|
||||||
users: Arc<DashMap<Uuid, User>>,
|
users: Arc<DashMap<Uuid, User>>,
|
||||||
services: Services,
|
|
||||||
// TODO can we drop the mutex?
|
// TODO can we drop the mutex?
|
||||||
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
|
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
|
||||||
}
|
}
|
||||||
|
@ -79,13 +80,7 @@ impl Workspace {
|
||||||
|
|
||||||
let users = Arc::new(DashMap::default());
|
let users = Arc::new(DashMap::default());
|
||||||
|
|
||||||
let worker = CursorWorker::new(users.clone());
|
let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream);
|
||||||
let controller = worker.controller();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tracing::debug!("controller worker started");
|
|
||||||
worker.work(tx, cur_stream).await;
|
|
||||||
tracing::debug!("controller worker stopped");
|
|
||||||
});
|
|
||||||
|
|
||||||
let ws = Self(Arc::new(WorkspaceInner {
|
let ws = Self(Arc::new(WorkspaceInner {
|
||||||
name,
|
name,
|
||||||
|
@ -141,14 +136,7 @@ impl Workspace {
|
||||||
);
|
);
|
||||||
let stream = self.0.services.buf().attach(req).await?.into_inner();
|
let stream = self.0.services.buf().attach(req).await?.into_inner();
|
||||||
|
|
||||||
let worker = BufferWorker::new(self.0.user.id, path);
|
let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream);
|
||||||
let controller = worker.controller();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
tracing::debug!("controller worker started");
|
|
||||||
worker.work(tx, stream).await;
|
|
||||||
tracing::debug!("controller worker stopped");
|
|
||||||
});
|
|
||||||
|
|
||||||
self.0.buffers.insert(path.to_string(), controller.clone());
|
self.0.buffers.insert(path.to_string(), controller.clone());
|
||||||
|
|
||||||
Ok(controller)
|
Ok(controller)
|
||||||
|
@ -156,18 +144,19 @@ impl Workspace {
|
||||||
|
|
||||||
/// Detach from an active buffer.
|
/// Detach from an active buffer.
|
||||||
///
|
///
|
||||||
/// This option will be carried in background. BufferWorker will be stopped and dropped.
|
/// This will stop and drop its [`buffer::Controller`].
|
||||||
/// There may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be
|
///
|
||||||
/// accessible anymore from [`Workspace`].
|
/// Returns `true` if connectly dropped or wasn't present, `false` if dropped but wasn't last ref
|
||||||
pub fn detach(&self, path: &str) -> DetachResult {
|
///
|
||||||
|
/// If this method returns `false` you have a dangling ref, maybe just waiting for garbage
|
||||||
|
/// collection or maybe preventing the controller from being dropped completely
|
||||||
|
#[allow(clippy::redundant_pattern_matching)] // all cases are clearer this way
|
||||||
|
pub fn detach(&self, path: &str) -> bool {
|
||||||
match self.0.buffers.remove(path) {
|
match self.0.buffers.remove(path) {
|
||||||
None => DetachResult::NotAttached,
|
None => true, // noop: we werent attached in the first place
|
||||||
Some((_name, controller)) => {
|
Some((_name, controller)) => match Arc::into_inner(controller.0) {
|
||||||
if controller.stop() {
|
None => false, // dangling ref! we can't drop this
|
||||||
DetachResult::Detaching
|
Some(_) => true, // dropping it now
|
||||||
} else {
|
|
||||||
DetachResult::AlreadyDetached
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -241,6 +230,8 @@ impl Workspace {
|
||||||
|
|
||||||
/// Delete a buffer.
|
/// Delete a buffer.
|
||||||
pub async fn delete(&self, path: &str) -> RemoteResult<()> {
|
pub async fn delete(&self, path: &str) -> RemoteResult<()> {
|
||||||
|
self.detach(path); // just in case
|
||||||
|
|
||||||
let mut workspace_client = self.0.services.ws();
|
let mut workspace_client = self.0.services.ws();
|
||||||
workspace_client
|
workspace_client
|
||||||
.delete_buffer(tonic::Request::new(BufferNode {
|
.delete_buffer(tonic::Request::new(BufferNode {
|
||||||
|
@ -248,9 +239,6 @@ impl Workspace {
|
||||||
}))
|
}))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some((_name, controller)) = self.0.buffers.remove(path) {
|
|
||||||
controller.stop();
|
|
||||||
}
|
|
||||||
|
|
||||||
self.0.filetree.remove(path);
|
self.0.filetree.remove(path);
|
||||||
|
|
||||||
|
@ -320,17 +308,24 @@ impl Workspace {
|
||||||
tx: mpsc::UnboundedSender<crate::api::Event>,
|
tx: mpsc::UnboundedSender<crate::api::Event>,
|
||||||
) {
|
) {
|
||||||
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
|
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
|
||||||
let inner = self.0.clone();
|
let weak = Arc::downgrade(&self.0);
|
||||||
let name = self.id();
|
let name = self.id();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
match stream.message().await {
|
// TODO can we stop responsively rather than poll for Arc being dropped?
|
||||||
|
if weak.upgrade().is_none() { break };
|
||||||
|
let Some(res) = tokio::select!(
|
||||||
|
x = stream.message() => Some(x),
|
||||||
|
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None,
|
||||||
|
) else { continue };
|
||||||
|
match res {
|
||||||
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
|
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
|
||||||
Ok(None) => break tracing::info!("leaving workspace {}", name),
|
Ok(None) => break tracing::info!("leaving workspace {}", name),
|
||||||
Ok(Some(WorkspaceEvent { event: None })) => {
|
Ok(Some(WorkspaceEvent { event: None })) => {
|
||||||
tracing::warn!("workspace {} received empty event", name)
|
tracing::warn!("workspace {} received empty event", name)
|
||||||
}
|
}
|
||||||
Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
|
Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
|
||||||
|
let Some(inner) = weak.upgrade() else { break };
|
||||||
let update = crate::api::Event::from(&ev);
|
let update = crate::api::Event::from(&ev);
|
||||||
match ev {
|
match ev {
|
||||||
// user
|
// user
|
||||||
|
@ -350,9 +345,7 @@ impl Workspace {
|
||||||
}
|
}
|
||||||
WorkspaceEventInner::Delete(FileDelete { path }) => {
|
WorkspaceEventInner::Delete(FileDelete { path }) => {
|
||||||
inner.filetree.remove(&path);
|
inner.filetree.remove(&path);
|
||||||
if let Some((_name, controller)) = inner.buffers.remove(&path) {
|
let _ = inner.buffers.remove(&path);
|
||||||
controller.stop();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if tx.send(update).is_err() {
|
if tx.send(update).is_err() {
|
||||||
|
@ -364,28 +357,3 @@ impl Workspace {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for WorkspaceInner {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
for entry in self.buffers.iter() {
|
|
||||||
if !entry.value().stop() {
|
|
||||||
tracing::warn!(
|
|
||||||
"could not stop buffer worker {} for workspace {}",
|
|
||||||
entry.value().path(),
|
|
||||||
self.name
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !self.cursor.stop() {
|
|
||||||
tracing::warn!("could not stop cursor worker for workspace {}", self.name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(eq, eq_int))]
|
|
||||||
#[cfg_attr(any(feature = "py", feature = "py-noabi"), derive(PartialEq))]
|
|
||||||
pub enum DetachResult {
|
|
||||||
NotAttached,
|
|
||||||
Detaching,
|
|
||||||
AlreadyDetached,
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue