diff --git a/src/lib/client.rs b/src/lib/client.rs new file mode 100644 index 0000000..071f178 --- /dev/null +++ b/src/lib/client.rs @@ -0,0 +1,110 @@ +/// TODO better name for this file + +use std::sync::{Arc, Mutex}; +use tracing::{error, warn}; +use uuid::Uuid; + +use crate::{ + opfactory::OperationFactory, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp}, + tonic::{transport::Channel, Status, Streaming}, +}; + +type FactoryHandle = Arc>; + +impl From::> for CodempClient { + fn from(x: BufferClient) -> CodempClient { + CodempClient { + id: Uuid::new_v4(), + client:x, + factory: Arc::new(Mutex::new(OperationFactory::new(None))) + } + } +} + +#[derive(Clone)] +pub struct CodempClient { + id: Uuid, + client: BufferClient, + factory: FactoryHandle, // TODO less jank solution than Arc +} + +impl CodempClient { + pub async fn create(&mut self, path: String, content: Option) -> Result { + Ok( + self.client.create( + BufferPayload { + path, + content, + user: self.id.to_string(), + } + ) + .await? + .into_inner() + .accepted + ) + } + + pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result { + let res = { self.factory.lock().unwrap().insert(&txt, pos) }; + match res { + Ok(op) => { + Ok( + self.client.edit( + OperationRequest { + path, + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + user: self.id.to_string(), + } + ) + .await? + .into_inner() + .accepted + ) + }, + Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), + } + } + + pub async fn attach () + Send + 'static>(&mut self, path: String, callback: F) -> Result<(), Status> { + let stream = self.client.attach( + BufferPayload { + path, + content: None, + user: self.id.to_string(), + } + ) + .await? + .into_inner(); + + let factory = self.factory.clone(); + tokio::spawn(async move { Self::worker(stream, factory, callback).await } ); + + Ok(()) + } + + async fn worker ()>(mut stream: Streaming, factory: FactoryHandle, callback: F) { + loop { + match stream.message().await { + Ok(v) => match v { + Some(operation) => { + let op = serde_json::from_str(&operation.opseq).unwrap(); + let res = { factory.lock().unwrap().process(op) }; + match res { + Ok(x) => callback(x), + Err(e) => break error!("desynched: {}", e), + } + } + None => break warn!("stream closed"), + }, + Err(e) => break error!("error receiving change: {}", e), + } + } + } + + pub fn content(&self) -> String { + let factory = self.factory.lock().unwrap(); + factory.content() + } +} diff --git a/src/lib/lib.rs b/src/lib/lib.rs index 97146d3..82b6ce5 100644 --- a/src/lib/lib.rs +++ b/src/lib/lib.rs @@ -1,5 +1,6 @@ pub mod proto; pub mod opfactory; +pub mod client; pub use tonic; pub use tokio; diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs index b327120..190c017 100644 --- a/src/lib/opfactory.rs +++ b/src/lib/opfactory.rs @@ -1,5 +1,5 @@ use operational_transform::{OperationSeq, OTError}; -use tracing::{debug, info}; +use tracing::info; #[derive(Clone)]