diff --git a/src/client.rs b/src/client.rs index f8d68d8..105c458 100644 --- a/src/client.rs +++ b/src/client.rs @@ -6,9 +6,9 @@ use tracing::{error, warn, debug}; use uuid::Uuid; use crate::{ - cursor::{CursorController, CursorStorage}, + cursor::{CursorControllerHandle, CursorControllerWorker, CursorProvider}, operation::{OperationProcessor, OperationController}, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, errors::IgnorableError, }; #[derive(Clone)] @@ -24,6 +24,8 @@ impl From::> for CodempClient { } impl CodempClient { + pub fn id(&self) -> &str { &self.id } + pub async fn create(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { path, content, @@ -35,7 +37,7 @@ impl CodempClient { Ok(res.into_inner().accepted) } - pub async fn listen(&mut self) -> Result, Status> { + pub async fn listen(&mut self) -> Result { let req = BufferPayload { path: "".into(), content: None, @@ -44,20 +46,30 @@ impl CodempClient { let mut stream = self.client.listen(req).await?.into_inner(); - let controller = Arc::new(CursorController::new()); + let mut controller = CursorControllerWorker::new(self.id().to_string()); + let handle = controller.subscribe(); + let mut _client = self.client.clone(); - let _controller = controller.clone(); tokio::spawn(async move { loop { - match stream.message().await { - Err(e) => break error!("error receiving cursor: {}", e), - Ok(None) => break debug!("cursor worker clean exit"), - Ok(Some(x)) => { _controller.update(x); }, + tokio::select!{ + res = stream.message() => { + match res { + Err(e) => break error!("error receiving cursor: {}", e), + Ok(None) => break debug!("cursor worker clean exit"), + Ok(Some(x)) => { controller.broadcast(x); }, + } + }, + Some(op) = controller.wait() => { + _client.cursor(CursorMov::from(op)).await + .unwrap_or_warn("could not send cursor update") + } + } } }); - Ok(controller) + Ok(handle) } pub async fn attach(&mut self, path: String) -> Result, Status> { @@ -125,13 +137,4 @@ impl CodempClient { Ok(factory) } - - pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result { - let req = CursorMov { - path, row, col, - user: self.id.clone(), - }; - let res = self.client.cursor(req).await?.into_inner(); - Ok(res.accepted) - } } diff --git a/src/cursor.rs b/src/cursor.rs index a7b8bdc..28dcc60 100644 --- a/src/cursor.rs +++ b/src/cursor.rs @@ -1,24 +1,42 @@ -use std::{collections::HashMap, sync::Mutex}; +use std::sync::Arc; -use tokio::sync::broadcast; -use tracing::{error, debug, warn}; +use tokio::sync::{mpsc, broadcast}; +use tonic::async_trait; -use crate::proto::CursorMov; +use crate::{proto::CursorMov, errors::IgnorableError}; -/// Note that this differs from any hashmap in its put method: no &mut! -pub trait CursorStorage { - fn get(&self, id: &str) -> Option; - fn put(&self, id: String, val: Cursor); +// TODO temp struct before we update protocol for real +#[derive(Clone, Debug, Default)] +pub struct Cursor { + pub user: String, + pub buffer: String, + pub start: Position, + pub end: Position, +} - fn update(&self, event: CursorMov) -> Option { - let mut cur = self.get(&event.user)?; - cur.buffer = event.path; - cur.start = (event.row, event.col).into(); - self.put(event.user, cur.clone()); - Some(cur) +impl From:: for CursorMov { + fn from(cursor: Cursor) -> CursorMov { + CursorMov { + user: cursor.user, + path: cursor.buffer, + row: cursor.start.row, + col: cursor.start.col, + } } } +impl From:: for Cursor { + fn from(cursor: CursorMov) -> Self { + Cursor { + user: cursor.user, + buffer: cursor.path, + start: (cursor.row, cursor.col).into(), + end: (0,0).into(), // TODO temp! + } + } + +} + #[derive(Copy, Clone, Debug, Default)] pub struct Position { pub row: i64, @@ -31,64 +49,103 @@ impl From::<(i64, i64)> for Position { } } -#[derive(Clone, Debug, Default)] -pub struct Cursor { - pub buffer: String, - pub start: Position, - pub end: Position, +impl From::<(i32, i32)> for Position { + fn from((row, col): (i32, i32)) -> Self { + Position { row: row as i64, col: col as i64 } + } } -#[derive(Debug)] -pub struct CursorController { - users: Mutex>, - bus: broadcast::Sender<(String, Cursor)>, - _bus_keepalive: Mutex>, +#[async_trait] +pub trait CursorSubscriber { + async fn send(&self, path: &str, start: Position, end: Position); + async fn poll(&mut self) -> Option; } -impl Default for CursorController { - fn default() -> Self { - let (tx, _rx) = broadcast::channel(64); - CursorController { - users: Mutex::new(HashMap::new()), - bus: tx, - _bus_keepalive: Mutex::new(_rx), +pub struct CursorControllerHandle { + uid: String, + op: mpsc::Sender, + stream: broadcast::Receiver, + original: Arc>, +} + +impl Clone for CursorControllerHandle { + fn clone(&self) -> Self { + Self { + uid: self.uid.clone(), + op: self.op.clone(), + stream: self.original.subscribe(), + original: self.original.clone() } } } -impl CursorController { - pub fn new() -> Self { - CursorController::default() +#[async_trait] +impl CursorSubscriber for CursorControllerHandle { + async fn send(&self, path: &str, start: Position, end: Position) { + self.op.send(Cursor { + user: self.uid.clone(), + buffer: path.to_string(), + start, end + }).await.unwrap_or_warn("could not send cursor op") } - pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> { - self.bus.subscribe() - } -} - -impl CursorStorage for CursorController { - fn update(&self, event: CursorMov) -> Option { - debug!("processing cursor event: {:?}", event); - let mut cur = self.get(&event.user).unwrap_or(Cursor::default()); - cur.buffer = event.path; - cur.start = (event.row, event.col).into(); - cur.end = (event.row, event.col).into(); - self.put(event.user.clone(), cur.clone()); - if let Err(e) = self.bus.send((event.user, cur.clone())) { - error!("could not broadcast cursor event: {}", e); - } else { // this is because once there are no receivers, nothing else can be sent - if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() { - warn!("could not consume event: {}", e); + async fn poll(&mut self) -> Option { + match self.stream.recv().await { + Ok(x) => Some(x), + Err(e) => { + tracing::warn!("could not poll for cursor: {}", e); + None } } - Some(cur) - } - - fn get(&self, id: &str) -> Option { - Some(self.users.lock().unwrap().get(id)?.clone()) - } - - fn put(&self, id: String, val: Cursor) { - self.users.lock().unwrap().insert(id, val); } } + +#[async_trait] +pub(crate) trait CursorProvider +where T : CursorSubscriber { + fn subscribe(&self) -> T; + fn broadcast(&self, op: CursorMov); + async fn wait(&mut self) -> Option; +} + +pub(crate) struct CursorControllerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, +} + +impl CursorControllerWorker { + pub(crate) fn new(uid: String) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + CursorControllerWorker { + uid, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } +} + +#[async_trait] +impl CursorProvider for CursorControllerWorker { + fn broadcast(&self, op: CursorMov) { + self.channel.send(op.into()).unwrap_or_warn("could not broadcast cursor event") + } + + async fn wait(&mut self) -> Option { + self.op.recv().await + } + + fn subscribe(&self) -> CursorControllerHandle { + CursorControllerHandle { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: self.channel.subscribe(), + original: self.channel.clone(), + } + } + +} +