From 3827ab066d477f3ba4f3139f66866ac8a2feed22 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 12 Apr 2023 00:32:39 +0200 Subject: [PATCH] fix: one factory per buffer, create on attach --- src/client/nvim/main.rs | 33 +++------- src/lib/client.rs | 143 ++++++++++++++++++++-------------------- 2 files changed, 81 insertions(+), 95 deletions(-) diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 1b4e10b..3e16d56 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -104,40 +104,19 @@ impl Handler for NeovimHandler { } }, - "sync" => { - if args.len() < 1 { - return Err(Value::from("no path given")); - } - let path = default_empty_str(&args, 0); - - let mut c = self.client.clone(); - match c.sync(path).await { - Err(e) => Err(Value::from(format!("could not sync: {}", e))), - Ok(content) => match nvim.get_current_buf().await { - Err(e) => return Err(Value::from(format!("could not get current buffer: {}", e))), - Ok(b) => { - let lines : Vec = content.split("\n").map(|x| x.to_string()).collect(); - match b.set_lines(0, -1, false, lines).await { - Err(e) => Err(Value::from(format!("failed sync: {}", e))), - Ok(()) => Ok(Value::from("synched")), - } - }, - }, - } - } - "attach" => { if args.len() < 1 { return Err(Value::from("no path given")); } let path = default_empty_str(&args, 0); - let buf = match nvim.get_current_buf().await { + let buffer = match nvim.get_current_buf().await { Ok(b) => b, Err(e) => return Err(Value::from(format!("could not get current buffer: {}", e))), }; let mut c = self.client.clone(); + let buf = buffer.clone(); match c.attach(path, move |x| { let lines : Vec = x.split("\n").map(|x| x.to_string()).collect(); let b = buf.clone(); @@ -147,8 +126,14 @@ impl Handler for NeovimHandler { } }); }).await { - Ok(()) => Ok(Value::from("spawned worker")), Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), + Ok(content) => { + let lines : Vec = content.split("\n").map(|x| x.to_string()).collect(); + if let Err(e) = buffer.set_lines(0, -1, false, lines).await { + error!("could not update buffer: {}", e); + } + Ok(Value::from("spawned worker")) + }, } }, diff --git a/src/lib/client.rs b/src/lib/client.rs index d022955..436fe89 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -1,6 +1,6 @@ /// TODO better name for this file -use std::sync::Arc; +use std::{sync::{Arc, RwLock}, collections::BTreeMap}; use tracing::{error, warn}; use uuid::Uuid; @@ -10,12 +10,14 @@ use crate::{ tonic::{transport::Channel, Status, Streaming}, }; +pub type FactoryStore = Arc>>>; + impl From::> for CodempClient { fn from(x: BufferClient) -> CodempClient { CodempClient { id: Uuid::new_v4(), client:x, - factory: Arc::new(AsyncFactory::new(None)), + factories: Arc::new(RwLock::new(BTreeMap::new())), } } } @@ -24,85 +26,85 @@ impl From::> for CodempClient { pub struct CodempClient { id: Uuid, client: BufferClient, - factory: Arc, + factories: FactoryStore, } impl CodempClient { + fn get_factory(&self, path: &String) -> Result, Status> { + match self.factories.read().unwrap().get(path) { + Some(f) => Ok(f.clone()), + None => Err(Status::not_found("no active buffer for given path")), + } + } + + pub fn add_factory(&self, path: String, factory:Arc) { + self.factories.write().unwrap().insert(path, factory); + } + pub async fn create(&mut self, path: String, content: Option) -> Result { - Ok( - self.client.create( - BufferPayload { - path, - content, - user: self.id.to_string(), - } - ) - .await? - .into_inner() - .accepted - ) + let req = BufferPayload { + path: path.clone(), + content: content.clone(), + user: self.id.to_string(), + }; + + let res = self.client.create(req).await?.into_inner(); + + Ok(res.accepted) } pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result { - match self.factory.insert(txt, pos).await { - Ok(op) => { - Ok( - self.client.edit( - OperationRequest { - path, - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - user: self.id.to_string(), - } - ) - .await? - .into_inner() - .accepted - ) - }, + let factory = self.get_factory(&path)?; + match factory.insert(txt, pos).await { Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), + Ok(op) => { + let req = OperationRequest { + path, + hash: "".into(), + user: self.id.to_string(), + opseq: serde_json::to_string(&op) + .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, + }; + let res = self.client.edit(req).await?.into_inner(); + Ok(res.accepted) + }, } } pub async fn delete(&mut self, path: String, pos: u64, count: u64) -> Result { - match self.factory.delete(pos, count).await { - Ok(op) => { - Ok( - self.client.edit( - OperationRequest { - path, - hash: "".into(), - opseq: serde_json::to_string(&op).unwrap(), - user: self.id.to_string(), - } - ) - .await? - .into_inner() - .accepted - ) - }, + let factory = self.get_factory(&path)?; + match factory.delete(pos, count).await { Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), + Ok(op) => { + let req = OperationRequest { + path, + hash: "".into(), + user: self.id.to_string(), + opseq: serde_json::to_string(&op) + .map_err(|_| Status::invalid_argument("could not serialize opseq"))?, + }; + let res = self.client.edit(req).await?.into_inner(); + Ok(res.accepted) + }, } } - pub async fn attach () + Send + 'static>(&mut self, path: String, callback: F) -> Result<(), Status> { - let stream = self.client.attach( - BufferPayload { - path, - content: None, - user: self.id.to_string(), - } - ) - .await? - .into_inner(); - - let factory = self.factory.clone(); + pub async fn attach(&mut self, path: String, callback: F) -> Result + where F : Fn(String) -> () + Send + 'static { + let content = self.sync(path.clone()).await?; + let factory = Arc::new(AsyncFactory::new(Some(content.clone()))); + self.add_factory(path.clone(), factory.clone()); + let req = BufferPayload { + path, + content: None, + user: self.id.to_string(), + }; + let stream = self.client.attach(req).await?.into_inner(); tokio::spawn(async move { Self::worker(stream, factory, callback).await } ); - - Ok(()) + Ok(content) } - pub async fn sync(&mut self, path: String) -> Result { + async fn sync(&mut self, path: String) -> Result { let res = self.client.sync( BufferPayload { path, content: None, user: self.id.to_string(), @@ -111,20 +113,19 @@ impl CodempClient { Ok(res.into_inner().content.unwrap_or("".into())) } - async fn worker ()>(mut stream: Streaming, factory: Arc, callback: F) { + async fn worker(mut stream: Streaming, factory: Arc, callback: F) + where F : Fn(String) -> () { loop { match stream.message().await { Err(e) => break error!("error receiving change: {}", e), Ok(v) => match v { None => break warn!("stream closed"), - Some(operation) => { - match serde_json::from_str(&operation.opseq) { - Err(e) => break error!("could not deserialize opseq: {}", e), - Ok(op) => match factory.process(op).await { - Err(e) => break error!("desynched: {}", e), - Ok(x) => callback(x), - }, - } + Some(operation) => match serde_json::from_str(&operation.opseq) { + Err(e) => break error!("could not deserialize opseq: {}", e), + Ok(op) => match factory.process(op).await { + Err(e) => break error!("desynched: {}", e), + Ok(x) => callback(x), + }, } }, }