feat: implemented connection and state managers

Co-authored-by: f-tlm <f-tlm@users.noreply.github.com>
This commit is contained in:
əlemi 2022-07-10 23:47:14 +02:00
parent 6e26282faf
commit c213536c3b
8 changed files with 280 additions and 30 deletions

View file

@ -5,16 +5,18 @@ edition = "2021"
[[bin]] # Bin to run the CodeMP gRPC server [[bin]] # Bin to run the CodeMP gRPC server
name = "codemp-server" name = "codemp-server"
path = "src/server.rs" path = "src/server/main.rs"
[[bin]] # Bin to run the CodeMP gRPC client [[bin]] # Bin to run the CodeMP gRPC client
name = "codemp-client" name = "codemp-client"
path = "src/client.rs" path = "src/client/main.rs"
[dependencies] [dependencies]
tonic = "0.7" tonic = "0.7"
prost = "0.10" prost = "0.10"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] }
rmpv = "1"
nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature
[build-dependencies] [build-dependencies]
tonic-build = "0.7" tonic-build = "0.7"

87
plugin/codemp.vim Normal file
View file

@ -0,0 +1,87 @@
" Copyright 2017 Justin Charette
"
" Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
" http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
" <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
" option. This file may not be copied, modified, or distributed
" except according to those terms.
if ! exists('s:jobid')
let s:jobid = 0
endif
let s:bin = "/home/alemi/projects/codemp/target/debug/codemp-client"
function! codemp#init()
let result = s:StartJob()
if 0 == result
echoerr "codeMP: cannot start rpc process"
elseif -1 == result
echoerr "codeMP: rpc process is not executable"
else
let s:jobid = result
call s:ConfigureJob(result)
endif
endfunction
function! s:StartJob()
if 0 == s:jobid
let id = jobstart([s:bin], { 'rpc': v:true, 'on_stderr': function('s:OnStderr') })
return id
else
return 0
endif
endfunction
function! s:StopJob()
if 0 < s:jobid
augroup codeMp
autocmd! " clear all previous autocommands
augroup END
call rpcnotify(s:jobid, 'quit')
let result = jobwait(s:jobid, 500)
if -1 == result
" kill the job
call jobstop(s:jobid)
endif
" reset job id back to zero
let s:jobid = 0
endif
endfunction
function! s:ConfigureJob(jobid)
augroup codeMp
" clear all previous autocommands
autocmd!
autocmd VimLeavePre * :call s:StopJob()
autocmd InsertEnter * :call s:NotifyInsertEnter()
autocmd InsertLeave * :call s:NotifyInsertLeave()
augroup END
endfunction
function! s:NotifyInsertEnter()
let [ bufnum, lnum, column, off ] = getpos('.')
call rpcnotify(s:jobid, 'insert-enter', v:insertmode, lnum, column)
endfunction
function! s:NotifyInsertLeave()
endfunction
function! codemp#ping()
call rpcnotify(s:jobid, "ping")
endfunction
function! codemp#test()
call rpcnotify(s:jobid, "rpc")
endfunction
function! s:OnStderr(id, data, event) dict
echom 'codemp: stderr: ' . join(a:data, "\n")
endfunction

View file

@ -1,21 +0,0 @@
pub mod proto_core {
tonic::include_proto!("core");
}
use proto_core::session_client::SessionClient;
use proto_core::SessionRequest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = SessionClient::connect("http://[::1]:50051").await?;
let request = tonic::Request::new(SessionRequest {
session_id: 0,
});
let response = client.create(request).await?;
println!("RESPONSE={:?}", response);
Ok(())
}

32
src/client/main.rs Normal file
View file

@ -0,0 +1,32 @@
pub mod manager;
mod nvim;
use tokio::sync::mpsc;
use nvim_rs::{compat::tokio::Compat, create::tokio::new_parent, rpc::IntoVal, Handler, Neovim, Value};
use manager::ConnectionManager;
use nvim::NeovimHandler;
#[tokio::main]
async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> {
let (tx, rx) = mpsc::channel(32);
let mut mngr = ConnectionManager::new("http://[::1]:50051".to_string(), rx).await?;
tokio::spawn(async move {
mngr.process_packets().await
});
let handler: NeovimHandler = NeovimHandler::new(tx).await?;
let (nvim, io_handler) = new_parent(handler).await;
nvim.call(":echo", vec![Value::from("***REMOVED***")]).await.unwrap().unwrap();
// Any error should probably be logged, as stderr is not visible to users.
match io_handler.await {
Err(err) => eprintln!("Error joining IO loop: {:?}", err),
Ok(Err(err)) => eprintln!("Process ended with error: {:?}", err),
Ok(Ok(())) => println!("Finished"),
}
Ok(())
}

38
src/client/manager.rs Normal file
View file

@ -0,0 +1,38 @@
pub mod proto_core {
tonic::include_proto!("core");
}
use tonic::transport::Channel;
use proto_core::session_client::SessionClient;
use proto_core::SessionRequest;
use tokio::sync::mpsc;
#[derive(Debug)]
pub struct ConnectionManager {
client: SessionClient<Channel>,
rx: mpsc::Receiver<i32>
}
impl ConnectionManager {
pub async fn new(addr:String, outbound:mpsc::Receiver<i32>) -> Result<Self, Box<dyn std::error::Error>> {
Ok(ConnectionManager {
client: SessionClient::connect(addr).await?,
rx: outbound
})
}
pub async fn process_packets(&mut self) {
loop {
if let Some(i) = self.rx.recv().await {
let request = tonic::Request::new(SessionRequest {session_id: i});
let response = self.client.create(request).await.unwrap();
println!("RESPONSE={:?}", response);
} else {
break
}
}
}
}

95
src/client/nvim/mod.rs Normal file
View file

@ -0,0 +1,95 @@
use rmpv::Value;
use tokio::io::Stdout;
use tokio::sync::mpsc;
use nvim_rs::{compat::tokio::Compat, create::tokio::new_parent, rpc::IntoVal, Handler, Neovim};
use tonic::transport::Channel;
use crate::manager::proto_core::{session_client::SessionClient, SessionRequest};
#[derive(Clone)]
pub struct NeovimHandler {
tx: mpsc::Sender<i32>,
}
impl NeovimHandler {
pub async fn new(tx: mpsc::Sender<i32>) -> Result<Self, tonic::transport::Error> {
Ok(NeovimHandler { tx })
}
}
#[tonic::async_trait]
impl Handler for NeovimHandler {
type Writer = Compat<Stdout>;
async fn handle_request(
&self,
name: String,
_args: Vec<Value>,
_neovim: Neovim<Compat<Stdout>>,
) -> Result<Value, Value> {
match name.as_ref() {
"ping" => Ok(Value::from("pong")),
"rpc" => {
self.tx.send(0).await.unwrap();
// let request = tonic::Request::new(SessionRequest {session_id: 0});
// let response = self.client.create(request).await.unwrap();
Ok(Value::from("sent"))
},
_ => unimplemented!(),
}
}
}
pub async fn run_nvim_plugin(tx: mpsc::Sender<i32>) -> Result<(), Box<(dyn std::error::Error + 'static)>> {
let handler: NeovimHandler = NeovimHandler::new(tx).await?;
let (nvim, io_handler) = new_parent(handler).await;
let curbuf = nvim.get_current_buf().await.unwrap();
let mut envargs = std::env::args();
let _ = envargs.next();
let testfile = envargs.next().unwrap();
std::fs::write(testfile, &format!("{:?}", curbuf.into_val())).unwrap();
// Any error should probably be logged, as stderr is not visible to users.
match io_handler.await {
Err( err) => eprintln!("Error joining IO loop: '{}'", joinerr),
Ok(Err(err)) => {
if !err.is_reader_error() {
// One last try, since there wasn't an error with writing to the
// stream
nvim
.err_writeln(&format!("Error: '{}'", err))
.await
.unwrap_or_else(|e| {
// We could inspect this error to see what was happening, and
// maybe retry, but at this point it's probably best
// to assume the worst and print a friendly and
// supportive message to our users
eprintln!("Well, dang... '{}'", e);
});
}
if !err.is_channel_closed() {
// Closed channel usually means neovim quit itself, or this plugin was
// told to quit by closing the channel, so it's not always an error
// condition.
eprintln!("Error: '{}'", err);
// let mut source = err.source();
// while let Some(e) = source {
// eprintln!("Caused by: '{}'", e);
// source = e.source();
// }
}
}
Ok(Ok(())) => {}
}
Ok(())
}

View file

@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}

View file

@ -1,12 +1,13 @@
use tonic::{transport::Server, Request, Response, Status}; use tonic::{transport::Server, Request, Response, Status};
use proto_core::session_server::{Session, SessionServer};
use proto_core::{SessionRequest, SessionResponse};
pub mod proto_core { pub mod proto_core {
tonic::include_proto!("core"); tonic::include_proto!("core");
} }
use proto_core::session_server::{Session, SessionServer};
use proto_core::{SessionRequest, SessionResponse};
#[derive(Debug, Default)] #[derive(Debug, Default)]
pub struct TestSession {} pub struct TestSession {}
@ -38,3 +39,22 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }
/*
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = GreeterClient::connect("http://[::1]:50051").await?;
let request = tonic::Request::new(HelloRequest {
name: "Tonic".into(),
});
let response = client.say_hello(request).await?;
println!("RESPONSE={:?}", response);
Ok(())
}
*/