diff --git a/Cargo.toml b/Cargo.toml index 84b8da7..9b5c310 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,11 @@ path = "src/client/main.rs" [dependencies] tonic = "0.7" prost = "0.10" +futures = "0.3" tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] } +tokio-stream = "0.1" rmpv = "1" +operational-transform = "0.6" nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature [build-dependencies] diff --git a/src/client/main.rs b/src/client/main.rs index dabdf72..e033cfe 100644 --- a/src/client/main.rs +++ b/src/client/main.rs @@ -8,7 +8,7 @@ async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> { let client = WorkspaceClient::connect("http://[::1]:50051").await?; #[cfg(feature = "nvim")] - crate::nvim::run_nvim_client(client).await.unwrap(); + crate::nvim::run_nvim_client(client).await?; Ok(()) } diff --git a/src/client/nvim/mod.rs b/src/client/nvim/mod.rs index b94684c..b29ee74 100644 --- a/src/client/nvim/mod.rs +++ b/src/client/nvim/mod.rs @@ -10,12 +10,19 @@ use crate::proto::{SessionRequest, workspace_client::WorkspaceClient}; #[derive(Clone)] pub struct NeovimHandler { + go: bool, client: WorkspaceClient, } impl NeovimHandler { pub fn new(client: WorkspaceClient) -> Self { - NeovimHandler { client } + NeovimHandler { go: true, client } + } + + async fn live_edit_worker(&self) { + while self.go { + + } } } diff --git a/src/server/main.rs b/src/server/main.rs index 53f5054..84d6f51 100644 --- a/src/server/main.rs +++ b/src/server/main.rs @@ -1,26 +1,96 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, pin::Pin, sync::Arc}; use state::AlterState; use tonic::{transport::Server, Request, Response, Status}; pub mod proto { tonic::include_proto!("workspace"); + tonic::include_proto!("buffer"); } use tokio::sync::{mpsc, watch}; +use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; // TODO example used this? +use proto::buffer_server::{Buffer, BufferServer}; use proto::workspace_server::{Workspace, WorkspaceServer}; -use proto::{SessionRequest, SessionResponse}; +use proto::{Operation, SessionRequest, SessionResponse}; + +use tonic::Streaming; +//use futures::{Stream, StreamExt}; use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck! -pub mod workspace; pub mod state; +pub mod workspace; + +type OperationStream = Pin> + Send>>; + +pub struct BufferService {} + +#[tonic::async_trait] +impl Buffer for BufferService { + // type ServerStreamingEchoStream = ResponseStream; + type AttachStream = OperationStream; + + async fn attach( + &self, + req: Request>, + ) -> Result, Status> { + println!("EchoServer::bidirectional_streaming_echo"); + + let mut in_stream = req.into_inner(); + let (tx_og, rx) = mpsc::channel(128); + + // this spawn here is required if you want to handle connection error. + // If we just map `in_stream` and write it back as `out_stream` the `out_stream` + // will be drooped when connection error occurs and error will never be propagated + // to mapped version of `in_stream`. + let tx = tx_og.clone(); + tokio::spawn(async move { + while let Some(result) = in_stream.next().await { + match result { + Ok(v) => tx + .send(Ok(Operation { + action: 1, + row: 0, + column: 0, + op_id: 0, + text: None, + })) + .await + .expect("working rx"), + Err(err) => { + // if let Some(io_err) = match_for_io_error(&err) { + // if io_err.kind() == ErrorKind::BrokenPipe { + // // here you can handle special case when client + // // disconnected in unexpected way + // eprintln!("\tclient disconnected: broken pipe"); + // break; + // } + // } + eprintln!("Error receiving operation from client"); + + match tx.send(Err(err)).await { + Ok(_) => (), + Err(_err) => break, // response was droped + } + } + } + } + println!("\tstream ended"); + }); + + // echo just write the same data that was received + let out_stream = ReceiverStream::new(rx); + + Ok(Response::new(Box::pin(out_stream) as Self::AttachStream)) + } +} #[derive(Debug)] pub struct WorkspaceService { tx: mpsc::Sender, - rx: watch::Receiver>> + rx: watch::Receiver>>, } #[tonic::async_trait] @@ -32,16 +102,16 @@ impl Workspace for WorkspaceService { println!("Got a request: {:?}", request); let r = request.into_inner(); - let w = WorkspaceInstance::new(r.session_key.clone(), r.content.unwrap_or("".to_string())); + // let w = WorkspaceInstance::new(r.session_key.clone(), r.content.unwrap_or("".to_string())); let reply = proto::SessionResponse { session_key: r.session_key.clone(), accepted: true, - content: Some(w.content.clone()), + content: None, // Some(w.content.clone()), hash: None, }; - self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); + // self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap(); Ok(Response::new(reply)) } @@ -93,7 +163,12 @@ impl Workspace for WorkspaceService { let mut removed = false; if self.rx.borrow().get(&r.session_key).is_some() { - self.tx.send(AlterState::REMOVE { key: r.session_key.clone() }).await.unwrap(); + self.tx + .send(AlterState::REMOVE { + key: r.session_key.clone(), + }) + .await + .unwrap(); removed = true; // TODO this is a lie! Verify it } @@ -106,7 +181,6 @@ impl Workspace for WorkspaceService { Ok(Response::new(reply)) } - } #[tokio::main] @@ -115,9 +189,11 @@ async fn main() -> Result<(), Box> { let (tx, rx) = state::run_state_manager(); let greeter = WorkspaceService { tx, rx }; + let processor = BufferService {}; Server::builder() .add_service(WorkspaceServer::new(greeter)) + .add_service(BufferServer::new(processor)) .serve(addr) .await?; diff --git a/src/server/workspace.rs b/src/server/workspace.rs index 8a92e5a..9aec82b 100644 --- a/src/server/workspace.rs +++ b/src/server/workspace.rs @@ -1,18 +1,65 @@ +use std::sync::Arc; + +use operational_transform::OperationSeq; +use tokio::sync::{broadcast, mpsc}; + +pub struct WorkspaceView { + pub rx: broadcast::Receiver, + pub tx: mpsc::Sender, +} + // Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk #[derive(Debug)] pub struct Workspace { pub name: String, pub content: String, + pub tx: mpsc::Sender, + w_tx: Arc>, } impl Workspace { - pub fn new(name: String, content: String) -> Self { - Workspace { name , content } + pub fn new( + name: String, + content: String, + tx: mpsc::Sender, + w_tx: Arc>, + ) -> Self { + Workspace { + name, + content, + tx, + w_tx, + } + } + + pub fn view(&self) -> WorkspaceView { + WorkspaceView { + rx: self.w_tx.subscribe(), + tx: self.tx.clone(), + } } } -impl Default for Workspace { - fn default() -> Self { - Workspace { name: "fuck you".to_string() , content: "too".to_string() } +pub async fn worker( + mut w: Workspace, + tx: Arc>, + mut rx: mpsc::Receiver, +) { + loop { + if let Some(op) = rx.recv().await { + w.content = op.apply(&w.content).unwrap(); + tx.send(op).unwrap(); + } else { + break; + } } } + +// impl Default for Workspace { +// fn default() -> Self { +// Workspace { +// name: "fuck you".to_string(), +// content: "too".to_string(), +// } +// } +// }