From b891c0d2f082a8e99dac6c2b8168334301dd696b Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 11 Apr 2023 22:35:10 +0200 Subject: [PATCH] feat: added sync rpc --- proto/buffer.proto | 2 ++ src/client/nvim/main.rs | 52 +++++++++++++++++++++++++++--------- src/server/buffer/actor.rs | 19 ++++++++----- src/server/buffer/service.rs | 17 +++++++++--- 4 files changed, 67 insertions(+), 23 deletions(-) diff --git a/proto/buffer.proto b/proto/buffer.proto index 36c26ee..3a322f5 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -5,6 +5,7 @@ service Buffer { rpc Attach (BufferPayload) returns (stream RawOp); rpc Edit (OperationRequest) returns (BufferResponse); rpc Create (BufferPayload) returns (BufferResponse); + rpc Sync (BufferPayload) returns (BufferResponse); } message RawOp { @@ -27,4 +28,5 @@ message BufferPayload { message BufferResponse { bool accepted = 1; + optional string content = 2; } diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 7a8d392..1b4e10b 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -10,7 +10,7 @@ use clap::Parser; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; use tonic::async_trait; -use tracing::{error, warn, debug}; +use tracing::{error, warn, debug, info}; #[derive(Clone)] struct NeovimHandler { @@ -47,7 +47,7 @@ impl Handler for NeovimHandler { match name.as_ref() { "ping" => Ok(Value::from("pong")), - "dump" => Ok(Value::from(self.client.content())), + "error" => Err(Value::from("user-requested error")), "create" => { if args.len() < 1 { @@ -72,12 +72,15 @@ impl Handler for NeovimHandler { let path = default_empty_str(&args, 0); let txt = default_empty_str(&args, 1); let pos = default_zero_number(&args, 2); - let mut c = self.client.clone(); + info!("correctly parsed arguments: {} - {} - {}", path, txt, pos); match c.insert(path, txt, pos).await { - Ok(res) => match res { - true => Ok(Value::from("accepted")), - false => Err(Value::from("rejected")), + Ok(res) => { + info!("RPC 'insert' completed"); + match res { + true => Ok(Value::from("accepted")), + false => Err(Value::from("rejected")), + } }, Err(e) => Err(Value::from(format!("could not send insert: {}", e))), } @@ -101,6 +104,28 @@ impl Handler for NeovimHandler { } }, + "sync" => { + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = default_empty_str(&args, 0); + + let mut c = self.client.clone(); + match c.sync(path).await { + Err(e) => Err(Value::from(format!("could not sync: {}", e))), + Ok(content) => match nvim.get_current_buf().await { + Err(e) => return Err(Value::from(format!("could not get current buffer: {}", e))), + Ok(b) => { + let lines : Vec = content.split("\n").map(|x| x.to_string()).collect(); + match b.set_lines(0, -1, false, lines).await { + Err(e) => Err(Value::from(format!("failed sync: {}", e))), + Ok(()) => Ok(Value::from("synched")), + } + }, + }, + } + } + "attach" => { if args.len() < 1 { return Err(Value::from("no path given")); @@ -117,7 +142,7 @@ impl Handler for NeovimHandler { let lines : Vec = x.split("\n").map(|x| x.to_string()).collect(); let b = buf.clone(); tokio::spawn(async move { - if let Err(e) = b.set_lines(0, lines.len() as i64, false, lines).await { + if let Err(e) = b.set_lines(0, -1, false, lines).await { error!("could not update buffer: {}", e); } }); @@ -157,18 +182,19 @@ struct CliArgs { async fn main() -> Result<(), Box> { let args = CliArgs::parse(); - let sub = tracing_subscriber::fmt() - .compact() - .without_time() - .with_ansi(false); - match TcpStream::connect("127.0.0.1:6969") { + let sub = tracing_subscriber::fmt(); + match TcpStream::connect("127.0.0.1:6969") { // TODO get rid of this Ok(stream) => { sub.with_writer(Mutex::new(stream)) .with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }) .init(); }, Err(_) => { - sub.with_writer(std::io::stderr) + sub + .compact() + .without_time() + .with_ansi(false) + .with_writer(std::io::stderr) .with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }) .init(); }, diff --git a/src/server/buffer/actor.rs b/src/server/buffer/actor.rs index 0b67c07..ddbc1fb 100644 --- a/src/server/buffer/actor.rs +++ b/src/server/buffer/actor.rs @@ -20,6 +20,7 @@ pub struct BufferHandle { pub edit: mpsc::Sender, events: broadcast::Sender, pub digest: watch::Receiver, + pub content: watch::Receiver, } impl BufferHandle { @@ -28,15 +29,17 @@ impl BufferHandle { let (edits_tx, edits_rx) = mpsc::channel(64); // TODO hardcoded size let (events_tx, _events_rx) = broadcast::channel(64); // TODO hardcoded size let (digest_tx, digest_rx) = watch::channel(md5::compute(&init_val)); + let (content_tx, content_rx) = watch::channel(init_val.clone()); let events_tx_clone = events_tx.clone(); tokio::spawn(async move { let worker = BufferWorker { - content: init_val, + store: init_val, edits: edits_rx, events: events_tx_clone, digest: digest_tx, + content: content_tx, }; worker.work().await }); @@ -45,20 +48,21 @@ impl BufferHandle { edit: edits_tx, events: events_tx, digest: digest_rx, + content: content_rx, } } pub fn subscribe(&self) -> broadcast::Receiver { self.events.subscribe() - } } struct BufferWorker { - content: String, + store: String, edits: mpsc::Receiver, events: broadcast::Sender, digest: watch::Sender, + content: watch::Sender, } impl BufferWorker { @@ -68,10 +72,11 @@ impl BufferWorker { None => break, Some(v) => { let op : OperationSeq = serde_json::from_str(&v.opseq).unwrap(); - match op.apply(&self.content) { + match op.apply(&self.store) { Ok(res) => { - self.content = res; - self.digest.send(md5::compute(&self.content)).unwrap(); + self.store = res; + self.digest.send(md5::compute(&self.store)).unwrap(); + self.content.send(self.store.clone()).unwrap(); let msg = RawOp { opseq: v.opseq, user: v.user @@ -80,7 +85,7 @@ impl BufferWorker { error!("could not broadcast OpSeq: {}", e); } }, - Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.content, e), + Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.store, e), } }, } diff --git a/src/server/buffer/service.rs b/src/server/buffer/service.rs index ba9adf9..a7f894b 100644 --- a/src/server/buffer/service.rs +++ b/src/server/buffer/service.rs @@ -78,17 +78,28 @@ impl Buffer for BufferService { }; info!("sending edit to buffer: {}", request.opseq); tx.send(request).await.unwrap(); - Ok(Response::new(BufferResponse { accepted: true })) + Ok(Response::new(BufferResponse { accepted: true, content: None })) } async fn create(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let _handle = self.map.write().unwrap().handle(request.path, request.content); info!("created new buffer"); - let answ = BufferResponse { accepted: true }; + let answ = BufferResponse { accepted: true, content: None }; Ok(Response::new(answ)) } - + + async fn sync(&self, req: Request) -> Result, Status> { + let request = req.into_inner(); + match self.map.read().unwrap().get(&request.path) { + None => Err(Status::not_found("requested buffer does not exist")), + Some(buf) => { + info!("synching buffer"); + let answ = BufferResponse { accepted: true, content: Some(buf.content.borrow().clone()) }; + Ok(Response::new(answ)) + } + } + } } impl BufferService {