diff --git a/src/client/nvim/main.rs b/src/client/nvim/main.rs index 0b6662e..f5651a6 100644 --- a/src/client/nvim/main.rs +++ b/src/client/nvim/main.rs @@ -1,6 +1,8 @@ -use std::{net::TcpStream, sync::Mutex}; +use std::{net::TcpStream, sync::Mutex, collections::BTreeMap}; -use codemp::client::CodempClient; +use codemp::cursor::CursorController; +use codemp::operation::OperationController; +use codemp::{client::CodempClient, operation::OperationProcessor}; use codemp::proto::buffer_client::BufferClient; use rmpv::Value; @@ -9,12 +11,13 @@ use tokio::io::Stdout; use clap::Parser; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; -use tonic::async_trait; use tracing::{error, warn, debug, info}; #[derive(Clone)] struct NeovimHandler { client: CodempClient, + factories: BTreeMap, + cursor: Option, } fn nullable_optional_str(args: &Vec, index: usize) -> Option { @@ -33,7 +36,7 @@ fn default_zero_number(args: &Vec, index: usize) -> i64 { nullable_optional_number(args, index).unwrap_or(0) } -#[async_trait] +#[tonic::async_trait] impl Handler for NeovimHandler { type Writer = Compat; diff --git a/src/lib/client.rs b/src/lib/client.rs index d92e6b5..0ab4938 100644 --- a/src/lib/client.rs +++ b/src/lib/client.rs @@ -1,33 +1,38 @@ -use std::{future::Future, sync::Arc, pin::Pin}; +use std::sync::Arc; use operational_transform::OperationSeq; -use tokio::sync::mpsc; -use tonic::{transport::{Channel, Error}, Status}; -use tracing::error; +use tonic::{transport::Channel, Status}; +use tracing::{error, warn}; +use uuid::Uuid; -use crate::{proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, opfactory::AsyncFactory}; - -pub trait EditorDriver : Clone { - fn id(&self) -> String; -} +use crate::{ + cursor::{CursorController, CursorStorage}, + operation::{OperationController, OperationProcessor}, + proto::{buffer_client::BufferClient, BufferPayload, OperationRequest}, +}; #[derive(Clone)] -pub struct CodempClient { +pub struct CodempClient { + id: String, client: BufferClient, - driver: T, } -impl CodempClient { // TODO wrap tonic 'connect' to allow multiple types - pub async fn new(addr: &str, driver: T) -> Result { - let client = BufferClient::connect(addr.to_string()).await?; +impl From::> for CodempClient { + fn from(value: BufferClient) -> Self { + CodempClient { id: Uuid::new_v4().to_string(), client: value } + } +} + +impl CodempClient { + pub fn new(id: String, client: BufferClient) -> Self { + CodempClient { id, client } - Ok(CodempClient { client, driver }) } - pub async fn create_buffer(&mut self, path: String, content: Option) -> Result { + pub async fn create(&mut self, path: String, content: Option) -> Result { let req = BufferPayload { path, content, - user: self.driver.id(), + user: self.id.clone(), }; let res = self.client.create(req).await?; @@ -35,10 +40,36 @@ impl CodempClient { // TODO wrap tonic 'connect' to allow m Ok(res.into_inner().accepted) } - pub async fn attach_buffer(&mut self, path: String) -> Result, Status> { + pub async fn listen(&mut self) -> Result, Status> { let req = BufferPayload { - path, content: None, - user: self.driver.id(), + path: "".into(), + content: None, + user: self.id.clone(), + }; + + let mut stream = self.client.listen(req).await?.into_inner(); + + let controller = Arc::new(CursorController::new()); + + let _controller = controller.clone(); + tokio::spawn(async move { + loop { + match stream.message().await { + Err(e) => break error!("error receiving cursor: {}", e), + Ok(None) => break, + Ok(Some(x)) => { _controller.update(x); }, + } + } + }); + + Ok(controller) + } + + pub async fn attach(&mut self, path: String) -> Result, Status> { + let req = BufferPayload { + path: path.clone(), + content: None, + user: self.id.clone(), }; let content = self.client.sync(req.clone()) @@ -48,33 +79,50 @@ impl CodempClient { // TODO wrap tonic 'connect' to allow m let mut stream = self.client.attach(req).await?.into_inner(); - let factory = Arc::new(AsyncFactory::new(content)); - - let (tx, mut rx) = mpsc::channel(64); + let factory = Arc::new(OperationController::new(content.unwrap_or("".into()))); + let _factory = factory.clone(); tokio::spawn(async move { loop { match stream.message().await { - Err(e) => break error!("error receiving update: {}", e), - Ok(None) => break, + Err(e) => break error!("error receiving update: {}", e), + Ok(None) => break, // clean exit Ok(Some(x)) => match serde_json::from_str::(&x.opseq) { Err(e) => break error!("error deserializing opseq: {}", e), - Ok(v) => match factory.process(v).await { + Ok(v) => match _factory.process(v).await { Err(e) => break error!("could not apply operation from server: {}", e), - Ok(txt) => { // send back txt - } + Ok(_txt) => { } } }, } } }); + let mut _client = self.client.clone(); + let _uid = self.id.clone(); + let _factory = factory.clone(); + let _path = path.clone(); tokio::spawn(async move { - while let Some(op) = rx.recv().await { - + while let Some(op) = _factory.poll().await { + let req = OperationRequest { + hash: "".into(), + opseq: serde_json::to_string(&op).unwrap(), + path: _path.clone(), + user: _uid.clone(), + }; + match _client.edit(req).await { + Ok(res) => match res.into_inner().accepted { + true => { _factory.ack().await; }, + false => { + warn!("server rejected operation, retrying in 1s"); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + }, + Err(e) => error!("could not send edit: {}", e), + } } }); - Ok(tx) + Ok(factory) } }