From f8e77f08279e7bc89176ab1e051716a091f10a17 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 16 Aug 2023 23:09:47 +0200 Subject: [PATCH] feat: reworked client, added static instance --- proto/cursor.proto | 16 ++-- src/buffer/controller.rs | 106 +++++++------------------ src/buffer/mod.rs | 12 ++- src/buffer/{handle.rs => worker.rs} | 57 +++----------- src/client.rs | 117 ++++++++++++++++++++++++++++ src/cursor/controller.rs | 51 ++++++++++++ src/cursor/mod.rs | 17 ++-- src/cursor/tracker.rs | 101 ------------------------ src/cursor/worker.rs | 54 +++++++++++++ src/errors.rs | 3 + src/instance.rs | 81 +++++++++++++++++++ src/lib.rs | 4 +- src/state.rs | 64 --------------- 13 files changed, 378 insertions(+), 305 deletions(-) rename src/buffer/{handle.rs => worker.rs} (64%) create mode 100644 src/client.rs create mode 100644 src/cursor/controller.rs delete mode 100644 src/cursor/tracker.rs create mode 100644 src/cursor/worker.rs create mode 100644 src/instance.rs delete mode 100644 src/state.rs diff --git a/proto/cursor.proto b/proto/cursor.proto index 39d40df..5af4fa2 100644 --- a/proto/cursor.proto +++ b/proto/cursor.proto @@ -3,22 +3,26 @@ syntax = "proto3"; package codemp.cursor; service Cursor { - rpc Moved (CursorPosition) returns (MovedResponse); - rpc Listen (UserIdentity) returns (stream CursorPosition); + rpc Moved (CursorEvent) returns (MovedResponse); + rpc Listen (UserIdentity) returns (stream CursorEvent); } message MovedResponse {} -message RowColumn { +message RowCol { int32 row = 1; int32 col = 2; } message CursorPosition { + string buffer = 1; + RowCol start = 2; + RowCol end = 3; +} + +message CursorEvent { string user = 1; - string buffer = 2; - RowColumn start = 3; - RowColumn end = 4; + CursorPosition position = 2; } message UserIdentity { diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 45c0d59..73e967e 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,89 +1,41 @@ -use tonic::{transport::Channel, Status}; -use uuid::Uuid; +use operational_transform::OperationSeq; +use tokio::sync::{watch, mpsc, broadcast, Mutex}; +use tonic::async_trait; -use crate::{ - ControllerWorker, - buffer::handle::{BufferHandle, OperationControllerWorker}, - proto::{buffer_client::BufferClient, BufferPayload}, -}; +use crate::{Controller, CodempError}; +use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; + +use super::TextChange; -#[derive(Clone)] pub struct BufferController { - id: String, - client: BufferClient, + content: watch::Receiver, + operations: mpsc::Sender, + stream: Mutex>, } -impl From::> for BufferController { - fn from(value: BufferClient) -> Self { - BufferController { id: Uuid::new_v4().to_string(), client: value } +#[async_trait] +impl OperationFactory for BufferController { + fn content(&self) -> String { + self.content.borrow().clone() } } -impl BufferController { - pub async fn new(dest: &str) -> Result { - Ok(BufferClient::connect(dest.to_string()).await?.into()) +#[async_trait] +impl Controller for BufferController { + type Input = OperationSeq; + + async fn recv(&self) -> Result { + let op = self.stream.lock().await.recv().await?; + let after = self.content.borrow().clone(); + let skip = leading_noop(op.ops()) as usize; + let before_len = op.base_len(); + let tail = tailing_noop(op.ops()) as usize; + let span = skip..before_len-tail; + let content = after[skip..after.len()-tail].to_string(); + Ok(TextChange { span, content }) } - pub fn id(&self) -> &str { &self.id } - - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), Status> { - let req = BufferPayload { - path: path.to_string(), - content: content.map(|x| x.to_string()), - user: self.id.clone(), - }; - - self.client.create(req).await?; - - Ok(()) - } - - // pub async fn listen(&mut self) -> Result { - // let req = BufferPayload { - // path: "".into(), - // content: None, - // user: self.id.clone(), - // }; - - // let stream = self.client.listen(req).await?.into_inner(); - - // let controller = CursorTrackerWorker::new(self.id().to_string()); - // let handle = controller.subscribe(); - // let client = self.client.clone(); - - // tokio::spawn(async move { - // tracing::debug!("cursor worker started"); - // controller.work(stream, client).await; - // tracing::debug!("cursor worker stopped"); - // }); - - // Ok(handle) - // } - - pub async fn attach(&mut self, path: &str) -> Result { - let req = BufferPayload { - path: path.to_string(), - content: None, - user: self.id.clone(), - }; - - let content = self.client.sync(req.clone()) - .await? - .into_inner() - .content; - - let stream = self.client.attach(req).await?.into_inner(); - - let controller = OperationControllerWorker::new(self.id().to_string(), &content, path); - let factory = controller.subscribe(); - let client = self.client.clone(); - - tokio::spawn(async move { - tracing::debug!("buffer worker started"); - controller.work(client, stream).await; - tracing::debug!("buffer worker stopped"); - }); - - Ok(factory) + async fn send(&self, op: OperationSeq) -> Result<(), CodempError> { + Ok(self.operations.send(op).await?) } } diff --git a/src/buffer/mod.rs b/src/buffer/mod.rs index 7fe80e1..922e217 100644 --- a/src/buffer/mod.rs +++ b/src/buffer/mod.rs @@ -1,3 +1,11 @@ -pub mod factory; +use std::ops::Range; + +pub(crate) mod worker; pub mod controller; -pub mod handle; +pub mod factory; + + +pub struct TextChange { + pub span: Range, + pub content: String, +} diff --git a/src/buffer/handle.rs b/src/buffer/worker.rs similarity index 64% rename from src/buffer/handle.rs rename to src/buffer/worker.rs index bbebc66..43bb008 100644 --- a/src/buffer/handle.rs +++ b/src/buffer/worker.rs @@ -1,4 +1,4 @@ -use std::{sync::Arc, collections::VecDeque, ops::Range}; +use std::{sync::Arc, collections::VecDeque}; use operational_transform::OperationSeq; use tokio::sync::{watch, mpsc, broadcast, Mutex}; @@ -7,48 +7,13 @@ use tonic::{async_trait, Streaming}; use crate::proto::{OperationRequest, RawOp}; use crate::proto::buffer_client::BufferClient; -use crate::{ControllerWorker, Controller, CodempError}; -use crate::buffer::factory::{leading_noop, tailing_noop, OperationFactory}; +use crate::ControllerWorker; -pub struct TextChange { - pub span: Range, - pub content: String, -} +use super::TextChange; +use super::controller::BufferController; -pub struct BufferHandle { - content: watch::Receiver, - operations: mpsc::Sender, - stream: Mutex>, -} -#[async_trait] -impl OperationFactory for BufferHandle { - fn content(&self) -> String { - self.content.borrow().clone() - } -} - -#[async_trait] -impl Controller for BufferHandle { - type Input = OperationSeq; - - async fn recv(&self) -> Result { - let op = self.stream.lock().await.recv().await?; - let after = self.content.borrow().clone(); - let skip = leading_noop(op.ops()) as usize; - let before_len = op.base_len(); - let tail = tailing_noop(op.ops()) as usize; - let span = skip..before_len-tail; - let content = after[skip..after.len()-tail].to_string(); - Ok(TextChange { span, content }) - } - - async fn send(&self, op: OperationSeq) -> Result<(), CodempError> { - Ok(self.operations.send(op).await?) - } -} - -pub(crate) struct OperationControllerWorker { +pub(crate) struct BufferControllerWorker { uid: String, pub(crate) content: watch::Sender, pub(crate) operations: mpsc::Receiver, @@ -60,12 +25,12 @@ pub(crate) struct OperationControllerWorker { path: String, } -impl OperationControllerWorker { +impl BufferControllerWorker { pub fn new(uid: String, buffer: &str, path: &str) -> Self { let (txt_tx, txt_rx) = watch::channel(buffer.to_string()); let (op_tx, op_rx) = mpsc::channel(64); let (s_tx, _s_rx) = broadcast::channel(64); - OperationControllerWorker { + BufferControllerWorker { uid, content: txt_tx, operations: op_rx, @@ -80,13 +45,13 @@ impl OperationControllerWorker { } #[async_trait] -impl ControllerWorker for OperationControllerWorker { - type Controller = BufferHandle; +impl ControllerWorker for BufferControllerWorker { + type Controller = BufferController; type Tx = BufferClient; type Rx = Streaming; - fn subscribe(&self) -> BufferHandle { - BufferHandle { + fn subscribe(&self) -> BufferController { + BufferController { content: self.receiver.clone(), operations: self.sender.clone(), stream: Mutex::new(self.stream.subscribe()), diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..9691ee2 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,117 @@ +use std::{sync::Arc, collections::BTreeMap}; + +use tonic::transport::Channel; + +use crate::{ + cursor::{worker::CursorControllerWorker, controller::CursorController}, + proto::{ + buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload, + }, + CodempError, ControllerWorker, buffer::{controller::BufferController, worker::BufferControllerWorker}, +}; + + +pub struct CodempClient { + id: String, + client: ServiceClients, + workspace: Option, +} + +struct ServiceClients { + buffer: BufferClient, + cursor: CursorClient, +} + +struct Workspace { + cursor: Arc, + buffers: BTreeMap>, +} + + +impl CodempClient { + pub async fn new(dst: &str) -> Result { + let buffer = BufferClient::connect(dst.to_string()).await?; + let cursor = CursorClient::connect(dst.to_string()).await?; + let id = uuid::Uuid::new_v4().to_string(); + + Ok(CodempClient { id, client: ServiceClients { buffer, cursor}, workspace: None }) + } + + pub fn get_cursor(&self) -> Option> { + Some(self.workspace?.cursor.clone()) + } + + pub fn get_buffer(&self, path: &str) -> Option> { + self.workspace?.buffers.get(path).cloned() + } + + pub async fn join(&mut self, _session: &str) -> Result, CodempError> { + // TODO there is no real workspace handling in codemp server so it behaves like one big global + // session. I'm still creating this to start laying out the proper use flow + let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); + + let controller = CursorControllerWorker::new(self.id.clone()); + let client = self.client.cursor.clone(); + + let handle = Arc::new(controller.subscribe()); + + tokio::spawn(async move { + tracing::debug!("cursor worker started"); + controller.work(client, stream).await; + tracing::debug!("cursor worker stopped"); + }); + + self.workspace = Some( + Workspace { + cursor: handle.clone(), + buffers: BTreeMap::new() + } + ); + + Ok(handle) + } + + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), CodempError> { + if let Some(workspace) = &self.workspace { + self.client.buffer + .create(BufferPayload { + user: self.id.clone(), + path: path.to_string(), + content: content.map(|x| x.to_string()), + }).await?; + + Ok(()) + } else { + Err(CodempError::InvalidState { msg: "join a workspace first".into() }) + } + } + + pub async fn attach(&mut self, path: &str, content: Option<&str>) -> Result, CodempError> { + if let Some(workspace) = &mut self.workspace { + let mut client = self.client.buffer.clone(); + let req = BufferPayload { + path: path.to_string(), user: self.id.clone(), content: None + }; + + let content = client.sync(req.clone()).await?.into_inner().content; + + let stream = client.attach(req).await?.into_inner(); + + let controller = BufferControllerWorker::new(self.id.clone(), &content, path); + let handler = Arc::new(controller.subscribe()); + + let _path = path.to_string(); + tokio::spawn(async move { + tracing::debug!("buffer[{}] worker started", _path); + controller.work(client, stream).await; + tracing::debug!("buffer[{}] worker stopped", _path); + }); + + workspace.buffers.insert(path.to_string(), handler.clone()); + + Ok(handler) + } else { + Err(CodempError::InvalidState { msg: "join a workspace first".into() }) + } + } +} diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs new file mode 100644 index 0000000..7e879a7 --- /dev/null +++ b/src/cursor/controller.rs @@ -0,0 +1,51 @@ +use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; +use tonic::async_trait; + +use crate::{proto::{CursorPosition, CursorEvent}, CodempError, Controller}; + +pub struct CursorController { + uid: String, + op: mpsc::Sender, + stream: Mutex>, +} + +#[async_trait] +impl Controller for CursorController { + type Input = CursorPosition; + + async fn send(&self, cursor: CursorPosition) -> Result<(), CodempError> { + Ok(self.op.send(CursorEvent { + user: self.uid.clone(), + position: Some(cursor), + }).await?) + } + + // TODO is this cancelable? so it can be used in tokio::select! + // TODO is the result type overkill? should be an option? + async fn recv(&self) -> Result { + let mut stream = self.stream.lock().await; + match stream.recv().await { + Ok(x) => Ok(x), + Err(RecvError::Closed) => Err(CodempError::Channel { send: false }), + Err(RecvError::Lagged(n)) => { + tracing::error!("cursor channel lagged behind, skipping {} events", n); + Ok(stream.recv().await.expect("could not receive after lagging")) + } + } + } + + // fn try_poll(&self) -> Option> { + // match self.stream.try_lock() { + // Err(_) => None, + // Ok(mut x) => match x.try_recv() { + // Ok(x) => Some(Some(x)), + // Err(TryRecvError::Empty) => None, + // Err(TryRecvError::Closed) => Some(None), + // Err(TryRecvError::Lagged(n)) => { + // tracing::error!("cursor channel lagged behind, skipping {} events", n); + // Some(Some(x.try_recv().expect("could not receive after lagging"))) + // } + // } + // } + // } +} diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 58a0abe..c6fbdd9 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,25 +1,26 @@ -pub mod tracker; +pub(crate) mod worker; +pub mod controller; -use crate::proto::{RowColumn, CursorPosition}; +use crate::proto::{RowCol, CursorPosition}; -impl From:: for (i32, i32) { - fn from(pos: RowColumn) -> (i32, i32) { +impl From:: for (i32, i32) { + fn from(pos: RowCol) -> (i32, i32) { (pos.row, pos.col) } } -impl From::<(i32, i32)> for RowColumn { +impl From::<(i32, i32)> for RowCol { fn from((row, col): (i32, i32)) -> Self { - RowColumn { row, col } + RowCol { row, col } } } impl CursorPosition { - pub fn start(&self) -> RowColumn { + pub fn start(&self) -> RowCol { self.start.clone().unwrap_or((0, 0).into()) } - pub fn end(&self) -> RowColumn { + pub fn end(&self) -> RowCol { self.end.clone().unwrap_or((0, 0).into()) } } diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs deleted file mode 100644 index 6983f40..0000000 --- a/src/cursor/tracker.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::sync::Arc; - -use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; -use tonic::{Streaming, transport::Channel, async_trait}; - -use crate::{proto::{CursorPosition, cursor_client::CursorClient, RowColumn}, errors::IgnorableError, CodempError, Controller, ControllerWorker}; - -pub struct CursorTracker { - uid: String, - op: mpsc::Sender, - stream: Mutex>, -} - -#[async_trait] -impl Controller for CursorTracker { - type Input = (String, RowColumn, RowColumn); - - async fn send(&self, (buffer, start, end): Self::Input) -> Result<(), CodempError> { - Ok(self.op.send(CursorPosition { - user: self.uid.clone(), - start: Some(start), - end: Some(end), - buffer, - }).await?) - } - - // TODO is this cancelable? so it can be used in tokio::select! - // TODO is the result type overkill? should be an option? - async fn recv(&self) -> Result { - let mut stream = self.stream.lock().await; - match stream.recv().await { - Ok(x) => Ok(x), - Err(RecvError::Closed) => Err(CodempError::Channel { send: false }), - Err(RecvError::Lagged(n)) => { - tracing::error!("cursor channel lagged behind, skipping {} events", n); - Ok(stream.recv().await.expect("could not receive after lagging")) - } - } - } - - // fn try_poll(&self) -> Option> { - // match self.stream.try_lock() { - // Err(_) => None, - // Ok(mut x) => match x.try_recv() { - // Ok(x) => Some(Some(x)), - // Err(TryRecvError::Empty) => None, - // Err(TryRecvError::Closed) => Some(None), - // Err(TryRecvError::Lagged(n)) => { - // tracing::error!("cursor channel lagged behind, skipping {} events", n); - // Some(Some(x.try_recv().expect("could not receive after lagging"))) - // } - // } - // } - // } -} - -pub(crate) struct CursorTrackerWorker { - uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, -} - -impl CursorTrackerWorker { - pub(crate) fn new(uid: String) -> Self { - let (op_tx, op_rx) = mpsc::channel(64); - let (cur_tx, _cur_rx) = broadcast::channel(64); - Self { - uid, - producer: op_tx, - op: op_rx, - channel: Arc::new(cur_tx), - } - } -} - -#[async_trait] -impl ControllerWorker for CursorTrackerWorker { - type Controller = CursorTracker; - type Tx = CursorClient; - type Rx = Streaming; - - fn subscribe(&self) -> CursorTracker { - CursorTracker { - uid: self.uid.clone(), - op: self.producer.clone(), - stream: Mutex::new(self.channel.subscribe()), - } - } - - async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { - loop { - tokio::select!{ - Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, - else => break, - } - } - } -} - diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs new file mode 100644 index 0000000..42fa694 --- /dev/null +++ b/src/cursor/worker.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use tokio::sync::{mpsc, broadcast::{self}, Mutex}; +use tonic::{Streaming, transport::Channel, async_trait}; + +use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, ControllerWorker}; + +use super::controller::CursorController; + +pub(crate) struct CursorControllerWorker { + uid: String, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, +} + +impl CursorControllerWorker { + pub(crate) fn new(uid: String) -> Self { + let (op_tx, op_rx) = mpsc::channel(64); + let (cur_tx, _cur_rx) = broadcast::channel(64); + Self { + uid, + producer: op_tx, + op: op_rx, + channel: Arc::new(cur_tx), + } + } +} + +#[async_trait] +impl ControllerWorker for CursorControllerWorker { + type Controller = CursorController; + type Tx = CursorClient; + type Rx = Streaming; + + fn subscribe(&self) -> CursorController { + CursorController { + uid: self.uid.clone(), + op: self.producer.clone(), + stream: Mutex::new(self.channel.subscribe()), + } + } + + async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { + loop { + tokio::select!{ + Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), + Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + else => break, + } + } + } +} + diff --git a/src/errors.rs b/src/errors.rs index b9c1f80..6b923a7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -28,6 +28,9 @@ pub enum CodempError { Channel { send: bool }, + InvalidState { + msg: String, + }, // TODO filler error, remove later Filler { diff --git a/src/instance.rs b/src/instance.rs new file mode 100644 index 0000000..1421d29 --- /dev/null +++ b/src/instance.rs @@ -0,0 +1,81 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::{ + buffer::controller::BufferController, + errors::CodempError, client::CodempClient, cursor::controller::CursorController, +}; + + +use tokio::runtime::Runtime; + +const CODEMP_DEFAULT_HOST : &str = "http://alemi.dev:50051"; + +lazy_static::lazy_static! { + static ref RUNTIME : Runtime = Runtime::new().expect("could not create tokio runtime"); + static ref INSTANCE : Instance = Instance::default(); +} + +pub struct Instance { + client: Mutex>, +} + +impl Default for Instance { + fn default() -> Self { + Instance { client: Mutex::new(None) } + } +} + +// TODO these methods repeat a lot of code but Mutex makes it hard to simplify + +impl Instance { + pub async fn connect(&self, addr: &str) -> Result<(), CodempError> { + *self.client.lock().await = Some(CodempClient::new(addr).await?); + Ok(()) + } + + pub async fn join(&self, session: &str) -> Result<(), CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .join(session) + .await?; + + Ok(()) + } + + pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .create(path, content) + .await?; + + Ok(()) + } + + pub async fn get_cursor(&self) -> Result, CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .get_cursor() + .ok_or(CodempError::InvalidState { msg: "join a workspace first".into() }) + } + + pub async fn get_buffer(&self, path: &str) -> Result, CodempError> { + self.client + .lock() + .await + .as_mut() + .ok_or(CodempError::InvalidState { msg: "connect first".into() })? + .get_buffer(path) + .ok_or(CodempError::InvalidState { msg: "join a workspace or create requested buffer first".into() }) + } +} diff --git a/src/lib.rs b/src/lib.rs index e376b81..0975600 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,9 +2,11 @@ pub mod cursor; pub mod errors; pub mod buffer; -pub mod state; pub mod client; +#[cfg(feature = "static")] +pub mod instance; + pub use tonic; pub use tokio; pub use operational_transform as ot; diff --git a/src/state.rs b/src/state.rs deleted file mode 100644 index 5755896..0000000 --- a/src/state.rs +++ /dev/null @@ -1,64 +0,0 @@ -use std::{collections::BTreeMap, sync::Arc}; - -use tokio::sync::RwLock; - -use crate::{ - buffer::{controller::BufferController, handle::BufferHandle}, - errors::CodempError, -}; - - -#[cfg(feature = "static")] -pub mod instance { - use tokio::runtime::Runtime; - use super::Workspace; - - const CODEMP_DEFAULT_HOST : &str = "http://fantabos.co:50051"; - - lazy_static::lazy_static! { - static ref RUNTIME : Runtime = Runtime::new().expect("could not create tokio runtime"); - static ref WORKSPACE : Workspace = RUNTIME.block_on( - Workspace::new(&std::env::var("CODEMP_HOST").unwrap_or(CODEMP_DEFAULT_HOST.into())) - ).expect("could not create codemp workspace"); - } -} - -pub struct Workspace { - buffers: RwLock, Arc>>, - // cursor: Arc, - client: BufferController, -} - -impl Workspace { - pub async fn new(dest: &str) -> Result { - let client = BufferController::new(dest).await?; - // let cursor = Arc::new(client.listen().await?); - Ok( - Workspace { - buffers: RwLock::new(BTreeMap::new()), - // cursor, - client, - } - ) - } - - // Cursor - // pub async fn cursor(&self) -> Arc { - // self.cursor.clone() - // } - - // Buffer - pub async fn buffer(&self, path: &str) -> Option> { - self.buffers.read().await.get(path).cloned() - } - - pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), CodempError> { - Ok(self.client.clone().create(path, content).await?) - } - - pub async fn attach(&self, path: &str) -> Result<(), CodempError> { - let controller = self.client.clone().attach(path).await?; - self.buffers.write().await.insert(path.into(), Arc::new(controller)); - Ok(()) - } -}