feat: basic nvim RPC client

This commit is contained in:
əlemi 2023-04-11 06:20:40 +02:00
parent 665b8ea2e0
commit 4f43573aa0
5 changed files with 258 additions and 29 deletions

View file

@ -39,11 +39,12 @@ md5 = "0.7.0"
prost = "0.11.8" prost = "0.11.8"
clap = { version = "4.2.1", features = ["derive"], optional = true } clap = { version = "4.2.1", features = ["derive"], optional = true }
nvim-rs = { version = "0.5", features = ["use_tokio"], optional = true } nvim-rs = { version = "0.5", features = ["use_tokio"], optional = true }
uuid = { version = "1.3.1", features = ["v4"] }
[build-dependencies] [build-dependencies]
tonic-build = "0.9" tonic-build = "0.9"
[features] [features]
default = [] default = ["nvim"]
cli = ["dep:clap"] cli = ["dep:clap"]
nvim = ["dep:nvim-rs"] nvim = ["dep:nvim-rs", "dep:clap"]

104
src/client/nvim/client.rs Normal file
View file

@ -0,0 +1,104 @@
use std::sync::{Arc, Mutex};
use codemp::{proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, tonic::{transport::Channel, Status, Streaming}, opfactory::OperationFactory};
use tracing::{error, warn};
use uuid::Uuid;
type FactoryHandle = Arc<Mutex<OperationFactory>>;
impl From::<BufferClient<Channel>> for CodempClient {
fn from(x: BufferClient<Channel>) -> CodempClient {
CodempClient {
id: Uuid::new_v4(),
client:x,
factory: Arc::new(Mutex::new(OperationFactory::new(None)))
}
}
}
#[derive(Clone)]
pub struct CodempClient {
id: Uuid,
client: BufferClient<Channel>,
factory: FactoryHandle, // TODO less jank solution than Arc<Mutex>
}
impl CodempClient {
pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
Ok(
self.client.create(
BufferPayload {
path,
content,
user: self.id.to_string(),
}
)
.await?
.into_inner()
.accepted
)
}
pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result<bool, Status> {
let res = { self.factory.lock().unwrap().insert(&txt, pos) };
match res {
Ok(op) => {
Ok(
self.client.edit(
OperationRequest {
path,
hash: "".into(),
opseq: serde_json::to_string(&op).unwrap(),
user: self.id.to_string(),
}
)
.await?
.into_inner()
.accepted
)
},
Err(e) => Err(Status::internal(format!("invalid operation: {}", e))),
}
}
pub async fn attach<F : Fn(String) -> () + Send + 'static>(&mut self, path: String, callback: F) -> Result<(), Status> {
let stream = self.client.attach(
BufferPayload {
path,
content: None,
user: self.id.to_string(),
}
)
.await?
.into_inner();
let factory = self.factory.clone();
tokio::spawn(async move { Self::worker(stream, factory, callback).await } );
Ok(())
}
async fn worker<F : Fn(String) -> ()>(mut stream: Streaming<RawOp>, factory: FactoryHandle, callback: F) {
loop {
match stream.message().await {
Ok(v) => match v {
Some(operation) => {
let op = serde_json::from_str(&operation.opseq).unwrap();
let res = { factory.lock().unwrap().process(op) };
match res {
Ok(x) => callback(x),
Err(e) => break error!("desynched: {}", e),
}
}
None => break warn!("stream closed"),
},
Err(e) => break error!("error receiving change: {}", e),
}
}
}
pub fn content(&self) -> String {
let factory = self.factory.lock().unwrap();
factory.content()
}
}

View file

@ -0,0 +1,24 @@
local BINARY = "/home/alemi/projects/codemp/target/debug/client-nvim --debug"
if vim.g.codemp_jobid == nil then
vim.g.codemp_jobid = vim.fn.jobstart(BINARY, { rpc = true })
end
local M = {}
M.create = function(path, content) return vim.rpcrequest(vim.g.codemp_jobid, "create", path, content) end
M.insert = function(path, txt, pos) return vim.rpcrequest(vim.g.codemp_jobid, "insert", path, txt, pos) end
M.dump = function() return vim.rpcrequest(vim.g.codemp_jobid, "dump") end
M.attach = function(path)
vim.api.nvim_create_autocmd(
{ "InsertCharPre" },
{
callback = function()
local cursor = vim.api.nvim_win_get_cursor(0)
M.insert(path, vim.v.char, cursor[2])
end,
}
)
return vim.rpcrequest(vim.g.codemp_jobid, "attach", path)
end
return M

View file

@ -1,19 +1,28 @@
//! A basic example. Mainly for use in a test, but also shows off some basic mod client;
//! functionality.
use std::{env, error::Error, fs};
use std::error::Error;
use client::CodempClient;
use codemp::proto::buffer_client::BufferClient;
use rmpv::Value; use rmpv::Value;
use tokio::io::Stdout; use tokio::io::Stdout;
use clap::Parser;
use nvim_rs::{ use nvim_rs::{
compat::tokio::Compat, create::tokio as create, rpc::IntoVal, Handler, Neovim, compat::tokio::Compat, create::tokio as create, Handler, Neovim,
}; };
use tonic::async_trait; use tonic::async_trait;
use tracing::{error, warn, debug};
#[derive(Clone)] #[derive(Clone)]
struct NeovimHandler { struct NeovimHandler {
client: CodempClient,
}
fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> {
Some(args.get(index)?.as_str()?.to_string())
} }
#[async_trait] #[async_trait]
@ -28,34 +37,116 @@ impl Handler for NeovimHandler {
) -> Result<Value, Value> { ) -> Result<Value, Value> {
match name.as_ref() { match name.as_ref() {
"ping" => Ok(Value::from("pong")), "ping" => Ok(Value::from("pong")),
_ => unimplemented!(),
"dump" => Ok(Value::from(self.client.content())),
"create" => {
if args.len() < 1 {
return Err(Value::from("no path given"));
}
let path = args.get(0).unwrap().as_str().unwrap().into();
let content = nullable_optional_str(&args, 1);
let mut c = self.client.clone();
match c.create(path, content).await {
Ok(r) => match r {
true => Ok(Value::from("accepted")),
false => Err(Value::from("rejected")),
},
Err(e) => Err(Value::from(format!("could not create buffer: {}", e))),
}
},
"insert" => {
if args.len() < 3 {
return Err(Value::from("not enough arguments"));
}
let path = args.get(0).unwrap().as_str().unwrap().into();
let txt = args.get(1).unwrap().as_str().unwrap().into();
let pos = args.get(2).unwrap().as_u64().unwrap();
let mut c = self.client.clone();
match c.insert(path, txt, pos).await {
Ok(res) => match res {
true => Ok(Value::from("accepted")),
false => Err(Value::from("rejected")),
},
Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
}
},
"attach" => {
if args.len() < 1 {
return Err(Value::from("no path given"));
}
let path = args.get(0).unwrap().as_str().unwrap().into();
let buf = nvim.get_current_buf().await.unwrap();
let mut c = self.client.clone();
match c.attach(path, move |x| {
let lines : Vec<String> = x.split("\n").map(|x| x.to_string()).collect();
let b = buf.clone();
tokio::spawn(async move {
if let Err(e) = b.set_lines(0, lines.len() as i64, false, lines).await {
error!("could not update buffer: {}", e);
}
});
}).await {
Ok(()) => Ok(Value::from("spawned worker")),
Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))),
}
},
_ => Err(Value::from("unimplemented")),
} }
} }
async fn handle_notify( async fn handle_notify(
&self, &self,
name: String, _name: String,
args: Vec<Value>, _args: Vec<Value>,
nvim: Neovim<Compat<Stdout>>, _nvim: Neovim<Compat<Stdout>>,
) { ) {
warn!("notify not handled");
} }
} }
#[derive(Parser, Debug)]
struct CliArgs {
/// server host to connect to
#[arg(long, default_value = "http://[::1]:50051")]
host: String,
/// show debug level logs
#[arg(long, default_value_t = false)]
debug: bool,
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() -> Result<(), tonic::transport::Error> {
let handler: NeovimHandler = NeovimHandler {};
let args = CliArgs::parse();
tracing_subscriber::fmt()
.compact()
.without_time()
.with_ansi(false)
.with_writer(std::io::stderr)
.with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO })
.init();
let client = BufferClient::connect(args.host).await?;
debug!("client connected");
let handler: NeovimHandler = NeovimHandler {
client: client.into(),
};
let (nvim, io_handler) = create::new_parent(handler).await; let (nvim, io_handler) = create::new_parent(handler).await;
let curbuf = nvim.get_current_buf().await.unwrap();
let mut envargs = env::args();
let _ = envargs.next();
let testfile = envargs.next().unwrap();
fs::write(testfile, &format!("{:?}", curbuf.into_val())).unwrap();
// Any error should probably be logged, as stderr is not visible to users. // Any error should probably be logged, as stderr is not visible to users.
match io_handler.await { match io_handler.await {
Err(joinerr) => eprintln!("Error joining IO loop: '{}'", joinerr), Err(joinerr) => error!("Error joining IO loop: '{}'", joinerr),
Ok(Err(err)) => { Ok(Err(err)) => {
if !err.is_reader_error() { if !err.is_reader_error() {
// One last try, since there wasn't an error with writing to the // One last try, since there wasn't an error with writing to the
@ -68,7 +159,7 @@ async fn main() {
// maybe retry, but at this point it's probably best // maybe retry, but at this point it's probably best
// to assume the worst and print a friendly and // to assume the worst and print a friendly and
// supportive message to our users // supportive message to our users
eprintln!("Well, dang... '{}'", e); error!("Well, dang... '{}'", e);
}); });
} }
@ -76,16 +167,18 @@ async fn main() {
// Closed channel usually means neovim quit itself, or this plugin was // Closed channel usually means neovim quit itself, or this plugin was
// told to quit by closing the channel, so it's not always an error // told to quit by closing the channel, so it's not always an error
// condition. // condition.
eprintln!("Error: '{}'", err); error!("Error: '{}'", err);
let mut source = err.source(); let mut source = err.source();
while let Some(e) = source { while let Some(e) = source {
eprintln!("Caused by: '{}'", e); error!("Caused by: '{}'", e);
source = e.source(); source = e.source();
} }
} }
} }
Ok(Ok(())) => {} Ok(Ok(())) => {}
} }
Ok(())
} }

View file

@ -1,4 +1,5 @@
use operational_transform::{OperationSeq, OTError}; use operational_transform::{OperationSeq, OTError};
use tracing::{debug, info};
#[derive(Clone)] #[derive(Clone)]
@ -11,6 +12,11 @@ impl OperationFactory {
OperationFactory { content: init.unwrap_or(String::new()) } OperationFactory { content: init.unwrap_or(String::new()) }
} }
// TODO remove the need for this
pub fn content(&self) -> String {
self.content.clone()
}
pub fn check(&self, txt: &str) -> bool { pub fn check(&self, txt: &str) -> bool {
self.content == txt self.content == txt
} }
@ -25,10 +31,11 @@ impl OperationFactory {
} }
pub fn insert(&mut self, txt: &str, pos: u64) -> Result<OperationSeq, OTError> { pub fn insert(&mut self, txt: &str, pos: u64) -> Result<OperationSeq, OTError> {
info!("inserting {} at {}", txt, pos);
let mut out = OperationSeq::default(); let mut out = OperationSeq::default();
out.retain(pos); out.retain(pos);
out.insert(txt); out.insert(txt);
self.content = out.apply(&self.content)?; // TODO does aplying mutate the OpSeq itself? self.content = out.apply(&self.content)?; // TODO does applying mutate the OpSeq itself?
Ok(out) Ok(out)
} }
@ -36,7 +43,7 @@ impl OperationFactory {
let mut out = OperationSeq::default(); let mut out = OperationSeq::default();
out.retain(pos - count); out.retain(pos - count);
out.delete(count); out.delete(count);
self.content = out.apply(&self.content)?; // TODO does aplying mutate the OpSeq itself? self.content = out.apply(&self.content)?; // TODO does applying mutate the OpSeq itself?
Ok(out) Ok(out)
} }
@ -44,13 +51,13 @@ impl OperationFactory {
let mut out = OperationSeq::default(); let mut out = OperationSeq::default();
out.retain(pos); out.retain(pos);
out.delete(count); out.delete(count);
self.content = out.apply(&self.content)?; // TODO does aplying mutate the OpSeq itself? self.content = out.apply(&self.content)?; // TODO does applying mutate the OpSeq itself?
Ok(out) Ok(out)
} }
pub fn process(&mut self, op: OperationSeq) -> Result<(), OTError> { pub fn process(&mut self, op: OperationSeq) -> Result<String, OTError> {
self.content = op.apply(&self.content)?; self.content = op.apply(&self.content)?;
Ok(()) Ok(self.content.clone())
} }
} }