mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-22 15:24:48 +01:00
chore: rewrote the codemp client using new traits
This commit is contained in:
parent
eafbc41bd1
commit
35935d88a4
2 changed files with 86 additions and 35 deletions
|
@ -1,6 +1,8 @@
|
||||||
use std::{net::TcpStream, sync::Mutex};
|
use std::{net::TcpStream, sync::Mutex, collections::BTreeMap};
|
||||||
|
|
||||||
use codemp::client::CodempClient;
|
use codemp::cursor::CursorController;
|
||||||
|
use codemp::operation::OperationController;
|
||||||
|
use codemp::{client::CodempClient, operation::OperationProcessor};
|
||||||
use codemp::proto::buffer_client::BufferClient;
|
use codemp::proto::buffer_client::BufferClient;
|
||||||
use rmpv::Value;
|
use rmpv::Value;
|
||||||
|
|
||||||
|
@ -9,12 +11,13 @@ use tokio::io::Stdout;
|
||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
|
|
||||||
use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim};
|
use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim};
|
||||||
use tonic::async_trait;
|
|
||||||
use tracing::{error, warn, debug, info};
|
use tracing::{error, warn, debug, info};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
struct NeovimHandler {
|
struct NeovimHandler {
|
||||||
client: CodempClient,
|
client: CodempClient,
|
||||||
|
factories: BTreeMap<String, OperationController>,
|
||||||
|
cursor: Option<CursorController>,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> {
|
fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> {
|
||||||
|
@ -33,7 +36,7 @@ fn default_zero_number(args: &Vec<Value>, index: usize) -> i64 {
|
||||||
nullable_optional_number(args, index).unwrap_or(0)
|
nullable_optional_number(args, index).unwrap_or(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[tonic::async_trait]
|
||||||
impl Handler for NeovimHandler {
|
impl Handler for NeovimHandler {
|
||||||
type Writer = Compat<Stdout>;
|
type Writer = Compat<Stdout>;
|
||||||
|
|
||||||
|
|
|
@ -1,33 +1,38 @@
|
||||||
use std::{future::Future, sync::Arc, pin::Pin};
|
use std::sync::Arc;
|
||||||
|
|
||||||
use operational_transform::OperationSeq;
|
use operational_transform::OperationSeq;
|
||||||
use tokio::sync::mpsc;
|
use tonic::{transport::Channel, Status};
|
||||||
use tonic::{transport::{Channel, Error}, Status};
|
use tracing::{error, warn};
|
||||||
use tracing::error;
|
use uuid::Uuid;
|
||||||
|
|
||||||
use crate::{proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, opfactory::AsyncFactory};
|
use crate::{
|
||||||
|
cursor::{CursorController, CursorStorage},
|
||||||
pub trait EditorDriver : Clone {
|
operation::{OperationController, OperationProcessor},
|
||||||
fn id(&self) -> String;
|
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest},
|
||||||
}
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct CodempClient<T : EditorDriver> {
|
pub struct CodempClient {
|
||||||
|
id: String,
|
||||||
client: BufferClient<Channel>,
|
client: BufferClient<Channel>,
|
||||||
driver: T,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T : EditorDriver> CodempClient<T> { // TODO wrap tonic 'connect' to allow multiple types
|
impl From::<BufferClient<Channel>> for CodempClient {
|
||||||
pub async fn new(addr: &str, driver: T) -> Result<Self, Error> {
|
fn from(value: BufferClient<Channel>) -> Self {
|
||||||
let client = BufferClient::connect(addr.to_string()).await?;
|
CodempClient { id: Uuid::new_v4().to_string(), client: value }
|
||||||
|
}
|
||||||
Ok(CodempClient { client, driver })
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn create_buffer(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
|
impl CodempClient {
|
||||||
|
pub fn new(id: String, client: BufferClient<Channel>) -> Self {
|
||||||
|
CodempClient { id, client }
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
|
||||||
let req = BufferPayload {
|
let req = BufferPayload {
|
||||||
path, content,
|
path, content,
|
||||||
user: self.driver.id(),
|
user: self.id.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let res = self.client.create(req).await?;
|
let res = self.client.create(req).await?;
|
||||||
|
@ -35,10 +40,36 @@ impl<T : EditorDriver> CodempClient<T> { // TODO wrap tonic 'connect' to allow m
|
||||||
Ok(res.into_inner().accepted)
|
Ok(res.into_inner().accepted)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn attach_buffer(&mut self, path: String) -> Result<mpsc::Sender<OperationSeq>, Status> {
|
pub async fn listen(&mut self) -> Result<Arc<CursorController>, Status> {
|
||||||
let req = BufferPayload {
|
let req = BufferPayload {
|
||||||
path, content: None,
|
path: "".into(),
|
||||||
user: self.driver.id(),
|
content: None,
|
||||||
|
user: self.id.clone(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut stream = self.client.listen(req).await?.into_inner();
|
||||||
|
|
||||||
|
let controller = Arc::new(CursorController::new());
|
||||||
|
|
||||||
|
let _controller = controller.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match stream.message().await {
|
||||||
|
Err(e) => break error!("error receiving cursor: {}", e),
|
||||||
|
Ok(None) => break,
|
||||||
|
Ok(Some(x)) => { _controller.update(x); },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(controller)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn attach(&mut self, path: String) -> Result<Arc<OperationController>, Status> {
|
||||||
|
let req = BufferPayload {
|
||||||
|
path: path.clone(),
|
||||||
|
content: None,
|
||||||
|
user: self.id.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let content = self.client.sync(req.clone())
|
let content = self.client.sync(req.clone())
|
||||||
|
@ -48,33 +79,50 @@ impl<T : EditorDriver> CodempClient<T> { // TODO wrap tonic 'connect' to allow m
|
||||||
|
|
||||||
let mut stream = self.client.attach(req).await?.into_inner();
|
let mut stream = self.client.attach(req).await?.into_inner();
|
||||||
|
|
||||||
let factory = Arc::new(AsyncFactory::new(content));
|
let factory = Arc::new(OperationController::new(content.unwrap_or("".into())));
|
||||||
|
|
||||||
let (tx, mut rx) = mpsc::channel(64);
|
|
||||||
|
|
||||||
|
let _factory = factory.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
loop {
|
loop {
|
||||||
match stream.message().await {
|
match stream.message().await {
|
||||||
Err(e) => break error!("error receiving update: {}", e),
|
Err(e) => break error!("error receiving update: {}", e),
|
||||||
Ok(None) => break,
|
Ok(None) => break, // clean exit
|
||||||
Ok(Some(x)) => match serde_json::from_str::<OperationSeq>(&x.opseq) {
|
Ok(Some(x)) => match serde_json::from_str::<OperationSeq>(&x.opseq) {
|
||||||
Err(e) => break error!("error deserializing opseq: {}", e),
|
Err(e) => break error!("error deserializing opseq: {}", e),
|
||||||
Ok(v) => match factory.process(v).await {
|
Ok(v) => match _factory.process(v).await {
|
||||||
Err(e) => break error!("could not apply operation from server: {}", e),
|
Err(e) => break error!("could not apply operation from server: {}", e),
|
||||||
Ok(txt) => { // send back txt
|
Ok(_txt) => { }
|
||||||
}
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
let mut _client = self.client.clone();
|
||||||
|
let _uid = self.id.clone();
|
||||||
|
let _factory = factory.clone();
|
||||||
|
let _path = path.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(op) = rx.recv().await {
|
while let Some(op) = _factory.poll().await {
|
||||||
|
let req = OperationRequest {
|
||||||
|
hash: "".into(),
|
||||||
|
opseq: serde_json::to_string(&op).unwrap(),
|
||||||
|
path: _path.clone(),
|
||||||
|
user: _uid.clone(),
|
||||||
|
};
|
||||||
|
match _client.edit(req).await {
|
||||||
|
Ok(res) => match res.into_inner().accepted {
|
||||||
|
true => { _factory.ack().await; },
|
||||||
|
false => {
|
||||||
|
warn!("server rejected operation, retrying in 1s");
|
||||||
|
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => error!("could not send edit: {}", e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
Ok(tx)
|
Ok(factory)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue