feat: stop worker when dropping controller, unwraps

This commit is contained in:
əlemi 2023-08-19 04:02:21 +02:00
parent 853d754d8b
commit bd6132dc1e
8 changed files with 115 additions and 18 deletions

View file

@ -2,6 +2,7 @@ use operational_transform::OperationSeq;
use tokio::sync::{watch, mpsc, broadcast, Mutex};
use tonic::async_trait;
use crate::errors::IgnorableError;
use crate::{Controller, Error};
use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory};
@ -11,6 +12,7 @@ pub struct BufferController {
content: watch::Receiver<String>,
operations: mpsc::Sender<OperationSeq>,
stream: Mutex<broadcast::Receiver<OperationSeq>>,
stop: mpsc::UnboundedSender<()>,
}
impl BufferController {
@ -18,8 +20,15 @@ impl BufferController {
content: watch::Receiver<String>,
operations: mpsc::Sender<OperationSeq>,
stream: Mutex<broadcast::Receiver<OperationSeq>>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
BufferController { content, operations, stream }
BufferController { content, operations, stream, stop }
}
}
impl Drop for BufferController {
fn drop(&mut self) {
self.stop.send(()).unwrap_or_warn("could not send stop message to worker");
}
}

View file

@ -5,6 +5,7 @@ pub mod controller;
pub mod factory;
#[derive(Debug)]
pub struct TextChange {
pub span: Range<usize>,
pub content: String,

View file

@ -5,6 +5,7 @@ use tokio::sync::{watch, mpsc, broadcast, Mutex};
use tonic::transport::Channel;
use tonic::{async_trait, Streaming};
use crate::errors::IgnorableError;
use crate::proto::{OperationRequest, RawOp};
use crate::proto::buffer_client::BufferClient;
use crate::ControllerWorker;
@ -23,6 +24,8 @@ pub(crate) struct BufferControllerWorker {
sender: mpsc::Sender<OperationSeq>,
buffer: String,
path: String,
stop: mpsc::UnboundedReceiver<()>,
stop_control: mpsc::UnboundedSender<()>,
}
impl BufferControllerWorker {
@ -30,6 +33,7 @@ impl BufferControllerWorker {
let (txt_tx, txt_rx) = watch::channel(buffer.to_string());
let (op_tx, op_rx) = mpsc::channel(64);
let (s_tx, _s_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel();
BufferControllerWorker {
uid,
content: txt_tx,
@ -40,6 +44,8 @@ impl BufferControllerWorker {
queue: VecDeque::new(),
buffer: buffer.to_string(),
path: path.to_string(),
stop: end_rx,
stop_control: end_tx,
}
}
}
@ -55,6 +61,7 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
self.receiver.clone(),
self.sender.clone(),
Mutex::new(self.stream.subscribe()),
self.stop_control.clone(),
)
}
@ -64,19 +71,31 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
Some(operation) = recv_opseq(&mut rx) => {
let mut out = operation;
for op in self.queue.iter_mut() {
(*op, out) = op.transform(&out).unwrap();
(*op, out) = match op.transform(&out) {
Ok((x, y)) => (x, y),
Err(e) => {
tracing::warn!("could not transform enqueued operation: {}", e);
break
},
}
}
self.stream.send(out.clone()).unwrap();
self.stream.send(out.clone()).unwrap_or_warn("could not send operation to server");
out
},
Some(op) = self.operations.recv() => {
self.queue.push_back(op.clone());
op
},
Some(()) = self.stop.recv() => {
break;
}
else => break
};
self.buffer = op.apply(&self.buffer).unwrap();
self.content.send(self.buffer.clone()).unwrap();
self.buffer = op.apply(&self.buffer).unwrap_or_else(|e| {
tracing::error!("could not update buffer string: {}", e);
self.buffer
});
self.content.send(self.buffer.clone()).unwrap_or_warn("error showing updated buffer");
while let Some(op) = self.queue.get(0) {
if !send_opseq(&mut tx, self.uid.clone(), self.path.clone(), op.clone()).await { break }
@ -87,11 +106,17 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
}
async fn send_opseq(tx: &mut BufferClient<Channel>, uid: String, path: String, op: OperationSeq) -> bool {
let opseq = match serde_json::to_string(&op) {
Ok(x) => x,
Err(e) => {
tracing::warn!("could not serialize opseq: {}", e);
return false;
}
};
let req = OperationRequest {
hash: "".into(),
opseq: serde_json::to_string(&op).unwrap(),
path,
user: uid,
opseq, path,
};
match tx.edit(req).await {
Ok(_) => true,
@ -104,7 +129,13 @@ async fn send_opseq(tx: &mut BufferClient<Channel>, uid: String, path: String, o
async fn recv_opseq(rx: &mut Streaming<RawOp>) -> Option<OperationSeq> {
match rx.message().await {
Ok(Some(op)) => Some(serde_json::from_str(&op.opseq).unwrap()),
Ok(Some(op)) => match serde_json::from_str(&op.opseq) {
Ok(x) => Some(x),
Err(e) => {
tracing::warn!("could not deserialize opseq: {}", e);
None
}
},
Ok(None) => None,
Err(e) => {
tracing::error!("could not receive edit from server: {}", e);

View file

@ -41,6 +41,17 @@ impl Client {
Some(self.workspace.as_ref()?.cursor.clone())
}
pub fn leave_workspace(&mut self) {
self.workspace = None
}
pub fn disconnect_buffer(&mut self, path: &str) -> bool {
match &mut self.workspace {
Some(w) => w.buffers.remove(path).is_some(),
None => false,
}
}
pub fn get_buffer(&self, path: &str) -> Option<Arc<BufferController>> {
self.workspace.as_ref()?.buffers.get(path).cloned()
}

View file

@ -1,21 +1,29 @@
use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex};
use tonic::async_trait;
use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller};
use crate::{proto::{CursorPosition, CursorEvent}, Error, Controller, errors::IgnorableError};
pub struct CursorController {
uid: String,
op: mpsc::Sender<CursorEvent>,
stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>,
}
impl Drop for CursorController {
fn drop(&mut self) {
self.stop.send(()).unwrap_or_warn("could not stop cursor actor")
}
}
impl CursorController {
pub(crate) fn new(
uid: String,
op: mpsc::Sender<CursorEvent>,
stream: Mutex<broadcast::Receiver<CursorEvent>>
stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
CursorController { uid, op, stream }
CursorController { uid, op, stream, stop }
}
}

View file

@ -12,17 +12,22 @@ pub(crate) struct CursorControllerWorker {
producer: mpsc::Sender<CursorEvent>,
op: mpsc::Receiver<CursorEvent>,
channel: Arc<broadcast::Sender<CursorEvent>>,
stop: mpsc::UnboundedReceiver<()>,
stop_control: mpsc::UnboundedSender<()>,
}
impl CursorControllerWorker {
pub(crate) fn new(uid: String) -> Self {
let (op_tx, op_rx) = mpsc::channel(64);
let (cur_tx, _cur_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel();
Self {
uid,
producer: op_tx,
op: op_rx,
channel: Arc::new(cur_tx),
stop: end_rx,
stop_control: end_tx,
}
}
}
@ -37,7 +42,8 @@ impl ControllerWorker<CursorEvent> for CursorControllerWorker {
CursorController::new(
self.uid.clone(),
self.producer.clone(),
Mutex::new(self.channel.subscribe())
Mutex::new(self.channel.subscribe()),
self.stop_control.clone(),
)
}
@ -46,6 +52,7 @@ impl ControllerWorker<CursorEvent> for CursorControllerWorker {
tokio::select!{
Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"),
Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); },
Some(()) = self.stop.recv() => { break; },
else => break,
}
}

View file

@ -4,7 +4,7 @@ use tokio::sync::Mutex;
use crate::{
buffer::controller::BufferController,
errors::Error, client::Client, cursor::controller::CursorController, Controller,
errors::Error, client::Client, cursor::controller::CursorController,
};
@ -82,4 +82,25 @@ impl Instance {
.get_buffer(path)
.ok_or(Error::InvalidState { msg: "join a workspace or create requested buffer first".into() })
}
pub async fn leave_workspace(&self) -> Result<(), Error> {
self.client
.lock()
.await
.as_mut()
.ok_or(Error::InvalidState { msg: "connect first".into() })?
.leave_workspace();
Ok(())
}
pub async fn disconnect_buffer(&self, path: &str) -> Result<bool, Error> {
Ok(
self.client
.lock()
.await
.as_mut()
.ok_or(Error::InvalidState { msg: "connect first".into() })?
.disconnect_buffer(path)
)
}
}

View file

@ -41,13 +41,22 @@ pub trait Controller<T> : Sized + Send + Sync {
async fn send(&self, x: Self::Input) -> Result<(), Error>;
async fn recv(&self) -> Result<T, Error>;
fn callback<F>(self: Arc<Self>, rt: &tokio::runtime::Runtime, mut cb: F)
where Self : 'static, F : FnMut(T) + Sync + Send + 'static
fn callback<F>(
self: Arc<Self>,
rt: &tokio::runtime::Runtime,
mut stop: tokio::sync::mpsc::UnboundedReceiver<()>,
mut cb: F
) where
Self : 'static,
F : FnMut(T) + Sync + Send + 'static
{
let x = Arc::new(self);
rt.spawn(async move {
while let Ok(data) = x.recv().await {
cb(data)
loop {
tokio::select! {
Ok(data) = self.recv() => cb(data),
Some(()) = stop.recv() => break,
else => break,
}
}
});
}