From 74faca0f25871fa2d2c4883c3211245223bc0476 Mon Sep 17 00:00:00 2001 From: alemi Date: Wed, 16 Aug 2023 17:09:21 +0200 Subject: [PATCH] chore: cleaned up server and lib after split --- server/src/buffer/service.rs | 72 ++++++++---------------------------- server/src/cursor/mod.rs | 1 + server/src/cursor/service.rs | 52 ++++++++++++++++++++++++++ server/src/main.rs | 7 +++- src/buffer/controller.rs | 46 +++++++++++------------ src/cursor/mod.rs | 16 ++++---- src/cursor/tracker.rs | 28 +++++++------- src/state.rs | 19 +++++----- 8 files changed, 128 insertions(+), 113 deletions(-) create mode 100644 server/src/cursor/mod.rs create mode 100644 server/src/cursor/service.rs diff --git a/server/src/buffer/service.rs b/server/src/buffer/service.rs index 5785eda..5745981 100644 --- a/server/src/buffer/service.rs +++ b/server/src/buffer/service.rs @@ -1,17 +1,16 @@ use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; -use tokio::sync::{mpsc, broadcast, oneshot}; +use tokio::sync::{mpsc, oneshot}; use tonic::{Request, Response, Status}; use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? -use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest, Cursor}; +use codemp::proto::{buffer_server::Buffer, RawOp, BufferPayload, BufferResponse, OperationRequest, BufferEditResponse, BufferCreateResponse}; use tracing::info; use super::actor::{BufferHandle, BufferStore}; type OperationStream = Pin> + Send>>; -type CursorStream = Pin> + Send>>; struct BufferMap { store: HashMap, @@ -34,15 +33,12 @@ impl BufferStore for BufferMap { pub struct BufferService { map: Arc>, - cursor: broadcast::Sender, } -impl BufferService { - #[allow(unused)] - fn get_buffer(&self, path: &String) -> Result { - match self.map.read().unwrap().get(path) { - Some(buf) => Ok(buf.clone()), - None => Err(Status::not_found("no buffer for given path")), +impl Default for BufferService { + fn default() -> BufferService { + BufferService { + map: Arc::new(RwLock::new(HashMap::new().into())), } } } @@ -50,7 +46,6 @@ impl BufferService { #[tonic::async_trait] impl Buffer for BufferService { type AttachStream = OperationStream; - type ListenStream = CursorStream; async fn attach(&self, req: Request) -> Result, Status> { let request = req.into_inner(); @@ -73,29 +68,7 @@ impl Buffer for BufferService { } } - async fn listen(&self, req: Request) -> Result, Status> { - let mut sub = self.cursor.subscribe(); - let myself = req.into_inner().user; - let (tx, rx) = mpsc::channel(128); - tokio::spawn(async move { - while let Ok(v) = sub.recv().await { - if v.user == myself { continue } - tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? - } - }); - let output_stream = ReceiverStream::new(rx); - info!("registered new subscriber to cursor updates"); - Ok(Response::new(Box::pin(output_stream))) - } - - async fn moved(&self, req:Request) -> Result, Status> { - match self.cursor.send(req.into_inner()) { - Ok(_) => Ok(Response::new(BufferResponse { accepted: true, content: None})), - Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))), - } - } - - async fn edit(&self, req:Request) -> Result, Status> { + async fn edit(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let tx = match self.map.read().unwrap().get(&request.path) { Some(handle) => { @@ -110,19 +83,20 @@ impl Buffer for BufferService { let (ack, status) = oneshot::channel(); match tx.send((ack, request)).await { Err(e) => Err(Status::internal(format!("error sending edit to buffer actor: {}", e))), - Ok(()) => Ok(Response::new(BufferResponse { - accepted: status.await.unwrap_or(false), - content: None - })) + Ok(()) => { + match status.await { + Ok(_accepted) => Ok(Response::new(BufferEditResponse { })), + Err(e) => Err(Status::internal(format!("error receiving edit result: {}", e))), + } + } } } - async fn create(&self, req:Request) -> Result, Status> { + async fn create(&self, req:Request) -> Result, Status> { let request = req.into_inner(); let _handle = self.map.write().unwrap().handle(request.path, request.content); info!("created new buffer"); - let answ = BufferResponse { accepted: true, content: None }; - Ok(Response::new(answ)) + Ok(Response::new(BufferCreateResponse { })) } async fn sync(&self, req: Request) -> Result, Status> { @@ -131,23 +105,9 @@ impl Buffer for BufferService { None => Err(Status::not_found("requested buffer does not exist")), Some(buf) => { info!("synching buffer"); - let answ = BufferResponse { accepted: true, content: Some(buf.content.borrow().clone()) }; + let answ = BufferResponse { content: buf.content.borrow().clone() }; Ok(Response::new(answ)) } } } } - -impl BufferService { - pub fn new() -> BufferService { - let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity - BufferService { - map: Arc::new(RwLock::new(HashMap::new().into())), - cursor: cur_tx, - } - } - - pub fn server(self) -> BufferServer { - BufferServer::new(self) - } -} diff --git a/server/src/cursor/mod.rs b/server/src/cursor/mod.rs new file mode 100644 index 0000000..1f278a4 --- /dev/null +++ b/server/src/cursor/mod.rs @@ -0,0 +1 @@ +pub mod service; diff --git a/server/src/cursor/service.rs b/server/src/cursor/service.rs new file mode 100644 index 0000000..b641c95 --- /dev/null +++ b/server/src/cursor/service.rs @@ -0,0 +1,52 @@ +use std::pin::Pin; + +use tokio::sync::{mpsc, broadcast}; +use tonic::{Request, Response, Status}; + +use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? + +use codemp::proto::{cursor_server::Cursor, UserIdentity, CursorPosition, MovedResponse}; +use tracing::info; + +type CursorStream = Pin> + Send>>; + +pub struct CursorService { + cursor: broadcast::Sender, +} + +#[tonic::async_trait] +impl Cursor for CursorService { + type ListenStream = CursorStream; + + async fn listen(&self, req: Request) -> Result, Status> { + let mut sub = self.cursor.subscribe(); + let myself = req.into_inner().id; + let (tx, rx) = mpsc::channel(128); + tokio::spawn(async move { + while let Ok(v) = sub.recv().await { + if v.user == myself { continue } + tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? + } + }); + let output_stream = ReceiverStream::new(rx); + info!("registered new subscriber to cursor updates"); + Ok(Response::new(Box::pin(output_stream))) + } + + async fn moved(&self, req:Request) -> Result, Status> { + match self.cursor.send(req.into_inner()) { + Ok(_) => Ok(Response::new(MovedResponse { })), + Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))), + } + } +} + +impl Default for CursorService { + fn default() -> Self { + let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity + // TODO don't drop receiver because sending event when there are no receivers throws an error + CursorService { + cursor: cur_tx, + } + } +} diff --git a/server/src/main.rs b/server/src/main.rs index 13fe906..ec951e1 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,12 +5,16 @@ //! use clap::Parser; +use codemp::proto::buffer_server::BufferServer; +use codemp::proto::cursor_server::CursorServer; use tracing::info; use tonic::transport::Server; mod buffer; +mod cursor; use crate::buffer::service::BufferService; +use crate::cursor::service::CursorService; #[derive(Parser, Debug)] struct CliArgs { @@ -37,7 +41,8 @@ async fn main() -> Result<(), Box> { info!("binding on {}", args.host); Server::builder() - .add_service(BufferService::new().server()) + .add_service(BufferServer::new(BufferService::default())) + .add_service(CursorServer::new(CursorService::default())) .serve(args.host.parse()?) .await?; diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 9be39f4..9e03318 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -4,7 +4,6 @@ use uuid::Uuid; use crate::{ ControllerWorker, - cursor::tracker::{CursorTracker, CursorTrackerWorker}, buffer::handle::{BufferHandle, OperationControllerEditor, OperationControllerWorker}, proto::{buffer_client::BufferClient, BufferPayload, RawOp, OperationRequest}, }; @@ -28,39 +27,39 @@ impl BufferController { pub fn id(&self) -> &str { &self.id } - pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result { + pub async fn create(&mut self, path: &str, content: Option<&str>) -> Result<(), Status> { let req = BufferPayload { path: path.to_string(), content: content.map(|x| x.to_string()), user: self.id.clone(), }; - let res = self.client.create(req).await?; + self.client.create(req).await?; - Ok(res.into_inner().accepted) + Ok(()) } - pub async fn listen(&mut self) -> Result { - let req = BufferPayload { - path: "".into(), - content: None, - user: self.id.clone(), - }; + // pub async fn listen(&mut self) -> Result { + // let req = BufferPayload { + // path: "".into(), + // content: None, + // user: self.id.clone(), + // }; - let stream = self.client.listen(req).await?.into_inner(); + // let stream = self.client.listen(req).await?.into_inner(); - let controller = CursorTrackerWorker::new(self.id().to_string()); - let handle = controller.subscribe(); - let client = self.client.clone(); + // let controller = CursorTrackerWorker::new(self.id().to_string()); + // let handle = controller.subscribe(); + // let client = self.client.clone(); - tokio::spawn(async move { - tracing::debug!("cursor worker started"); - controller.work(stream, client).await; - tracing::debug!("cursor worker stopped"); - }); + // tokio::spawn(async move { + // tracing::debug!("cursor worker started"); + // controller.work(stream, client).await; + // tracing::debug!("cursor worker stopped"); + // }); - Ok(handle) - } + // Ok(handle) + // } pub async fn attach(&mut self, path: &str) -> Result { let req = BufferPayload { @@ -72,8 +71,7 @@ impl BufferController { let content = self.client.sync(req.clone()) .await? .into_inner() - .content - .unwrap_or("".into()); + .content; let stream = self.client.attach(req).await?.into_inner(); @@ -100,7 +98,7 @@ impl OperationControllerEditor for (BufferController, Streaming) { user: self.0.id().to_string(), }; match self.0.client.edit(req).await { - Ok(res) => res.into_inner().accepted, + Ok(_) => true, Err(e) => { tracing::error!("error sending edit: {}", e); false diff --git a/src/cursor/mod.rs b/src/cursor/mod.rs index 134f2a7..58a0abe 100644 --- a/src/cursor/mod.rs +++ b/src/cursor/mod.rs @@ -1,25 +1,25 @@ pub mod tracker; -use crate::proto::{Position, Cursor}; +use crate::proto::{RowColumn, CursorPosition}; -impl From:: for (i32, i32) { - fn from(pos: Position) -> (i32, i32) { +impl From:: for (i32, i32) { + fn from(pos: RowColumn) -> (i32, i32) { (pos.row, pos.col) } } -impl From::<(i32, i32)> for Position { +impl From::<(i32, i32)> for RowColumn { fn from((row, col): (i32, i32)) -> Self { - Position { row, col } + RowColumn { row, col } } } -impl Cursor { - pub fn start(&self) -> Position { +impl CursorPosition { + pub fn start(&self) -> RowColumn { self.start.clone().unwrap_or((0, 0).into()) } - pub fn end(&self) -> Position { + pub fn end(&self) -> RowColumn { self.end.clone().unwrap_or((0, 0).into()) } } diff --git a/src/cursor/tracker.rs b/src/cursor/tracker.rs index 67149fb..7fde457 100644 --- a/src/cursor/tracker.rs +++ b/src/cursor/tracker.rs @@ -3,17 +3,17 @@ use std::sync::Arc; use tokio::sync::{mpsc, broadcast::{self, error::RecvError}, Mutex}; use tonic::{Streaming, transport::Channel}; -use crate::{proto::{Position, Cursor, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; +use crate::{proto::{RowColumn, CursorPosition, buffer_client::BufferClient}, errors::IgnorableError, CodempError}; pub struct CursorTracker { uid: String, - op: mpsc::Sender, - stream: Mutex>, + op: mpsc::Sender, + stream: Mutex>, } impl CursorTracker { - pub async fn moved(&self, path: &str, start: Position, end: Position) -> Result<(), CodempError> { - Ok(self.op.send(Cursor { + pub async fn moved(&self, path: &str, start: RowColumn, end: RowColumn) -> Result<(), CodempError> { + Ok(self.op.send(CursorPosition { user: self.uid.clone(), buffer: path.to_string(), start: start.into(), @@ -23,7 +23,7 @@ impl CursorTracker { // TODO is this cancelable? so it can be used in tokio::select! // TODO is the result type overkill? should be an option? - pub async fn recv(&self) -> Result { + pub async fn recv(&self) -> Result { let mut stream = self.stream.lock().await; match stream.recv().await { Ok(x) => Ok(x), @@ -35,7 +35,7 @@ impl CursorTracker { } } - // fn try_poll(&self) -> Option> { + // fn try_poll(&self) -> Option> { // match self.stream.try_lock() { // Err(_) => None, // Ok(mut x) => match x.try_recv() { @@ -51,14 +51,14 @@ impl CursorTracker { // } } -pub(crate) struct CursorTrackerWorker { +pub(crate) struct CursorPositionTrackerWorker { uid: String, - producer: mpsc::Sender, - op: mpsc::Receiver, - channel: Arc>, + producer: mpsc::Sender, + op: mpsc::Receiver, + channel: Arc>, } -impl CursorTrackerWorker { +impl CursorPositionTrackerWorker { pub(crate) fn new(uid: String) -> Self { let (op_tx, op_rx) = mpsc::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64); @@ -79,11 +79,11 @@ impl CursorTrackerWorker { } // TODO is it possible to avoid passing directly tonic Streaming and proto BufferClient ? - pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { + pub(crate) async fn work(mut self, mut rx: Streaming, mut tx: BufferClient) { loop { tokio::select!{ Ok(Some(cur)) = rx.message() => self.channel.send(cur).unwrap_or_warn("could not broadcast event"), - Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, + Some(op) = self.op.recv() => { todo!() } // tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, else => break, } } diff --git a/src/state.rs b/src/state.rs index abb78d5..5755896 100644 --- a/src/state.rs +++ b/src/state.rs @@ -4,7 +4,6 @@ use tokio::sync::RwLock; use crate::{ buffer::{controller::BufferController, handle::BufferHandle}, - cursor::tracker::CursorTracker, errors::CodempError, }; @@ -25,35 +24,35 @@ pub mod instance { } pub struct Workspace { - client: BufferController, buffers: RwLock, Arc>>, - cursor: Arc, + // cursor: Arc, + client: BufferController, } impl Workspace { pub async fn new(dest: &str) -> Result { - let mut client = BufferController::new(dest).await?; - let cursor = Arc::new(client.listen().await?); + let client = BufferController::new(dest).await?; + // let cursor = Arc::new(client.listen().await?); Ok( Workspace { buffers: RwLock::new(BTreeMap::new()), - cursor, + // cursor, client, } ) } // Cursor - pub async fn cursor(&self) -> Arc { - self.cursor.clone() - } + // pub async fn cursor(&self) -> Arc { + // self.cursor.clone() + // } // Buffer pub async fn buffer(&self, path: &str) -> Option> { self.buffers.read().await.get(path).cloned() } - pub async fn create(&self, path: &str, content: Option<&str>) -> Result { + pub async fn create(&self, path: &str, content: Option<&str>) -> Result<(), CodempError> { Ok(self.client.clone().create(path, content).await?) }