From 38911bdc31990c2053a3bc7ad7e54ec056ba414c Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 4 Jul 2023 22:54:25 +0200 Subject: [PATCH] feat: reworked cursor handle mechanism instead of storing cursors it just streams them as they happen. instead of just getting cursors from the controller, now you also send your operations into it, mimicking more the behavior used for text ops --- src/client.rs | 41 ++++++------ src/cursor.rs | 179 +++++++++++++++++++++++++++++++++----------------- 2 files changed, 140 insertions(+), 80 deletions(-) diff --git a/src/client.rs b/src/client.rs index f8d68d8..105c458 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,9 +6,9 @@ use tracing::{error, warn, debug}; use uuid::Uuid; use crate::{ - cursor::{CursorController, CursorStorage}, + cursor::{CursorControllerHandle, CursorControllerWorker, CursorProvider}, operation::{OperationProcessor, OperationController}, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, errors::IgnorableError, }; #[derive(Clone)] @@ -24,6 +24,8 @@ impl From::> for CodempClient { } impl CodempClient { + pub fn id(&self) -> &str { &self.id } + pub async fn create(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { path, content, @@ -35,7 +37,7 @@ impl CodempClient { Ok(res.into_inner().accepted) } - pub async fn listen(&mut self) -> Result, Status> { + pub async fn listen(&mut self) -> Result { let req = BufferPayload { path: "".into(), content: None, @@ -44,20 +46,30 @@ impl CodempClient { let mut stream = self.client.listen(req).await?.into_inner(); - let controller = Arc::new(CursorController::new()); + let mut controller = CursorControllerWorker::new(self.id().to_string()); + let handle = controller.subscribe(); + let mut _client = self.client.clone(); - let _controller = controller.clone(); tokio::spawn(async move { loop { - match stream.message().await { - Err(e) => break error!("error receiving cursor: {}", e), - Ok(None) => break debug!("cursor worker clean exit"), - Ok(Some(x)) => { _controller.update(x); }, + tokio::select!{ + res = stream.message() => { + match res { + Err(e) => break error!("error receiving cursor: {}", e), + Ok(None) => break debug!("cursor worker clean exit"), + Ok(Some(x)) => { controller.broadcast(x); }, + } + }, + Some(op) = controller.wait() => { + _client.cursor(CursorMov::from(op)).await + .unwrap_or_warn("could not send cursor update") + } + } } }); - Ok(controller) + Ok(handle) } pub async fn attach(&mut self, path: String) -> Result, Status> { @@ -125,13 +137,4 @@ impl CodempClient { Ok(factory) } - - pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result { - let req = CursorMov { - path, row, col, - user: self.id.clone(), - }; - let res = self.client.cursor(req).await?.into_inner(); - Ok(res.accepted) - } } diff --git a/src/cursor.rs b/src/cursor.rs index a7b8bdc..28dcc60 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,24 +1,42 @@ -use std::{collections::HashMap, sync::Mutex}; +use std::sync::Arc; -use tokio::sync::broadcast; -use tracing::{error, debug, warn}; +use tokio::sync::{mpsc, broadcast}; +use tonic::async_trait; -use crate::proto::CursorMov; +use crate::{proto::CursorMov, errors::IgnorableError}; -/// Note that this differs from any hashmap in its put method: no &mut! -pub trait CursorStorage { - fn get(&self, id: &str) -> Option; - fn put(&self, id: String, val: Cursor); +// TODO temp struct before we update protocol for real +#[derive(Clone, Debug, Default)] +pub struct Cursor { + pub user: String, + pub buffer: String, + pub start: Position, + pub end: Position, +} - fn update(&self, event: CursorMov) -> Option { - let mut cur = self.get(&event.user)?; - cur.buffer = event.path; - cur.start = (event.row, event.col).into(); - self.put(event.user, cur.clone()); - Some(cur) +impl From:: for CursorMov { + fn from(cursor: Cursor) -> CursorMov { + CursorMov { + user: cursor.user, + path: cursor.buffer, + row: cursor.start.row, + col: cursor.start.col, + } } } +impl From:: for Cursor { + fn from(cursor: CursorMov) -> Self { + Cursor { + user: cursor.user, + buffer: cursor.path, + start: (cursor.row, cursor.col).into(), + end: (0,0).into(), // TODO temp! + } + } + +} + #[derive(Copy, Clone, Debug, Default)] pub struct Position { pub row: i64, @@ -31,64 +49,103 @@ impl From::<(i64, i64)> for Position { } } -#[derive(Clone, Debug, Default)] -pub struct Cursor { - pub buffer: String, - pub start: Position, - pub end: Position, +impl From::<(i32, i32)> for Position { + fn from((row, col): (i32, i32)) -> Self { + Position { row: row as i64, col: col as i64 } + } } -#[derive(Debug)] -pub struct CursorController { - users: Mutex>, - bus: broadcast::Sender<(String, Cursor)>, - _bus_keepalive: Mutex>, +#[async_trait] +pub trait CursorSubscriber { + async fn send(&self, path: &str, start: Position, end: Position); + async fn poll(&mut self) -> Option; } -impl Default for CursorController { - fn default() -> Self { - let (tx, _rx) = broadcast::channel(64); - CursorController { - users: Mutex::new(HashMap::new()), - bus: tx, - _bus_keepalive: Mutex::new(_rx), +pub struct CursorControllerHandle { + uid: String, + op: mpsc::Sender, + stream: broadcast::Receiver, + original: Arc>, +} + +impl Clone for CursorControllerHandle { + fn clone(&self) -> Self { + Self { + uid: self.uid.clone(), + op: self.op.clone(), + stream: self.original.subscribe(), + original: self.original.clone() } } } -impl CursorController { - pub fn new() -> Self { - CursorController::default() +#[async_trait] +impl CursorSubscriber for CursorControllerHandle { + async fn send(&self, path: &str, start: Position, end: Position) { + self.op.send(Cursor { + user: self.uid.clone(), + buffer: path.to_string(), + start, end + }).await.unwrap_or_warn("could not send cursor op") } - pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> { - self.bus.subscribe() - } -} - -impl CursorStorage for CursorController { - fn update(&self, event: CursorMov) -> Option { - debug!("processing cursor event: {:?}", event); - let mut cur = self.get(&event.user).unwrap_or(Cursor::default()); - cur.buffer = event.path; - cur.start = (event.row, event.col).into(); - cur.end = (event.row, event.col).into(); - self.put(event.user.clone(), cur.clone()); - if let Err(e) = self.bus.send((event.user, cur.clone())) { - error!("could not broadcast cursor event: {}", e); - } else { // this is because once there are no receivers, nothing else can be sent - if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() { - warn!("could not consume event: {}", e); + async fn poll(&mut self) -> Option { + match self.stream.recv().await { + Ok(x) => Some(x), + Err(e) => { + tracing::warn!("could not poll for cursor: {}", e); + None } } - Some(cur) - } - - fn get(&self, id: &str) -> Option { - Some(self.users.lock().unwrap().get(id)?.clone()) - } - - fn put(&self, id: String, val: Cursor) { - self.users.lock().unwrap().insert(id, val); } } + +#[async_trait] +pub(crate) trait CursorProvider +where T : CursorSubscriber { + fn subscribe(&self) -> T; + fn broadcast(&self, op: CursorMov); + async fn wait(&mut self) -> Option; +} + +pub(crate) struct CursorControllerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, +} + +impl CursorControllerWorker { + pub(crate) fn new(uid: String) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + CursorControllerWorker { + uid, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } +} + +#[async_trait] +impl CursorProvider for CursorControllerWorker { + fn broadcast(&self, op: CursorMov) { + self.channel.send(op.into()).unwrap_or_warn("could not broadcast cursor event") + } + + async fn wait(&mut self) -> Option { + self.op.recv().await + } + + fn subscribe(&self) -> CursorControllerHandle { + CursorControllerHandle { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: self.channel.subscribe(), + original: self.channel.clone(), + } + } + +} +