From 4f43573aa035bd0752852de92527c7172effeb84 Mon Sep 17 00:00:00 2001 From: alemi Date: Tue, 11 Apr 2023 06:20:40 +0200 Subject: [PATCH] feat: basic nvim RPC client --- Cargo.toml | 5 +- src/client/nvim/client.rs | 104 ++++++++++++++++++++++++++++ src/client/nvim/codemp.lua | 24 +++++++ src/client/nvim/main.rs | 137 +++++++++++++++++++++++++++++++------ src/lib/opfactory.rs | 17 +++-- 5 files changed, 258 insertions(+), 29 deletions(-) create mode 100644 src/client/nvim/client.rs create mode 100644 src/client/nvim/codemp.lua diff --git a/Cargo.toml b/Cargo.toml index 0c9f819..98e3c18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,11 +39,12 @@ md5 = "0.7.0" prost = "0.11.8" clap = { version = "4.2.1", features = ["derive"], optional = true } nvim-rs = { version = "0.5", features = ["use_tokio"], optional = true } +uuid = { version = "1.3.1", features = ["v4"] } [build-dependencies] tonic-build = "0.9" [features] -default = [] +default = ["nvim"] cli = ["dep:clap"] -nvim = ["dep:nvim-rs"] +nvim = ["dep:nvim-rs", "dep:clap"] diff --git a/src/client/nvim/client.rs b/src/client/nvim/client.rs new file mode 100644 index 0000000..2fab297 --- /dev/null +++ b/src/client/nvim/client.rs @@ -0,0 +1,104 @@ +use std::sync::{Arc, Mutex}; + +use codemp::{proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, tonic::{transport::Channel, Status, Streaming}, opfactory::OperationFactory}; +use tracing::{error, warn}; +use uuid::Uuid; + +type FactoryHandle = Arc>; + +impl From::> for CodempClient { + fn from(x: BufferClient) -> CodempClient { + CodempClient { + id: Uuid::new_v4(), + client:x, + factory: Arc::new(Mutex::new(OperationFactory::new(None))) + } + } +} + +#[derive(Clone)] +pub struct CodempClient { + id: Uuid, + client: BufferClient, + factory: FactoryHandle, // TODO less jank solution than Arc +} + +impl CodempClient { + pub async fn create(&mut self, path: String, content: Option) -> Result { + Ok( + self.client.create( + BufferPayload { + path, + content, + user: self.id.to_string(), + } + ) + .await? + .into_inner() + .accepted + ) + } + + pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result { + let res = { self.factory.lock().unwrap().insert(&txt, pos) }; + match res { + Ok(op) => { + Ok( + self.client.edit( + OperationRequest { + path, + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + user: self.id.to_string(), + } + ) + .await? + .into_inner() + .accepted + ) + }, + Err(e) => Err(Status::internal(format!("invalid operation: {}", e))), + } + } + + pub async fn attach () + Send + 'static>(&mut self, path: String, callback: F) -> Result<(), Status> { + let stream = self.client.attach( + BufferPayload { + path, + content: None, + user: self.id.to_string(), + } + ) + .await? + .into_inner(); + + let factory = self.factory.clone(); + tokio::spawn(async move { Self::worker(stream, factory, callback).await } ); + + Ok(()) + } + + async fn worker ()>(mut stream: Streaming, factory: FactoryHandle, callback: F) { + loop { + match stream.message().await { + Ok(v) => match v { + Some(operation) => { + let op = serde_json::from_str(&operation.opseq).unwrap(); + let res = { factory.lock().unwrap().process(op) }; + match res { + Ok(x) => callback(x), + Err(e) => break error!("desynched: {}", e), + } + } + None => break warn!("stream closed"), + }, + Err(e) => break error!("error receiving change: {}", e), + } + } + } + + pub fn content(&self) -> String { + let factory = self.factory.lock().unwrap(); + factory.content() + } +} diff --git a/src/client/nvim/codemp.lua b/src/client/nvim/codemp.lua new file mode 100644 index 0000000..3b55d74 --- /dev/null +++ b/src/client/nvim/codemp.lua @@ -0,0 +1,24 @@ +local BINARY = "/home/alemi/projects/codemp/target/debug/client-nvim --debug" + +if vim.g.codemp_jobid == nil then + vim.g.codemp_jobid = vim.fn.jobstart(BINARY, { rpc = true }) +end + +local M = {} +M.create = function(path, content) return vim.rpcrequest(vim.g.codemp_jobid, "create", path, content) end +M.insert = function(path, txt, pos) return vim.rpcrequest(vim.g.codemp_jobid, "insert", path, txt, pos) end +M.dump = function() return vim.rpcrequest(vim.g.codemp_jobid, "dump") end +M.attach = function(path) + vim.api.nvim_create_autocmd( + { "InsertCharPre" }, + { + callback = function() + local cursor = vim.api.nvim_win_get_cursor(0) + M.insert(path, vim.v.char, cursor[2]) + end, + } + ) + return vim.rpcrequest(vim.g.codemp_jobid, "attach", path) +end + +return M diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index f71ec8b..51f2331 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -1,19 +1,28 @@ -//! A basic example. Mainly for use in a test, but also shows off some basic -//! functionality. -use std::{env, error::Error, fs}; +mod client; +use std::error::Error; + +use client::CodempClient; +use codemp::proto::buffer_client::BufferClient; use rmpv::Value; + use tokio::io::Stdout; +use clap::Parser; use nvim_rs::{ - compat::tokio::Compat, create::tokio as create, rpc::IntoVal, Handler, Neovim, + compat::tokio::Compat, create::tokio as create, Handler, Neovim, }; use tonic::async_trait; +use tracing::{error, warn, debug}; #[derive(Clone)] struct NeovimHandler { - + client: CodempClient, +} + +fn nullable_optional_str(args: &Vec, index: usize) -> Option { + Some(args.get(index)?.as_str()?.to_string()) } #[async_trait] @@ -28,34 +37,116 @@ impl Handler for NeovimHandler { ) -> Result { match name.as_ref() { "ping" => Ok(Value::from("pong")), - _ => unimplemented!(), + + "dump" => Ok(Value::from(self.client.content())), + + "create" => { + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = args.get(0).unwrap().as_str().unwrap().into(); + let content = nullable_optional_str(&args, 1); + let mut c = self.client.clone(); + match c.create(path, content).await { + Ok(r) => match r { + true => Ok(Value::from("accepted")), + false => Err(Value::from("rejected")), + }, + Err(e) => Err(Value::from(format!("could not create buffer: {}", e))), + } + }, + + "insert" => { + if args.len() < 3 { + return Err(Value::from("not enough arguments")); + } + let path = args.get(0).unwrap().as_str().unwrap().into(); + let txt = args.get(1).unwrap().as_str().unwrap().into(); + let pos = args.get(2).unwrap().as_u64().unwrap(); + + let mut c = self.client.clone(); + match c.insert(path, txt, pos).await { + Ok(res) => match res { + true => Ok(Value::from("accepted")), + false => Err(Value::from("rejected")), + }, + Err(e) => Err(Value::from(format!("could not send insert: {}", e))), + } + }, + + "attach" => { + if args.len() < 1 { + return Err(Value::from("no path given")); + } + let path = args.get(0).unwrap().as_str().unwrap().into(); + let buf = nvim.get_current_buf().await.unwrap(); + let mut c = self.client.clone(); + + match c.attach(path, move |x| { + let lines : Vec = x.split("\n").map(|x| x.to_string()).collect(); + let b = buf.clone(); + tokio::spawn(async move { + if let Err(e) = b.set_lines(0, lines.len() as i64, false, lines).await { + error!("could not update buffer: {}", e); + } + }); + }).await { + Ok(()) => Ok(Value::from("spawned worker")), + Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), + } + }, + + _ => Err(Value::from("unimplemented")), } } async fn handle_notify( &self, - name: String, - args: Vec, - nvim: Neovim>, + _name: String, + _args: Vec, + _nvim: Neovim>, ) { + warn!("notify not handled"); } } +#[derive(Parser, Debug)] +struct CliArgs { + /// server host to connect to + #[arg(long, default_value = "http://[::1]:50051")] + host: String, + + /// show debug level logs + #[arg(long, default_value_t = false)] + debug: bool, +} + + #[tokio::main] -async fn main() { - let handler: NeovimHandler = NeovimHandler {}; +async fn main() -> Result<(), tonic::transport::Error> { + + let args = CliArgs::parse(); + + tracing_subscriber::fmt() + .compact() + .without_time() + .with_ansi(false) + .with_writer(std::io::stderr) + .with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO }) + .init(); + + let client = BufferClient::connect(args.host).await?; + debug!("client connected"); + + let handler: NeovimHandler = NeovimHandler { + client: client.into(), + }; + let (nvim, io_handler) = create::new_parent(handler).await; - let curbuf = nvim.get_current_buf().await.unwrap(); - - let mut envargs = env::args(); - let _ = envargs.next(); - let testfile = envargs.next().unwrap(); - - fs::write(testfile, &format!("{:?}", curbuf.into_val())).unwrap(); // Any error should probably be logged, as stderr is not visible to users. match io_handler.await { - Err(joinerr) => eprintln!("Error joining IO loop: '{}'", joinerr), + Err(joinerr) => error!("Error joining IO loop: '{}'", joinerr), Ok(Err(err)) => { if !err.is_reader_error() { // One last try, since there wasn't an error with writing to the @@ -68,7 +159,7 @@ async fn main() { // maybe retry, but at this point it's probably best // to assume the worst and print a friendly and // supportive message to our users - eprintln!("Well, dang... '{}'", e); + error!("Well, dang... '{}'", e); }); } @@ -76,16 +167,18 @@ async fn main() { // Closed channel usually means neovim quit itself, or this plugin was // told to quit by closing the channel, so it's not always an error // condition. - eprintln!("Error: '{}'", err); + error!("Error: '{}'", err); let mut source = err.source(); while let Some(e) = source { - eprintln!("Caused by: '{}'", e); + error!("Caused by: '{}'", e); source = e.source(); } } } Ok(Ok(())) => {} } + + Ok(()) } diff --git a/src/lib/opfactory.rs b/src/lib/opfactory.rs index 2743eff..b327120 100644 --- a/src/lib/opfactory.rs +++ b/src/lib/opfactory.rs @@ -1,4 +1,5 @@ use operational_transform::{OperationSeq, OTError}; +use tracing::{debug, info}; #[derive(Clone)] @@ -11,6 +12,11 @@ impl OperationFactory { OperationFactory { content: init.unwrap_or(String::new()) } } + // TODO remove the need for this + pub fn content(&self) -> String { + self.content.clone() + } + pub fn check(&self, txt: &str) -> bool { self.content == txt } @@ -25,10 +31,11 @@ impl OperationFactory { } pub fn insert(&mut self, txt: &str, pos: u64) -> Result { + info!("inserting {} at {}", txt, pos); let mut out = OperationSeq::default(); out.retain(pos); out.insert(txt); - self.content = out.apply(&self.content)?; // TODO does aplying mutate the OpSeq itself? + self.content = out.apply(&self.content)?; // TODO does applying mutate the OpSeq itself? Ok(out) } @@ -36,7 +43,7 @@ impl OperationFactory { let mut out = OperationSeq::default(); out.retain(pos - count); out.delete(count); - self.content = out.apply(&self.content)?; // TODO does aplying mutate the OpSeq itself? + self.content = out.apply(&self.content)?; // TODO does applying mutate the OpSeq itself? Ok(out) } @@ -44,13 +51,13 @@ impl OperationFactory { let mut out = OperationSeq::default(); out.retain(pos); out.delete(count); - self.content = out.apply(&self.content)?; // TODO does aplying mutate the OpSeq itself? + self.content = out.apply(&self.content)?; // TODO does applying mutate the OpSeq itself? Ok(out) } - pub fn process(&mut self, op: OperationSeq) -> Result<(), OTError> { + pub fn process(&mut self, op: OperationSeq) -> Result { self.content = op.apply(&self.content)?; - Ok(()) + Ok(self.content.clone()) } }