feat: reworked cursor worker/controller

now its more similar to buffer controller/worker and it behaves more
like an actor/service
This commit is contained in:
əlemi 2024-09-01 03:08:43 +02:00 committed by zaaarf
parent b98be22a8b
commit cf1e910dcb
No known key found for this signature in database
GPG key ID: 102E445F4C3F829B
5 changed files with 58 additions and 37 deletions

View file

@ -7,9 +7,8 @@ use uuid::Uuid;
use crate::api::controller::{ControllerCallback, ControllerWorker}; use crate::api::controller::{ControllerCallback, ControllerWorker};
use crate::api::TextChange; use crate::api::TextChange;
use crate::ext::{IgnorableError, InternallyMutable};
use crate::errors::IgnorableError;
use crate::ext::InternallyMutable;
use codemp_proto::buffer::{BufferEvent, Operation}; use codemp_proto::buffer::{BufferEvent, Operation};
use super::controller::{BufferController, BufferControllerInner}; use super::controller::{BufferController, BufferControllerInner};

View file

@ -3,14 +3,11 @@
//! a controller implementation for cursor actions //! a controller implementation for cursor actions
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{ use tokio::sync::{mpsc, oneshot, watch};
broadcast::{self, error::TryRecvError},
mpsc, watch, Mutex,
};
use tonic::async_trait; use tonic::async_trait;
use codemp_proto::cursor::{CursorEvent, CursorPosition};
use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult}; use crate::{api::{controller::ControllerCallback, Controller, Cursor}, errors::ControllerResult};
use codemp_proto::cursor::CursorPosition;
/// the cursor controller implementation /// the cursor controller implementation
/// ///
/// this contains /// this contains
@ -30,8 +27,8 @@ pub struct CursorController(pub(crate) Arc<CursorControllerInner>);
#[derive(Debug)] #[derive(Debug)]
pub(crate) struct CursorControllerInner { pub(crate) struct CursorControllerInner {
pub(crate) op: mpsc::Sender<CursorPosition>, pub(crate) op: mpsc::Sender<CursorPosition>,
pub(crate) last_op: Mutex<watch::Receiver<CursorEvent>>, pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
pub(crate) stream: Mutex<broadcast::Receiver<CursorEvent>>, pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>, pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
pub(crate) stop: mpsc::UnboundedSender<()>, pub(crate) stop: mpsc::UnboundedSender<()>,
} }
@ -48,22 +45,18 @@ impl Controller<Cursor> for CursorController {
} }
/// try to receive without blocking, but will still block on stream mutex /// try to receive without blocking, but will still block on stream mutex
let mut stream = self.0.stream.lock().await;
match stream.try_recv() {
Ok(x) => Ok(Some(x.into())),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => {
tracing::warn!("cursor channel lagged, skipping {} events", n);
Ok(stream.try_recv().map(|x| x.into()).ok())
}
}
async fn try_recv(&self) -> ControllerResult<Option<Cursor>> { async fn try_recv(&self) -> ControllerResult<Option<Cursor>> {
let (tx, rx) = oneshot::channel();
self.0.stream.send(tx).await?;
Ok(rx.await?)
} }
/// await for changed mutex and then next op change /// await for changed mutex and then next op change
Ok(self.0.last_op.lock().await.changed().await?)
async fn poll(&self) -> ControllerResult<()> { async fn poll(&self) -> ControllerResult<()> {
let (tx, rx) = oneshot::channel();
self.0.poll.send(tx)?;
rx.await?;
Ok(())
} }
fn callback(&self, cb: impl Into<ControllerCallback<CursorController>>) { fn callback(&self, cb: impl Into<ControllerCallback<CursorController>>) {

View file

@ -1,17 +1,19 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tonic::{Streaming, async_trait}; use tonic::{Streaming, async_trait};
use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, errors::IgnorableError}; use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor}, ext::IgnorableError};
use codemp_proto::cursor::{CursorPosition, CursorEvent}; use codemp_proto::cursor::{CursorPosition, CursorEvent};
use super::controller::{CursorController, CursorControllerInner}; use super::controller::{CursorController, CursorControllerInner};
pub(crate) struct CursorWorker { pub(crate) struct CursorWorker {
op: mpsc::Receiver<CursorPosition>, op: mpsc::Receiver<CursorPosition>,
changed: watch::Sender<CursorEvent>, stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
channel: broadcast::Sender<CursorEvent>, poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>,
store: std::collections::VecDeque<Cursor>,
stop: mpsc::UnboundedReceiver<()>, stop: mpsc::UnboundedReceiver<()>,
controller: CursorController, controller: CursorController,
callback: watch::Receiver<Option<ControllerCallback<CursorController>>>, callback: watch::Receiver<Option<ControllerCallback<CursorController>>>,
@ -19,25 +21,33 @@ pub(crate) struct CursorWorker {
impl Default for CursorWorker { impl Default for CursorWorker {
fn default() -> Self { fn default() -> Self {
let (op_tx, op_rx) = mpsc::channel(8); Self::new(64)
let (cur_tx, _cur_rx) = broadcast::channel(64); }
}
impl CursorWorker {
fn new(buffer_size: usize) -> Self {
let (op_tx, op_rx) = mpsc::channel(buffer_size);
let (stream_tx, stream_rx) = mpsc::channel(1);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (change_tx, change_rx) = watch::channel(CursorEvent::default());
let (cb_tx, cb_rx) = watch::channel(None); let (cb_tx, cb_rx) = watch::channel(None);
let (poll_tx, poll_rx) = mpsc::unbounded_channel();
let controller = CursorControllerInner { let controller = CursorControllerInner {
op: op_tx, op: op_tx,
last_op: Mutex::new(change_rx), stream: stream_tx,
stream: Mutex::new(cur_tx.subscribe()),
stop: end_tx, stop: end_tx,
callback: cb_tx, callback: cb_tx,
poll: poll_tx,
}; };
Self { Self {
op: op_rx, op: op_rx,
changed: change_tx, stream: stream_rx,
channel: cur_tx, store: std::collections::VecDeque::default(),
stop: end_rx, stop: end_rx,
controller: CursorController(Arc::new(controller)), controller: CursorController(Arc::new(controller)),
callback: cb_rx, callback: cb_rx,
poll: poll_rx,
pollers: Vec::new(),
} }
} }
} }
@ -54,17 +64,39 @@ impl ControllerWorker<Cursor> for CursorWorker {
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
loop { loop {
tracing::debug!("cursor worker polling");
tokio::select!{ tokio::select!{
biased; biased;
// received stop signal
Some(()) = self.stop.recv() => { break; }, Some(()) = self.stop.recv() => { break; },
Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); },
// new poller
Some(poller) = self.poll.recv() => self.pollers.push(poller),
// client moved their cursor
Some(op) = self.op.recv() => {
tracing::debug!("received cursor from editor");
tx.send(op).await.unwrap_or_warn("could not update cursor");
},
// server sents us a cursor
Ok(Some(cur)) = rx.message() => { Ok(Some(cur)) = rx.message() => {
self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); tracing::debug!("received cursor from server");
self.changed.send(cur).unwrap_or_warn("could not update last event"); self.store.push_back(cur.into());
for tx in self.pollers.drain(..) {
tx.send(()).unwrap_or_warn("poller dropped before unblocking");
}
if let Some(cb) = self.callback.borrow().as_ref() { if let Some(cb) = self.callback.borrow().as_ref() {
tracing::debug!("running cursor callback");
cb.call(self.controller.clone()); // TODO should this run in its own task/thread? cb.call(self.controller.clone()); // TODO should this run in its own task/thread?
} }
}, },
// client wants to get next cursor event
Some(tx) = self.stream.recv() => tx.send(self.store.pop_front())
.unwrap_or_warn("client gave up receiving"),
else => break, else => break,
} }
} }

View file

@ -129,9 +129,6 @@ pub mod buffer;
pub mod workspace; pub mod workspace;
pub use workspace::Workspace; pub use workspace::Workspace;
/// session
pub mod session;
/// codemp client, wrapping all above /// codemp client, wrapping all above
pub mod client; pub mod client;
pub use client::Client; pub use client::Client;

View file