feat: initial work on Workspace buffer edit proto

Co-authored-by: f-tlm <f-tlm@users.noreply.github.com>
This commit is contained in:
əlemi 2022-07-21 10:40:43 +02:00
parent d5a551c76c
commit 97e9b1f737
5 changed files with 149 additions and 16 deletions

View file

@ -18,8 +18,11 @@ path = "src/client/main.rs"
[dependencies] [dependencies]
tonic = "0.7" tonic = "0.7"
prost = "0.10" prost = "0.10"
futures = "0.3"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] } tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] }
tokio-stream = "0.1"
rmpv = "1" rmpv = "1"
operational-transform = "0.6"
nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature
[build-dependencies] [build-dependencies]

View file

@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> {
let client = WorkspaceClient::connect("http://[::1]:50051").await?; let client = WorkspaceClient::connect("http://[::1]:50051").await?;
#[cfg(feature = "nvim")] #[cfg(feature = "nvim")]
crate::nvim::run_nvim_client(client).await.unwrap(); crate::nvim::run_nvim_client(client).await?;
Ok(()) Ok(())
} }

View file

@ -10,12 +10,19 @@ use crate::proto::{SessionRequest, workspace_client::WorkspaceClient};
#[derive(Clone)] #[derive(Clone)]
pub struct NeovimHandler { pub struct NeovimHandler {
go: bool,
client: WorkspaceClient<Channel>, client: WorkspaceClient<Channel>,
} }
impl NeovimHandler { impl NeovimHandler {
pub fn new(client: WorkspaceClient<Channel>) -> Self { pub fn new(client: WorkspaceClient<Channel>) -> Self {
NeovimHandler { client } NeovimHandler { go: true, client }
}
async fn live_edit_worker(&self) {
while self.go {
}
} }
} }

View file

@ -1,26 +1,96 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, pin::Pin, sync::Arc};
use state::AlterState; use state::AlterState;
use tonic::{transport::Server, Request, Response, Status}; use tonic::{transport::Server, Request, Response, Status};
pub mod proto { pub mod proto {
tonic::include_proto!("workspace"); tonic::include_proto!("workspace");
tonic::include_proto!("buffer");
} }
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; // TODO example used this?
use proto::buffer_server::{Buffer, BufferServer};
use proto::workspace_server::{Workspace, WorkspaceServer}; use proto::workspace_server::{Workspace, WorkspaceServer};
use proto::{SessionRequest, SessionResponse}; use proto::{Operation, SessionRequest, SessionResponse};
use tonic::Streaming;
//use futures::{Stream, StreamExt};
use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck! use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck!
pub mod workspace;
pub mod state; pub mod state;
pub mod workspace;
type OperationStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
pub struct BufferService {}
#[tonic::async_trait]
impl Buffer for BufferService {
// type ServerStreamingEchoStream = ResponseStream;
type AttachStream = OperationStream;
async fn attach(
&self,
req: Request<Streaming<Operation>>,
) -> Result<tonic::Response<OperationStream>, Status> {
println!("EchoServer::bidirectional_streaming_echo");
let mut in_stream = req.into_inner();
let (tx_og, rx) = mpsc::channel(128);
// this spawn here is required if you want to handle connection error.
// If we just map `in_stream` and write it back as `out_stream` the `out_stream`
// will be drooped when connection error occurs and error will never be propagated
// to mapped version of `in_stream`.
let tx = tx_og.clone();
tokio::spawn(async move {
while let Some(result) = in_stream.next().await {
match result {
Ok(v) => tx
.send(Ok(Operation {
action: 1,
row: 0,
column: 0,
op_id: 0,
text: None,
}))
.await
.expect("working rx"),
Err(err) => {
// if let Some(io_err) = match_for_io_error(&err) {
// if io_err.kind() == ErrorKind::BrokenPipe {
// // here you can handle special case when client
// // disconnected in unexpected way
// eprintln!("\tclient disconnected: broken pipe");
// break;
// }
// }
eprintln!("Error receiving operation from client");
match tx.send(Err(err)).await {
Ok(_) => (),
Err(_err) => break, // response was droped
}
}
}
}
println!("\tstream ended");
});
// echo just write the same data that was received
let out_stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(out_stream) as Self::AttachStream))
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct WorkspaceService { pub struct WorkspaceService {
tx: mpsc::Sender<AlterState>, tx: mpsc::Sender<AlterState>,
rx: watch::Receiver<HashMap<String, Arc<WorkspaceInstance>>> rx: watch::Receiver<HashMap<String, Arc<WorkspaceInstance>>>,
} }
#[tonic::async_trait] #[tonic::async_trait]
@ -32,16 +102,16 @@ impl Workspace for WorkspaceService {
println!("Got a request: {:?}", request); println!("Got a request: {:?}", request);
let r = request.into_inner(); let r = request.into_inner();
let w = WorkspaceInstance::new(r.session_key.clone(), r.content.unwrap_or("".to_string())); // let w = WorkspaceInstance::new(r.session_key.clone(), r.content.unwrap_or("".to_string()));
let reply = proto::SessionResponse { let reply = proto::SessionResponse {
session_key: r.session_key.clone(), session_key: r.session_key.clone(),
accepted: true, accepted: true,
content: Some(w.content.clone()), content: None, // Some(w.content.clone()),
hash: None, hash: None,
}; };
self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); // self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap();
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
@ -93,7 +163,12 @@ impl Workspace for WorkspaceService {
let mut removed = false; let mut removed = false;
if self.rx.borrow().get(&r.session_key).is_some() { if self.rx.borrow().get(&r.session_key).is_some() {
self.tx.send(AlterState::REMOVE { key: r.session_key.clone() }).await.unwrap(); self.tx
.send(AlterState::REMOVE {
key: r.session_key.clone(),
})
.await
.unwrap();
removed = true; // TODO this is a lie! Verify it removed = true; // TODO this is a lie! Verify it
} }
@ -106,7 +181,6 @@ impl Workspace for WorkspaceService {
Ok(Response::new(reply)) Ok(Response::new(reply))
} }
} }
#[tokio::main] #[tokio::main]
@ -115,9 +189,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (tx, rx) = state::run_state_manager(); let (tx, rx) = state::run_state_manager();
let greeter = WorkspaceService { tx, rx }; let greeter = WorkspaceService { tx, rx };
let processor = BufferService {};
Server::builder() Server::builder()
.add_service(WorkspaceServer::new(greeter)) .add_service(WorkspaceServer::new(greeter))
.add_service(BufferServer::new(processor))
.serve(addr) .serve(addr)
.await?; .await?;

View file

@ -1,18 +1,65 @@
use std::sync::Arc;
use operational_transform::OperationSeq;
use tokio::sync::{broadcast, mpsc};
pub struct WorkspaceView {
pub rx: broadcast::Receiver<OperationSeq>,
pub tx: mpsc::Sender<OperationSeq>,
}
// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk // Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk
#[derive(Debug)] #[derive(Debug)]
pub struct Workspace { pub struct Workspace {
pub name: String, pub name: String,
pub content: String, pub content: String,
pub tx: mpsc::Sender<OperationSeq>,
w_tx: Arc<broadcast::Sender<OperationSeq>>,
} }
impl Workspace { impl Workspace {
pub fn new(name: String, content: String) -> Self { pub fn new(
Workspace { name , content } name: String,
content: String,
tx: mpsc::Sender<OperationSeq>,
w_tx: Arc<broadcast::Sender<OperationSeq>>,
) -> Self {
Workspace {
name,
content,
tx,
w_tx,
}
}
pub fn view(&self) -> WorkspaceView {
WorkspaceView {
rx: self.w_tx.subscribe(),
tx: self.tx.clone(),
}
} }
} }
impl Default for Workspace { pub async fn worker(
fn default() -> Self { mut w: Workspace,
Workspace { name: "fuck you".to_string() , content: "too".to_string() } tx: Arc<broadcast::Sender<OperationSeq>>,
mut rx: mpsc::Receiver<OperationSeq>,
) {
loop {
if let Some(op) = rx.recv().await {
w.content = op.apply(&w.content).unwrap();
tx.send(op).unwrap();
} else {
break;
}
} }
} }
// impl Default for Workspace {
// fn default() -> Self {
// Workspace {
// name: "fuck you".to_string(),
// content: "too".to_string(),
// }
// }
// }