feat: reworked cursor handle mechanism

instead of storing cursors it just streams them as they happen. instead
of just getting cursors from the controller, now you also send your
operations into it, mimicking more the behavior used for text ops
This commit is contained in:
əlemi 2023-07-04 22:54:25 +02:00
parent 340eafb432
commit 38911bdc31
2 changed files with 140 additions and 80 deletions

View file

@ -6,9 +6,9 @@ use tracing::{error, warn, debug};
use uuid::Uuid;
use crate::{
cursor::{CursorController, CursorStorage},
cursor::{CursorControllerHandle, CursorControllerWorker, CursorProvider},
operation::{OperationProcessor, OperationController},
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov},
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov}, errors::IgnorableError,
};
#[derive(Clone)]
@ -24,6 +24,8 @@ impl From::<BufferClient<Channel>> for CodempClient {
}
impl CodempClient {
pub fn id(&self) -> &str { &self.id }
pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
let req = BufferPayload {
path, content,
@ -35,7 +37,7 @@ impl CodempClient {
Ok(res.into_inner().accepted)
}
pub async fn listen(&mut self) -> Result<Arc<CursorController>, Status> {
pub async fn listen(&mut self) -> Result<CursorControllerHandle, Status> {
let req = BufferPayload {
path: "".into(),
content: None,
@ -44,20 +46,30 @@ impl CodempClient {
let mut stream = self.client.listen(req).await?.into_inner();
let controller = Arc::new(CursorController::new());
let mut controller = CursorControllerWorker::new(self.id().to_string());
let handle = controller.subscribe();
let mut _client = self.client.clone();
let _controller = controller.clone();
tokio::spawn(async move {
loop {
match stream.message().await {
tokio::select!{
res = stream.message() => {
match res {
Err(e) => break error!("error receiving cursor: {}", e),
Ok(None) => break debug!("cursor worker clean exit"),
Ok(Some(x)) => { _controller.update(x); },
Ok(Some(x)) => { controller.broadcast(x); },
}
},
Some(op) = controller.wait() => {
_client.cursor(CursorMov::from(op)).await
.unwrap_or_warn("could not send cursor update")
}
}
}
});
Ok(controller)
Ok(handle)
}
pub async fn attach(&mut self, path: String) -> Result<Arc<OperationController>, Status> {
@ -125,13 +137,4 @@ impl CodempClient {
Ok(factory)
}
pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<bool, Status> {
let req = CursorMov {
path, row, col,
user: self.id.clone(),
};
let res = self.client.cursor(req).await?.into_inner();
Ok(res.accepted)
}
}

View file

@ -1,22 +1,40 @@
use std::{collections::HashMap, sync::Mutex};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::{error, debug, warn};
use tokio::sync::{mpsc, broadcast};
use tonic::async_trait;
use crate::proto::CursorMov;
use crate::{proto::CursorMov, errors::IgnorableError};
/// Note that this differs from any hashmap in its put method: no &mut!
pub trait CursorStorage {
fn get(&self, id: &str) -> Option<Cursor>;
fn put(&self, id: String, val: Cursor);
fn update(&self, event: CursorMov) -> Option<Cursor> {
let mut cur = self.get(&event.user)?;
cur.buffer = event.path;
cur.start = (event.row, event.col).into();
self.put(event.user, cur.clone());
Some(cur)
// TODO temp struct before we update protocol for real
#[derive(Clone, Debug, Default)]
pub struct Cursor {
pub user: String,
pub buffer: String,
pub start: Position,
pub end: Position,
}
impl From::<Cursor> for CursorMov {
fn from(cursor: Cursor) -> CursorMov {
CursorMov {
user: cursor.user,
path: cursor.buffer,
row: cursor.start.row,
col: cursor.start.col,
}
}
}
impl From::<CursorMov> for Cursor {
fn from(cursor: CursorMov) -> Self {
Cursor {
user: cursor.user,
buffer: cursor.path,
start: (cursor.row, cursor.col).into(),
end: (0,0).into(), // TODO temp!
}
}
}
#[derive(Copy, Clone, Debug, Default)]
@ -31,64 +49,103 @@ impl From::<(i64, i64)> for Position {
}
}
#[derive(Clone, Debug, Default)]
pub struct Cursor {
pub buffer: String,
pub start: Position,
pub end: Position,
impl From::<(i32, i32)> for Position {
fn from((row, col): (i32, i32)) -> Self {
Position { row: row as i64, col: col as i64 }
}
}
#[derive(Debug)]
pub struct CursorController {
users: Mutex<HashMap<String, Cursor>>,
bus: broadcast::Sender<(String, Cursor)>,
_bus_keepalive: Mutex<broadcast::Receiver<(String, Cursor)>>,
#[async_trait]
pub trait CursorSubscriber {
async fn send(&self, path: &str, start: Position, end: Position);
async fn poll(&mut self) -> Option<Cursor>;
}
impl Default for CursorController {
fn default() -> Self {
let (tx, _rx) = broadcast::channel(64);
CursorController {
users: Mutex::new(HashMap::new()),
bus: tx,
_bus_keepalive: Mutex::new(_rx),
pub struct CursorControllerHandle {
uid: String,
op: mpsc::Sender<Cursor>,
stream: broadcast::Receiver<Cursor>,
original: Arc<broadcast::Sender<Cursor>>,
}
impl Clone for CursorControllerHandle {
fn clone(&self) -> Self {
Self {
uid: self.uid.clone(),
op: self.op.clone(),
stream: self.original.subscribe(),
original: self.original.clone()
}
}
}
impl CursorController {
pub fn new() -> Self {
CursorController::default()
#[async_trait]
impl CursorSubscriber for CursorControllerHandle {
async fn send(&self, path: &str, start: Position, end: Position) {
self.op.send(Cursor {
user: self.uid.clone(),
buffer: path.to_string(),
start, end
}).await.unwrap_or_warn("could not send cursor op")
}
pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> {
self.bus.subscribe()
async fn poll(&mut self) -> Option<Cursor> {
match self.stream.recv().await {
Ok(x) => Some(x),
Err(e) => {
tracing::warn!("could not poll for cursor: {}", e);
None
}
}
}
}
impl CursorStorage for CursorController {
fn update(&self, event: CursorMov) -> Option<Cursor> {
debug!("processing cursor event: {:?}", event);
let mut cur = self.get(&event.user).unwrap_or(Cursor::default());
cur.buffer = event.path;
cur.start = (event.row, event.col).into();
cur.end = (event.row, event.col).into();
self.put(event.user.clone(), cur.clone());
if let Err(e) = self.bus.send((event.user, cur.clone())) {
error!("could not broadcast cursor event: {}", e);
} else { // this is because once there are no receivers, nothing else can be sent
if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() {
warn!("could not consume event: {}", e);
}
}
Some(cur)
#[async_trait]
pub(crate) trait CursorProvider<T>
where T : CursorSubscriber {
fn subscribe(&self) -> T;
fn broadcast(&self, op: CursorMov);
async fn wait(&mut self) -> Option<Cursor>;
}
fn get(&self, id: &str) -> Option<Cursor> {
Some(self.users.lock().unwrap().get(id)?.clone())
pub(crate) struct CursorControllerWorker {
uid: String,
producer: mpsc::Sender<Cursor>,
op: mpsc::Receiver<Cursor>,
channel: Arc<broadcast::Sender<Cursor>>,
}
fn put(&self, id: String, val: Cursor) {
self.users.lock().unwrap().insert(id, val);
impl CursorControllerWorker {
pub(crate) fn new(uid: String) -> Self {
let (op_tx, op_rx) = mpsc::channel(64);
let (cur_tx, _cur_rx) = broadcast::channel(64);
CursorControllerWorker {
uid,
producer: op_tx,
op: op_rx,
channel: Arc::new(cur_tx),
}
}
}
#[async_trait]
impl CursorProvider<CursorControllerHandle> for CursorControllerWorker {
fn broadcast(&self, op: CursorMov) {
self.channel.send(op.into()).unwrap_or_warn("could not broadcast cursor event")
}
async fn wait(&mut self) -> Option<Cursor> {
self.op.recv().await
}
fn subscribe(&self) -> CursorControllerHandle {
CursorControllerHandle {
uid: self.uid.clone(),
op: self.producer.clone(),
stream: self.channel.subscribe(),
original: self.channel.clone(),
}
}
}