feat: add user to msgs, pass msgs directly

This commit is contained in:
əlemi 2023-04-10 20:24:11 +02:00
parent 14e9a1e86e
commit 9bf12b8bc3
4 changed files with 28 additions and 20 deletions

View file

@ -9,19 +9,22 @@ service Buffer {
message RawOp {
string opseq = 1;
string user = 2;
}
message OperationRequest {
string path = 1;
string hash = 2;
string opseq = 3;
string user = 4;
}
message BufferPayload {
string path = 2;
string path = 1;
string user = 2;
optional string content = 3;
}
message BufferResponse {
bool accepted = 3;
bool accepted = 1;
}

View file

@ -1,5 +1,5 @@
use clap::Parser;
use library::proto::{buffer_client::BufferClient, BufferPayload};
use codemp::proto::{buffer_client::BufferClient, BufferPayload};
use tokio_stream::StreamExt;
#[derive(Parser, Debug)]

View file

@ -1,3 +1,4 @@
use codemp::proto::{RawOp, OperationRequest};
use tokio::sync::{mpsc, broadcast, watch};
use tracing::error;
use md5::Digest;
@ -16,8 +17,8 @@ pub trait BufferStore<T> {
#[derive(Clone)]
pub struct BufferHandle {
pub edit: mpsc::Sender<OperationSeq>,
events: broadcast::Sender<OperationSeq>,
pub edit: mpsc::Sender<OperationRequest>,
events: broadcast::Sender<RawOp>,
pub digest: watch::Receiver<Digest>,
}
@ -47,7 +48,7 @@ impl BufferHandle {
}
}
pub fn subscribe(&self) -> broadcast::Receiver<OperationSeq> {
pub fn subscribe(&self) -> broadcast::Receiver<RawOp> {
self.events.subscribe()
}
@ -55,8 +56,8 @@ impl BufferHandle {
struct BufferWorker {
content: String,
edits: mpsc::Receiver<OperationSeq>,
events: broadcast::Sender<OperationSeq>,
edits: mpsc::Receiver<OperationRequest>,
events: broadcast::Sender<RawOp>,
digest: watch::Sender<Digest>,
}
@ -66,11 +67,16 @@ impl BufferWorker {
match self.edits.recv().await {
None => break,
Some(v) => {
match v.apply(&self.content) {
let op : OperationSeq = serde_json::from_str(&v.opseq).unwrap();
match op.apply(&self.content) {
Ok(res) => {
self.content = res;
self.digest.send(md5::compute(&self.content)).unwrap();
if let Err(e) = self.events.send(v) {
let msg = RawOp {
opseq: v.opseq,
user: v.user
};
if let Err(e) = self.events.send(msg) {
error!("could not broadcast OpSeq: {}", e);
}
},

View file

@ -1,12 +1,11 @@
use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap};
use operational_transform::OperationSeq;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this?
use library::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest};
use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest};
use tracing::info;
use super::actor::{BufferHandle, BufferStore};
@ -42,6 +41,7 @@ impl Buffer for BufferService {
async fn attach(&self, req: Request<BufferPayload>) -> Result<tonic::Response<OperationStream>, Status> {
let request = req.into_inner();
let myself = request.user;
match self.map.read().unwrap().get(&request.path) {
Some(handle) => {
let (tx, rx) = mpsc::channel(128);
@ -50,8 +50,8 @@ impl Buffer for BufferService {
loop {
match sub.recv().await {
Ok(v) => {
let snd = RawOp { opseq: serde_json::to_string(&v).unwrap() };
tx.send(Ok(snd)).await.unwrap();
if v.user == myself { continue }
tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel?
}
Err(_e) => break,
}
@ -69,16 +69,15 @@ impl Buffer for BufferService {
let request = req.into_inner();
let tx = match self.map.read().unwrap().get(&request.path) {
Some(handle) => {
if format!("{:x}", *handle.digest.borrow()) != request.hash {
return Ok(Response::new(BufferResponse { accepted : false } ));
}
// if format!("{:x}", *handle.digest.borrow()) != request.hash {
// return Ok(Response::new(BufferResponse { accepted : false } ));
// }
handle.edit.clone()
},
None => return Err(Status::not_found("path not found")),
};
let opseq : OperationSeq = serde_json::from_str(&request.opseq).unwrap();
tx.send(opseq).await.unwrap();
info!("sent edit to buffer");
info!("sending edit to buffer: {}", request.opseq);
tx.send(request).await.unwrap();
Ok(Response::new(BufferResponse { accepted: true }))
}