diff --git a/src/lib/client.rs b/src/lib/client.rs index 6b9d179..d92e6b5 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -1,194 +1,80 @@ -/// TODO better name for this file +use std::{future::Future, sync::Arc, pin::Pin}; -use std::{sync::{Arc, RwLock}, collections::BTreeMap}; -use tracing::{error, warn, info}; -use uuid::Uuid; +use operational_transform::OperationSeq; +use tokio::sync::mpsc; +use tonic::{transport::{Channel, Error}, Status}; +use tracing::error; -use crate::{ - opfactory::AsyncFactory, - proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, - tonic::{transport::Channel, Status, Streaming}, -}; +use crate::{proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, opfactory::AsyncFactory}; -pub type FactoryStore = Arc>>>; - -impl From::> for CodempClient { - fn from(x: BufferClient) -> CodempClient { - CodempClient { - id: Uuid::new_v4(), - client:x, - factories: Arc::new(RwLock::new(BTreeMap::new())), - } - } +pub trait EditorDriver : Clone { + fn id(&self) -> String; } #[derive(Clone)] -pub struct CodempClient { - id: Uuid, +pub struct CodempClient { client: BufferClient, - factories: FactoryStore, + driver: T, } -impl CodempClient { - fn get_factory(&self, path: &String) -> Result, Status> { - match self.factories.read().unwrap().get(path) { - Some(f) => Ok(f.clone()), - None => Err(Status::not_found("no active buffer for given path")), - } +impl CodempClient { // TODO wrap tonic 'connect' to allow multiple types + pub async fn new(addr: &str, driver: T) -> Result { + let client = BufferClient::connect(addr.to_string()).await?; + + Ok(CodempClient { client, driver }) } - pub fn add_factory(&self, path: String, factory:Arc) { - self.factories.write().unwrap().insert(path, factory); - } - - pub async fn create(&mut self, path: String, content: Option) -> Result { + pub async fn create_buffer(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { - path: path.clone(), - content: content.clone(), - user: self.id.to_string(), + path, content, + user: self.driver.id(), }; - let res = self.client.create(req).await?.into_inner(); + let res = self.client.create(req).await?; - Ok(res.accepted) + Ok(res.into_inner().accepted) } - pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result { - let factory = self.get_factory(&path)?; - match factory.insert(txt, pos).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn delete(&mut self, path: String, pos: u64, count: u64) -> Result { - let factory = self.get_factory(&path)?; - match factory.delete(pos, count).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn replace(&mut self, path: String, txt: String) -> Result { - let factory = self.get_factory(&path)?; - match factory.replace(txt).await { - Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), - Ok(op) => { - let req = OperationRequest { - path, - hash: "".into(), - user: self.id.to_string(), - opseq: serde_json::to_string(&op) - .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, - }; - let res = self.client.edit(req).await?.into_inner(); - if let Err(e) = factory.ack(op.clone()).await { - error!("could not ack op '{:?}' : {}", op, e); - } - Ok(res.accepted) - }, - } - } - - pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<(), Status> { - let req = CursorMov { - path, user: self.id.to_string(), - row, col, - }; - let _res = self.client.cursor(req).await?.into_inner(); - Ok(()) - } - - pub async fn listen(&mut self, path: String, callback: F) -> Result<(), Status> - where F : Fn(CursorMov) -> () + Send + 'static { + pub async fn attach_buffer(&mut self, path: String) -> Result, Status> { let req = BufferPayload { - path, - content: None, - user: self.id.to_string(), + path, content: None, + user: self.driver.id(), }; - let mut stream = self.client.listen(req).await?.into_inner(); + + let content = self.client.sync(req.clone()) + .await? + .into_inner() + .content; + + let mut stream = self.client.attach(req).await?.into_inner(); + + let factory = Arc::new(AsyncFactory::new(content)); + + let (tx, mut rx) = mpsc::channel(64); + tokio::spawn(async move { - // TODO catch some errors - while let Ok(Some(x)) = stream.message().await { - callback(x) + loop { + match stream.message().await { + Err(e) => break error!("error receiving update: {}", e), + Ok(None) => break, + Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { + Err(e) => break error!("error deserializing opseq: {}", e), + Ok(v) => match factory.process(v).await { + Err(e) => break error!("could not apply operation from server: {}", e), + Ok(txt) => { // send back txt + } + } + }, + } } }); - Ok(()) - } - pub async fn attach(&mut self, path: String, callback: F) -> Result - where F : Fn(String) -> () + Send + 'static { - let content = self.sync(path.clone()).await?; - let factory = Arc::new(AsyncFactory::new(Some(content.clone()))); - self.add_factory(path.clone(), factory.clone()); - let req = BufferPayload { - path, - content: None, - user: self.id.to_string(), - }; - let stream = self.client.attach(req).await?.into_inner(); - tokio::spawn(async move { Self::worker(stream, factory, callback).await } ); - Ok(content) - } + tokio::spawn(async move { + while let Some(op) = rx.recv().await { - pub fn detach(&mut self, path: String) { - self.factories.write().unwrap().remove(&path); - info!("|| detached from buffer"); - } - - async fn sync(&mut self, path: String) -> Result { - let res = self.client.sync( - BufferPayload { - path, content: None, user: self.id.to_string(), } - ).await?; - Ok(res.into_inner().content.unwrap_or("".into())) - } + }); - async fn worker(mut stream: Streaming, factory: Arc, callback: F) - where F : Fn(String) -> () { - info!("|> buffer worker started"); - loop { - match stream.message().await { - Err(e) => break error!("error receiving change: {}", e), - Ok(v) => match v { - None => break warn!("stream closed"), - Some(operation) => match serde_json::from_str(&operation.opseq) { - Err(e) => break error!("could not deserialize opseq: {}", e), - Ok(op) => match factory.process(op).await { - Err(e) => break error!("desynched: {}", e), - Ok(x) => callback(x), - }, - } - }, - } - } - info!("[] buffer worker stopped"); + Ok(tx) } } diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs index 320bd84..f336df4 100644 --- a/src/lib/opfactory.rs +++ b/src/lib/opfactory.rs @@ -1,47 +1,24 @@ -use std::collections::VecDeque; +use std::sync::Arc; use operational_transform::{OperationSeq, OTError}; use similar::TextDiff; use tokio::sync::{mpsc, watch, oneshot}; use tracing::{error, warn}; -#[derive(Clone)] -pub struct OperationFactory { - content: String, - queue: VecDeque, -} +#[tonic::async_trait] +pub trait OperationFactory { + fn content(&self) -> String; + async fn apply(&self, op: OperationSeq) -> Result; + async fn process(&self, op: OperationSeq) -> Result; + async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError>; -impl OperationFactory { - pub fn new(init: Option) -> Self { - OperationFactory { - content: init.unwrap_or(String::new()), - queue: VecDeque::new(), - } - } - - fn apply(&mut self, op: OperationSeq) -> Result { - if op.is_noop() { return Err(OTError) } - self.content = op.apply(&self.content)?; - self.queue.push_back(op.clone()); - Ok(op) - } - - // TODO remove the need for this - pub fn content(&self) -> String { - self.content.clone() - } - - pub fn check(&self, txt: &str) -> bool { - self.content == txt - } - - pub fn replace(&mut self, txt: &str) -> Result { + fn replace(&self, txt: &str) -> OperationSeq { let mut out = OperationSeq::default(); - if self.content == txt { // TODO throw and error rather than wasting everyone's resources - return Err(OTError); // nothing to do + if self.content() == txt { + return out; // TODO this won't work, should we return a noop instead? } - let diff = TextDiff::from_chars(self.content.as_str(), txt); + let diff = TextDiff::from_chars(self.content().as_str(), txt); for change in diff.iter_all_changes() { match change.tag() { @@ -51,59 +28,37 @@ impl OperationFactory { } } - self.content = out.apply(&self.content)?; - Ok(out) + out } - pub fn insert(&mut self, txt: &str, pos: u64) -> Result { + fn insert(&self, txt: &str, pos: u64) -> OperationSeq { let mut out = OperationSeq::default(); - let total = self.content.len() as u64; + let total = self.content().len() as u64; out.retain(pos); out.insert(txt); out.retain(total - pos); - Ok(self.apply(out)?) + out } - pub fn delete(&mut self, pos: u64, count: u64) -> Result { + fn delete(&self, pos: u64, count: u64) -> OperationSeq { let mut out = OperationSeq::default(); - let len = self.content.len() as u64; + let len = self.content().len() as u64; out.retain(pos - count); out.delete(count); out.retain(len - pos); - Ok(self.apply(out)?) + out } - pub fn cancel(&mut self, pos: u64, count: u64) -> Result { + fn cancel(&self, pos: u64, count: u64) -> OperationSeq { let mut out = OperationSeq::default(); - let len = self.content.len() as u64; + let len = self.content().len() as u64; out.retain(pos); out.delete(count); out.retain(len - (pos+count)); - Ok(self.apply(out)?) - } - - pub fn ack(&mut self, op: OperationSeq) -> Result<(), OTError> { // TODO use a different error? - // TODO is manually iterating from behind worth the manual search boilerplate? - for (i, o) in self.queue.iter().enumerate().rev() { - if o == &op { - self.queue.remove(i); - return Ok(()); - } - } - warn!("could not ack op {:?} from {:?}", op, self.queue); - Err(OTError) - } - - pub fn process(&mut self, mut op: OperationSeq) -> Result { - for o in self.queue.iter_mut() { - (op, *o) = op.transform(o)?; - } - self.content = op.apply(&self.content)?; - Ok(self.content.clone()) + out } } - pub struct AsyncFactory { run: watch::Sender, ops: mpsc::Sender, @@ -117,6 +72,32 @@ impl Drop for AsyncFactory { } } +#[tonic::async_trait] +impl OperationFactory for AsyncFactory { + fn content(&self) -> String { + return self.content.borrow().clone(); + } + + async fn apply(&self, op: OperationSeq) -> Result { + let (tx, rx) = oneshot::channel(); + self.ops.send(OpMsg::Apply(op, tx)).await.map_err(|_| OTError)?; + Ok(rx.await.map_err(|_| OTError)?) + } + + async fn process(&self, op: OperationSeq) -> Result { + let (tx, rx) = oneshot::channel(); + self.ops.send(OpMsg::Process(op, tx)).await.map_err(|_| OTError)?; + Ok(rx.await.map_err(|_| OTError)?) + } + + async fn acknowledge(&self, op: OperationSeq) -> Result<(), OTError> { + let (tx, rx) = oneshot::channel(); + self.ops.send(OpMsg::Acknowledge(op, tx)).await.map_err(|_| OTError)?; + Ok(rx.await.map_err(|_| OTError)?) + } + +} + impl AsyncFactory { pub fn new(init: Option) -> Self { let (run_tx, run_rx) = watch::channel(true); @@ -124,7 +105,7 @@ impl AsyncFactory { let (txt_tx, txt_rx) = watch::channel("".into()); let worker = AsyncFactoryWorker { - factory: OperationFactory::new(init), + text: init.unwrap_or("".into()), ops: ops_rx, run: run_rx, content: txt_tx, @@ -134,62 +115,18 @@ impl AsyncFactory { AsyncFactory { run: run_tx, ops: ops_tx, content: txt_rx } } - - pub async fn insert(&self, txt: String, pos: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Insert(txt, pos), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn delete(&self, pos: u64, count: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Delete(pos, count), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn cancel(&self, pos: u64, count: u64) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Cancel(pos, count), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn replace(&self, txt: String) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Exec(OpWrapper::Replace(txt), tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn process(&self, opseq: OperationSeq) -> Result { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Process(opseq, tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } - - pub async fn ack(&self, opseq: OperationSeq) -> Result<(), OTError> { - let (tx, rx) = oneshot::channel(); - self.ops.send(OpMsg::Ack(opseq, tx)).await.map_err(|_| OTError)?; - rx.await.map_err(|_| OTError)? - } } #[derive(Debug)] enum OpMsg { - Exec(OpWrapper, oneshot::Sender>), - Process(OperationSeq, oneshot::Sender>), - Ack(OperationSeq, oneshot::Sender>), -} - -#[derive(Debug)] -enum OpWrapper { - Insert(String, u64), - Delete(u64, u64), - Cancel(u64, u64), - Replace(String), + Apply(OperationSeq, oneshot::Sender), + Process(OperationSeq, oneshot::Sender), + Acknowledge(OperationSeq, oneshot::Sender<()>) } struct AsyncFactoryWorker { - factory: OperationFactory, + text: String, ops: mpsc::Receiver, run: watch::Receiver, content: watch::Sender @@ -204,7 +141,7 @@ impl AsyncFactoryWorker { match recv { Some(msg) => { match msg { - OpMsg::Exec(op, tx) => tx.send(self.exec(op)).unwrap_or(()), + OpMsg::Apply(op, tx) => tx.send(self.exec(op)).unwrap_or(()), OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()), OpMsg::Ack(opseq, tx) => tx.send(self.factory.ack(opseq)).unwrap_or(()), }