From c213536c3be65359fc648e5951e0931f600c7330 Mon Sep 17 00:00:00 2001 From: alemidev Date: Sun, 10 Jul 2022 23:47:14 +0200 Subject: [PATCH] feat: implemented connection and state managers Co-authored-by: f-tlm --- Cargo.toml | 8 ++- plugin/codemp.vim | 87 ++++++++++++++++++++++++++++ src/client.rs | 21 ------- src/client/main.rs | 32 +++++++++++ src/client/manager.rs | 38 +++++++++++++ src/client/nvim/mod.rs | 95 +++++++++++++++++++++++++++++++ src/main.rs | 3 - src/{server.rs => server/main.rs} | 26 ++++++++- 8 files changed, 280 insertions(+), 30 deletions(-) create mode 100644 plugin/codemp.vim delete mode 100644 src/client.rs create mode 100644 src/client/main.rs create mode 100644 src/client/manager.rs create mode 100644 src/client/nvim/mod.rs delete mode 100644 src/main.rs rename src/{server.rs => server/main.rs} (70%) diff --git a/Cargo.toml b/Cargo.toml index c6d2b57..f75e500 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,16 +5,18 @@ edition = "2021" [[bin]] # Bin to run the CodeMP gRPC server name = "codemp-server" -path = "src/server.rs" +path = "src/server/main.rs" [[bin]] # Bin to run the CodeMP gRPC client name = "codemp-client" -path = "src/client.rs" +path = "src/client/main.rs" [dependencies] tonic = "0.7" prost = "0.10" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } +tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] } +rmpv = "1" +nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature [build-dependencies] tonic-build = "0.7" diff --git a/plugin/codemp.vim b/plugin/codemp.vim new file mode 100644 index 0000000..a01484a --- /dev/null +++ b/plugin/codemp.vim @@ -0,0 +1,87 @@ +" Copyright 2017 Justin Charette +" +" Licensed under the Apache License, Version 2.0 or the MIT license +" , at your +" option. This file may not be copied, modified, or distributed +" except according to those terms. + +if ! exists('s:jobid') + let s:jobid = 0 +endif + +let s:bin = "/home/alemi/projects/codemp/target/debug/codemp-client" + +function! codemp#init() + let result = s:StartJob() + + if 0 == result + echoerr "codeMP: cannot start rpc process" + elseif -1 == result + echoerr "codeMP: rpc process is not executable" + else + let s:jobid = result + call s:ConfigureJob(result) + endif +endfunction + +function! s:StartJob() + if 0 == s:jobid + let id = jobstart([s:bin], { 'rpc': v:true, 'on_stderr': function('s:OnStderr') }) + return id + else + return 0 + endif +endfunction + +function! s:StopJob() + if 0 < s:jobid + augroup codeMp + autocmd! " clear all previous autocommands + augroup END + + call rpcnotify(s:jobid, 'quit') + let result = jobwait(s:jobid, 500) + + if -1 == result + " kill the job + call jobstop(s:jobid) + endif + + " reset job id back to zero + let s:jobid = 0 + endif +endfunction + +function! s:ConfigureJob(jobid) + augroup codeMp + " clear all previous autocommands + autocmd! + + autocmd VimLeavePre * :call s:StopJob() + + autocmd InsertEnter * :call s:NotifyInsertEnter() + autocmd InsertLeave * :call s:NotifyInsertLeave() + + augroup END +endfunction + +function! s:NotifyInsertEnter() + let [ bufnum, lnum, column, off ] = getpos('.') + call rpcnotify(s:jobid, 'insert-enter', v:insertmode, lnum, column) +endfunction + +function! s:NotifyInsertLeave() +endfunction + +function! codemp#ping() + call rpcnotify(s:jobid, "ping") +endfunction + +function! codemp#test() + call rpcnotify(s:jobid, "rpc") +endfunction + +function! s:OnStderr(id, data, event) dict + echom 'codemp: stderr: ' . join(a:data, "\n") +endfunction diff --git a/src/client.rs b/src/client.rs deleted file mode 100644 index 2b7dba0..0000000 --- a/src/client.rs +++ /dev/null @@ -1,21 +0,0 @@ -pub mod proto_core { - tonic::include_proto!("core"); -} - -use proto_core::session_client::SessionClient; -use proto_core::SessionRequest; - -#[tokio::main] -async fn main() -> Result<(), Box> { - let mut client = SessionClient::connect("http://[::1]:50051").await?; - - let request = tonic::Request::new(SessionRequest { - session_id: 0, - }); - - let response = client.create(request).await?; - - println!("RESPONSE={:?}", response); - - Ok(()) -} diff --git a/src/client/main.rs b/src/client/main.rs new file mode 100644 index 0000000..7f78dcb --- /dev/null +++ b/src/client/main.rs @@ -0,0 +1,32 @@ +pub mod manager; +mod nvim; + +use tokio::sync::mpsc; +use nvim_rs::{compat::tokio::Compat, create::tokio::new_parent, rpc::IntoVal, Handler, Neovim, Value}; + +use manager::ConnectionManager; +use nvim::NeovimHandler; + + +#[tokio::main] +async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> { + let (tx, rx) = mpsc::channel(32); + let mut mngr = ConnectionManager::new("http://[::1]:50051".to_string(), rx).await?; + tokio::spawn(async move { + mngr.process_packets().await + }); + + let handler: NeovimHandler = NeovimHandler::new(tx).await?; + let (nvim, io_handler) = new_parent(handler).await; + + nvim.call(":echo", vec![Value::from("***REMOVED***")]).await.unwrap().unwrap(); + + // Any error should probably be logged, as stderr is not visible to users. + match io_handler.await { + Err(err) => eprintln!("Error joining IO loop: {:?}", err), + Ok(Err(err)) => eprintln!("Process ended with error: {:?}", err), + Ok(Ok(())) => println!("Finished"), + } + + Ok(()) +} diff --git a/src/client/manager.rs b/src/client/manager.rs new file mode 100644 index 0000000..f45f6a3 --- /dev/null +++ b/src/client/manager.rs @@ -0,0 +1,38 @@ +pub mod proto_core { + tonic::include_proto!("core"); +} + +use tonic::transport::Channel; + +use proto_core::session_client::SessionClient; +use proto_core::SessionRequest; + +use tokio::sync::mpsc; + +#[derive(Debug)] +pub struct ConnectionManager { + client: SessionClient, + rx: mpsc::Receiver +} + + +impl ConnectionManager { + pub async fn new(addr:String, outbound:mpsc::Receiver) -> Result> { + Ok(ConnectionManager { + client: SessionClient::connect(addr).await?, + rx: outbound + }) + } + + pub async fn process_packets(&mut self) { + loop { + if let Some(i) = self.rx.recv().await { + let request = tonic::Request::new(SessionRequest {session_id: i}); + let response = self.client.create(request).await.unwrap(); + println!("RESPONSE={:?}", response); + } else { + break + } + } + } +} diff --git a/src/client/nvim/mod.rs b/src/client/nvim/mod.rs new file mode 100644 index 0000000..6216585 --- /dev/null +++ b/src/client/nvim/mod.rs @@ -0,0 +1,95 @@ +use rmpv::Value; + +use tokio::io::Stdout; +use tokio::sync::mpsc; + +use nvim_rs::{compat::tokio::Compat, create::tokio::new_parent, rpc::IntoVal, Handler, Neovim}; +use tonic::transport::Channel; + +use crate::manager::proto_core::{session_client::SessionClient, SessionRequest}; + + +#[derive(Clone)] +pub struct NeovimHandler { + tx: mpsc::Sender, +} + +impl NeovimHandler { + pub async fn new(tx: mpsc::Sender) -> Result { + Ok(NeovimHandler { tx }) + } +} + +#[tonic::async_trait] +impl Handler for NeovimHandler { + type Writer = Compat; + + async fn handle_request( + &self, + name: String, + _args: Vec, + _neovim: Neovim>, + ) -> Result { + match name.as_ref() { + "ping" => Ok(Value::from("pong")), + "rpc" => { + self.tx.send(0).await.unwrap(); + // let request = tonic::Request::new(SessionRequest {session_id: 0}); + // let response = self.client.create(request).await.unwrap(); + Ok(Value::from("sent")) + }, + _ => unimplemented!(), + } + } +} + +pub async fn run_nvim_plugin(tx: mpsc::Sender) -> Result<(), Box<(dyn std::error::Error + 'static)>> { + let handler: NeovimHandler = NeovimHandler::new(tx).await?; + let (nvim, io_handler) = new_parent(handler).await; + let curbuf = nvim.get_current_buf().await.unwrap(); + + let mut envargs = std::env::args(); + let _ = envargs.next(); + let testfile = envargs.next().unwrap(); + + std::fs::write(testfile, &format!("{:?}", curbuf.into_val())).unwrap(); + + + // Any error should probably be logged, as stderr is not visible to users. + match io_handler.await { + Err( err) => eprintln!("Error joining IO loop: '{}'", joinerr), + Ok(Err(err)) => { + if !err.is_reader_error() { + // One last try, since there wasn't an error with writing to the + // stream + nvim + .err_writeln(&format!("Error: '{}'", err)) + .await + .unwrap_or_else(|e| { + // We could inspect this error to see what was happening, and + // maybe retry, but at this point it's probably best + // to assume the worst and print a friendly and + // supportive message to our users + eprintln!("Well, dang... '{}'", e); + }); + } + + if !err.is_channel_closed() { + // Closed channel usually means neovim quit itself, or this plugin was + // told to quit by closing the channel, so it's not always an error + // condition. + eprintln!("Error: '{}'", err); + + // let mut source = err.source(); + + // while let Some(e) = source { + // eprintln!("Caused by: '{}'", e); + // source = e.source(); + // } + } + } + Ok(Ok(())) => {} + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index a30eb95..0000000 --- a/src/main.rs +++ /dev/null @@ -1,3 +0,0 @@ -fn main() { - println!("Hello, world!"); -} diff --git a/src/server.rs b/src/server/main.rs similarity index 70% rename from src/server.rs rename to src/server/main.rs index b72fbe2..0de536e 100644 --- a/src/server.rs +++ b/src/server/main.rs @@ -1,12 +1,13 @@ use tonic::{transport::Server, Request, Response, Status}; -use proto_core::session_server::{Session, SessionServer}; -use proto_core::{SessionRequest, SessionResponse}; - pub mod proto_core { tonic::include_proto!("core"); } +use proto_core::session_server::{Session, SessionServer}; +use proto_core::{SessionRequest, SessionResponse}; + + #[derive(Debug, Default)] pub struct TestSession {} @@ -38,3 +39,22 @@ async fn main() -> Result<(), Box> { Ok(()) } + +/* + +#[tokio::main] +async fn main() -> Result<(), Box> { + let mut client = GreeterClient::connect("http://[::1]:50051").await?; + + let request = tonic::Request::new(HelloRequest { + name: "Tonic".into(), + }); + + let response = client.say_hello(request).await?; + + println!("RESPONSE={:?}", response); + + Ok(()) +} + +*/