2023-11-10 04:29:50 +01:00
|
|
|
use std::collections::hash_map::DefaultHasher;
|
|
|
|
use std::hash::{Hash, Hasher};
|
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
use similar::{TextDiff, ChangeTag};
|
|
|
|
use tokio::sync::{watch, mpsc};
|
2023-08-16 18:58:42 +02:00
|
|
|
use tonic::transport::Channel;
|
|
|
|
use tonic::{async_trait, Streaming};
|
2023-11-09 05:21:24 +01:00
|
|
|
use woot::crdt::{Op, CRDT, TextEditor};
|
|
|
|
use woot::woot::Woot;
|
2023-08-11 15:33:40 +02:00
|
|
|
|
2023-11-16 06:52:25 +01:00
|
|
|
use crate::errors::IgnorableError;
|
2023-08-16 18:58:42 +02:00
|
|
|
use crate::proto::{OperationRequest, RawOp};
|
|
|
|
use crate::proto::buffer_client::BufferClient;
|
2023-09-10 03:00:47 +02:00
|
|
|
use crate::api::controller::ControllerWorker;
|
2023-11-17 05:45:31 +01:00
|
|
|
use crate::api::TextChange;
|
2023-08-11 15:33:40 +02:00
|
|
|
|
2023-08-16 23:09:47 +02:00
|
|
|
use super::controller::BufferController;
|
2023-08-11 15:33:40 +02:00
|
|
|
|
|
|
|
|
2023-08-16 23:09:47 +02:00
|
|
|
pub(crate) struct BufferControllerWorker {
|
2023-08-16 18:58:42 +02:00
|
|
|
uid: String,
|
2023-09-05 20:13:09 +02:00
|
|
|
content: watch::Sender<String>,
|
2023-11-09 05:21:24 +01:00
|
|
|
operations: mpsc::UnboundedReceiver<TextChange>,
|
2023-08-11 15:33:40 +02:00
|
|
|
receiver: watch::Receiver<String>,
|
2023-11-09 05:21:24 +01:00
|
|
|
sender: mpsc::UnboundedSender<TextChange>,
|
|
|
|
buffer: Woot,
|
2023-08-11 15:33:40 +02:00
|
|
|
path: String,
|
2023-08-19 04:02:21 +02:00
|
|
|
stop: mpsc::UnboundedReceiver<()>,
|
|
|
|
stop_control: mpsc::UnboundedSender<()>,
|
2023-08-11 15:33:40 +02:00
|
|
|
}
|
|
|
|
|
2023-08-16 23:09:47 +02:00
|
|
|
impl BufferControllerWorker {
|
2023-11-16 06:52:25 +01:00
|
|
|
pub fn new(uid: String, path: &str) -> Self {
|
|
|
|
let (txt_tx, txt_rx) = watch::channel("".to_string());
|
2023-08-19 21:44:27 +02:00
|
|
|
let (op_tx, op_rx) = mpsc::unbounded_channel();
|
2023-08-19 04:02:21 +02:00
|
|
|
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
2023-11-10 04:29:50 +01:00
|
|
|
let mut hasher = DefaultHasher::new();
|
|
|
|
uid.hash(&mut hasher);
|
|
|
|
let site_id = hasher.finish() as usize;
|
2023-08-16 23:09:47 +02:00
|
|
|
BufferControllerWorker {
|
2023-08-16 18:58:42 +02:00
|
|
|
uid,
|
|
|
|
content: txt_tx,
|
|
|
|
operations: op_rx,
|
|
|
|
receiver: txt_rx,
|
|
|
|
sender: op_tx,
|
2023-11-17 03:15:13 +01:00
|
|
|
buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging!
|
2023-08-16 18:58:42 +02:00
|
|
|
path: path.to_string(),
|
2023-08-19 04:02:21 +02:00
|
|
|
stop: end_rx,
|
|
|
|
stop_control: end_tx,
|
2023-08-16 18:58:42 +02:00
|
|
|
}
|
|
|
|
}
|
2023-09-10 03:01:37 +02:00
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
async fn send_op(&self, tx: &mut BufferClient<Channel>, outbound: &Op) -> crate::Result<()> {
|
2023-09-10 03:01:37 +02:00
|
|
|
let opseq = serde_json::to_string(outbound).expect("could not serialize opseq");
|
|
|
|
let req = OperationRequest {
|
|
|
|
path: self.path.clone(),
|
2023-11-09 05:21:24 +01:00
|
|
|
hash: format!("{:x}", md5::compute(self.buffer.view())),
|
2023-09-10 03:01:37 +02:00
|
|
|
op: Some(RawOp {
|
|
|
|
opseq, user: self.uid.clone(),
|
|
|
|
}),
|
|
|
|
};
|
|
|
|
let _ = tx.edit(req).await?;
|
|
|
|
Ok(())
|
|
|
|
}
|
2023-08-16 18:58:42 +02:00
|
|
|
}
|
|
|
|
|
2023-08-11 15:33:40 +02:00
|
|
|
#[async_trait]
|
2023-11-16 06:52:25 +01:00
|
|
|
impl ControllerWorker<TextChange> for BufferControllerWorker {
|
2023-08-16 23:09:47 +02:00
|
|
|
type Controller = BufferController;
|
2023-08-16 18:58:42 +02:00
|
|
|
type Tx = BufferClient<Channel>;
|
|
|
|
type Rx = Streaming<RawOp>;
|
|
|
|
|
2023-08-16 23:09:47 +02:00
|
|
|
fn subscribe(&self) -> BufferController {
|
2023-08-17 00:04:37 +02:00
|
|
|
BufferController::new(
|
|
|
|
self.receiver.clone(),
|
|
|
|
self.sender.clone(),
|
2023-08-19 04:02:21 +02:00
|
|
|
self.stop_control.clone(),
|
2023-08-17 00:04:37 +02:00
|
|
|
)
|
2023-08-11 15:33:40 +02:00
|
|
|
}
|
|
|
|
|
2023-08-16 18:58:42 +02:00
|
|
|
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
|
2023-08-11 15:33:40 +02:00
|
|
|
loop {
|
2023-09-05 20:13:09 +02:00
|
|
|
// block until one of these is ready
|
2023-09-04 03:09:32 +02:00
|
|
|
tokio::select! {
|
2023-09-10 03:01:37 +02:00
|
|
|
biased;
|
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
// received stop signal
|
|
|
|
_ = self.stop.recv() => break,
|
|
|
|
|
|
|
|
// received a text change from editor
|
|
|
|
res = self.operations.recv() => match res {
|
|
|
|
None => break,
|
|
|
|
Some(change) => {
|
2023-11-16 06:52:25 +01:00
|
|
|
match self.buffer.view().get(change.span.clone()) {
|
|
|
|
None => tracing::error!("received illegal span from client"),
|
|
|
|
Some(span) => {
|
|
|
|
let diff = TextDiff::from_chars(span, &change.content);
|
|
|
|
|
|
|
|
let mut i = 0;
|
|
|
|
let mut ops = Vec::new();
|
|
|
|
for diff in diff.iter_all_changes() {
|
|
|
|
match diff.tag() {
|
|
|
|
ChangeTag::Equal => i += 1,
|
|
|
|
ChangeTag::Delete => match self.buffer.delete(change.span.start + i) {
|
|
|
|
Ok(op) => ops.push(op),
|
|
|
|
Err(e) => tracing::error!("could not apply deletion: {}", e),
|
|
|
|
},
|
|
|
|
ChangeTag::Insert => {
|
|
|
|
for c in diff.value().chars() {
|
|
|
|
match self.buffer.insert(change.span.start + i, c) {
|
|
|
|
Ok(op) => {
|
|
|
|
ops.push(op);
|
|
|
|
i += 1;
|
|
|
|
},
|
|
|
|
Err(e) => tracing::error!("could not apply insertion: {}", e),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
2023-11-09 05:21:24 +01:00
|
|
|
}
|
2023-11-16 06:52:25 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
for op in ops {
|
|
|
|
match self.send_op(&mut tx, &op).await {
|
|
|
|
Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e),
|
|
|
|
Ok(()) => {
|
2023-11-17 03:15:13 +01:00
|
|
|
// self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
|
2023-11-16 06:52:25 +01:00
|
|
|
},
|
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
2023-09-05 20:13:09 +02:00
|
|
|
}
|
2023-09-04 03:09:32 +02:00
|
|
|
}
|
2023-08-11 15:33:40 +02:00
|
|
|
},
|
2023-09-10 03:01:37 +02:00
|
|
|
|
2023-11-09 05:21:24 +01:00
|
|
|
// received a stop request (or channel got closed)
|
|
|
|
res = rx.message() => match res {
|
|
|
|
Err(_e) => break,
|
|
|
|
Ok(None) => break,
|
2023-11-16 06:52:25 +01:00
|
|
|
Ok(Some(change)) => match serde_json::from_str::<Op>(&change.opseq) {
|
|
|
|
Ok(op) => {
|
|
|
|
self.buffer.merge(op);
|
|
|
|
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
|
|
|
|
},
|
|
|
|
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
2023-11-09 05:21:24 +01:00
|
|
|
},
|
2023-09-10 03:01:37 +02:00
|
|
|
},
|
2023-08-11 15:33:40 +02:00
|
|
|
}
|
2023-11-09 05:21:24 +01:00
|
|
|
|
2023-08-11 15:33:40 +02:00
|
|
|
}
|
|
|
|
}
|
2023-08-16 18:58:42 +02:00
|
|
|
}
|