feat: added sync rpc

This commit is contained in:
əlemi 2023-04-11 22:35:10 +02:00
parent 228f6a54f0
commit b891c0d2f0
4 changed files with 67 additions and 23 deletions

View file

@ -5,6 +5,7 @@ service Buffer {
rpc Attach (BufferPayload) returns (stream RawOp); rpc Attach (BufferPayload) returns (stream RawOp);
rpc Edit (OperationRequest) returns (BufferResponse); rpc Edit (OperationRequest) returns (BufferResponse);
rpc Create (BufferPayload) returns (BufferResponse); rpc Create (BufferPayload) returns (BufferResponse);
rpc Sync (BufferPayload) returns (BufferResponse);
} }
message RawOp { message RawOp {
@ -27,4 +28,5 @@ message BufferPayload {
message BufferResponse { message BufferResponse {
bool accepted = 1; bool accepted = 1;
optional string content = 2;
} }

View file

@ -10,7 +10,7 @@ use clap::Parser;
use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim};
use tonic::async_trait; use tonic::async_trait;
use tracing::{error, warn, debug}; use tracing::{error, warn, debug, info};
#[derive(Clone)] #[derive(Clone)]
struct NeovimHandler { struct NeovimHandler {
@ -47,7 +47,7 @@ impl Handler for NeovimHandler {
match name.as_ref() { match name.as_ref() {
"ping" => Ok(Value::from("pong")), "ping" => Ok(Value::from("pong")),
"dump" => Ok(Value::from(self.client.content())), "error" => Err(Value::from("user-requested error")),
"create" => { "create" => {
if args.len() < 1 { if args.len() < 1 {
@ -72,12 +72,15 @@ impl Handler for NeovimHandler {
let path = default_empty_str(&args, 0); let path = default_empty_str(&args, 0);
let txt = default_empty_str(&args, 1); let txt = default_empty_str(&args, 1);
let pos = default_zero_number(&args, 2); let pos = default_zero_number(&args, 2);
let mut c = self.client.clone(); let mut c = self.client.clone();
info!("correctly parsed arguments: {} - {} - {}", path, txt, pos);
match c.insert(path, txt, pos).await { match c.insert(path, txt, pos).await {
Ok(res) => match res { Ok(res) => {
info!("RPC 'insert' completed");
match res {
true => Ok(Value::from("accepted")), true => Ok(Value::from("accepted")),
false => Err(Value::from("rejected")), false => Err(Value::from("rejected")),
}
}, },
Err(e) => Err(Value::from(format!("could not send insert: {}", e))), Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
} }
@ -101,6 +104,28 @@ impl Handler for NeovimHandler {
} }
}, },
"sync" => {
if args.len() < 1 {
return Err(Value::from("no path given"));
}
let path = default_empty_str(&args, 0);
let mut c = self.client.clone();
match c.sync(path).await {
Err(e) => Err(Value::from(format!("could not sync: {}", e))),
Ok(content) => match nvim.get_current_buf().await {
Err(e) => return Err(Value::from(format!("could not get current buffer: {}", e))),
Ok(b) => {
let lines : Vec<String> = content.split("\n").map(|x| x.to_string()).collect();
match b.set_lines(0, -1, false, lines).await {
Err(e) => Err(Value::from(format!("failed sync: {}", e))),
Ok(()) => Ok(Value::from("synched")),
}
},
},
}
}
"attach" => { "attach" => {
if args.len() < 1 { if args.len() < 1 {
return Err(Value::from("no path given")); return Err(Value::from("no path given"));
@ -117,7 +142,7 @@ impl Handler for NeovimHandler {
let lines : Vec<String> = x.split("\n").map(|x| x.to_string()).collect(); let lines : Vec<String> = x.split("\n").map(|x| x.to_string()).collect();
let b = buf.clone(); let b = buf.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = b.set_lines(0, lines.len() as i64, false, lines).await { if let Err(e) = b.set_lines(0, -1, false, lines).await {
error!("could not update buffer: {}", e); error!("could not update buffer: {}", e);
} }
}); });
@ -157,18 +182,19 @@ struct CliArgs {
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = CliArgs::parse(); let args = CliArgs::parse();
let sub = tracing_subscriber::fmt() let sub = tracing_subscriber::fmt();
.compact() match TcpStream::connect("127.0.0.1:6969") { // TODO get rid of this
.without_time()
.with_ansi(false);
match TcpStream::connect("127.0.0.1:6969") {
Ok(stream) => { Ok(stream) => {
sub.with_writer(Mutex::new(stream)) sub.with_writer(Mutex::new(stream))
.with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }) .with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO })
.init(); .init();
}, },
Err(_) => { Err(_) => {
sub.with_writer(std::io::stderr) sub
.compact()
.without_time()
.with_ansi(false)
.with_writer(std::io::stderr)
.with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }) .with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO })
.init(); .init();
}, },

View file

@ -20,6 +20,7 @@ pub struct BufferHandle {
pub edit: mpsc::Sender<OperationRequest>, pub edit: mpsc::Sender<OperationRequest>,
events: broadcast::Sender<RawOp>, events: broadcast::Sender<RawOp>,
pub digest: watch::Receiver<Digest>, pub digest: watch::Receiver<Digest>,
pub content: watch::Receiver<String>,
} }
impl BufferHandle { impl BufferHandle {
@ -28,15 +29,17 @@ impl BufferHandle {
let (edits_tx, edits_rx) = mpsc::channel(64); // TODO hardcoded size let (edits_tx, edits_rx) = mpsc::channel(64); // TODO hardcoded size
let (events_tx, _events_rx) = broadcast::channel(64); // TODO hardcoded size let (events_tx, _events_rx) = broadcast::channel(64); // TODO hardcoded size
let (digest_tx, digest_rx) = watch::channel(md5::compute(&init_val)); let (digest_tx, digest_rx) = watch::channel(md5::compute(&init_val));
let (content_tx, content_rx) = watch::channel(init_val.clone());
let events_tx_clone = events_tx.clone(); let events_tx_clone = events_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let worker = BufferWorker { let worker = BufferWorker {
content: init_val, store: init_val,
edits: edits_rx, edits: edits_rx,
events: events_tx_clone, events: events_tx_clone,
digest: digest_tx, digest: digest_tx,
content: content_tx,
}; };
worker.work().await worker.work().await
}); });
@ -45,20 +48,21 @@ impl BufferHandle {
edit: edits_tx, edit: edits_tx,
events: events_tx, events: events_tx,
digest: digest_rx, digest: digest_rx,
content: content_rx,
} }
} }
pub fn subscribe(&self) -> broadcast::Receiver<RawOp> { pub fn subscribe(&self) -> broadcast::Receiver<RawOp> {
self.events.subscribe() self.events.subscribe()
} }
} }
struct BufferWorker { struct BufferWorker {
content: String, store: String,
edits: mpsc::Receiver<OperationRequest>, edits: mpsc::Receiver<OperationRequest>,
events: broadcast::Sender<RawOp>, events: broadcast::Sender<RawOp>,
digest: watch::Sender<Digest>, digest: watch::Sender<Digest>,
content: watch::Sender<String>,
} }
impl BufferWorker { impl BufferWorker {
@ -68,10 +72,11 @@ impl BufferWorker {
None => break, None => break,
Some(v) => { Some(v) => {
let op : OperationSeq = serde_json::from_str(&v.opseq).unwrap(); let op : OperationSeq = serde_json::from_str(&v.opseq).unwrap();
match op.apply(&self.content) { match op.apply(&self.store) {
Ok(res) => { Ok(res) => {
self.content = res; self.store = res;
self.digest.send(md5::compute(&self.content)).unwrap(); self.digest.send(md5::compute(&self.store)).unwrap();
self.content.send(self.store.clone()).unwrap();
let msg = RawOp { let msg = RawOp {
opseq: v.opseq, opseq: v.opseq,
user: v.user user: v.user
@ -80,7 +85,7 @@ impl BufferWorker {
error!("could not broadcast OpSeq: {}", e); error!("could not broadcast OpSeq: {}", e);
} }
}, },
Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.content, e), Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.store, e),
} }
}, },
} }

View file

@ -78,17 +78,28 @@ impl Buffer for BufferService {
}; };
info!("sending edit to buffer: {}", request.opseq); info!("sending edit to buffer: {}", request.opseq);
tx.send(request).await.unwrap(); tx.send(request).await.unwrap();
Ok(Response::new(BufferResponse { accepted: true })) Ok(Response::new(BufferResponse { accepted: true, content: None }))
} }
async fn create(&self, req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> { async fn create(&self, req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
let request = req.into_inner(); let request = req.into_inner();
let _handle = self.map.write().unwrap().handle(request.path, request.content); let _handle = self.map.write().unwrap().handle(request.path, request.content);
info!("created new buffer"); info!("created new buffer");
let answ = BufferResponse { accepted: true }; let answ = BufferResponse { accepted: true, content: None };
Ok(Response::new(answ)) Ok(Response::new(answ))
} }
async fn sync(&self, req: Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
let request = req.into_inner();
match self.map.read().unwrap().get(&request.path) {
None => Err(Status::not_found("requested buffer does not exist")),
Some(buf) => {
info!("synching buffer");
let answ = BufferResponse { accepted: true, content: Some(buf.content.borrow().clone()) };
Ok(Response::new(answ))
}
}
}
} }
impl BufferService { impl BufferService {