diff --git a/proto/buffer.proto b/proto/buffer.proto index 671b991..36c26ee 100644 --- a/proto/buffer.proto +++ b/proto/buffer.proto @@ -9,19 +9,22 @@ service Buffer { message RawOp { string opseq = 1; + string user = 2; } message OperationRequest { string path = 1; string hash = 2; string opseq = 3; + string user = 4; } message BufferPayload { - string path = 2; + string path = 1; + string user = 2; optional string content = 3; } message BufferResponse { - bool accepted = 3; + bool accepted = 1; } diff --git a/src/client/cli/main.rs b/src/client/cli/main.rs index 8bf722d..ad27332 100644 --- a/src/client/cli/main.rs +++ b/src/client/cli/main.rs @@ -1,5 +1,5 @@ use clap::Parser; -use library::proto::{buffer_client::BufferClient, BufferPayload}; +use codemp::proto::{buffer_client::BufferClient, BufferPayload}; use tokio_stream::StreamExt; #[derive(Parser, Debug)] diff --git a/src/server/buffer/actor.rs b/src/server/buffer/actor.rs index d5b1d26..0b67c07 100644 --- a/src/server/buffer/actor.rs +++ b/src/server/buffer/actor.rs @@ -1,3 +1,4 @@ +use codemp::proto::{RawOp, OperationRequest}; use tokio::sync::{mpsc, broadcast, watch}; use tracing::error; use md5::Digest; @@ -16,8 +17,8 @@ pub trait BufferStore { #[derive(Clone)] pub struct BufferHandle { - pub edit: mpsc::Sender, - events: broadcast::Sender, + pub edit: mpsc::Sender, + events: broadcast::Sender, pub digest: watch::Receiver, } @@ -47,7 +48,7 @@ impl BufferHandle { } } - pub fn subscribe(&self) -> broadcast::Receiver { + pub fn subscribe(&self) -> broadcast::Receiver { self.events.subscribe() } @@ -55,8 +56,8 @@ impl BufferHandle { struct BufferWorker { content: String, - edits: mpsc::Receiver, - events: broadcast::Sender, + edits: mpsc::Receiver, + events: broadcast::Sender, digest: watch::Sender, } @@ -66,11 +67,16 @@ impl BufferWorker { match self.edits.recv().await { None => break, Some(v) => { - match v.apply(&self.content) { + let op : OperationSeq = serde_json::from_str(&v.opseq).unwrap(); + match op.apply(&self.content) { Ok(res) => { self.content = res; self.digest.send(md5::compute(&self.content)).unwrap(); - if let Err(e) = self.events.send(v) { + let msg = RawOp { + opseq: v.opseq, + user: v.user + }; + if let Err(e) = self.events.send(msg) { error!("could not broadcast OpSeq: {}", e); } }, diff --git a/src/server/buffer/service.rs b/src/server/buffer/service.rs index 4e8e9cb..ba9adf9 100644 --- a/src/server/buffer/service.rs +++ b/src/server/buffer/service.rs @@ -1,12 +1,11 @@ use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap}; -use operational_transform::OperationSeq; use tokio::sync::mpsc; use tonic::{Request, Response, Status}; use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this? -use library::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest}; +use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest}; use tracing::info; use super::actor::{BufferHandle, BufferStore}; @@ -42,6 +41,7 @@ impl Buffer for BufferService { async fn attach(&self, req: Request) -> Result, Status> { let request = req.into_inner(); + let myself = request.user; match self.map.read().unwrap().get(&request.path) { Some(handle) => { let (tx, rx) = mpsc::channel(128); @@ -50,8 +50,8 @@ impl Buffer for BufferService { loop { match sub.recv().await { Ok(v) => { - let snd = RawOp { opseq: serde_json::to_string(&v).unwrap() }; - tx.send(Ok(snd)).await.unwrap(); + if v.user == myself { continue } + tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel? } Err(_e) => break, } @@ -69,16 +69,15 @@ impl Buffer for BufferService { let request = req.into_inner(); let tx = match self.map.read().unwrap().get(&request.path) { Some(handle) => { - if format!("{:x}", *handle.digest.borrow()) != request.hash { - return Ok(Response::new(BufferResponse { accepted : false } )); - } + // if format!("{:x}", *handle.digest.borrow()) != request.hash { + // return Ok(Response::new(BufferResponse { accepted : false } )); + // } handle.edit.clone() }, None => return Err(Status::not_found("path not found")), }; - let opseq : OperationSeq = serde_json::from_str(&request.opseq).unwrap(); - tx.send(opseq).await.unwrap(); - info!("sent edit to buffer"); + info!("sending edit to buffer: {}", request.opseq); + tx.send(request).await.unwrap(); Ok(Response::new(BufferResponse { accepted: true })) }