fix: better api with TextChange, ugly fixes

it's pretty ugly but it kinda works? really need to do this better
This commit is contained in:
əlemi 2023-11-16 06:52:25 +01:00
parent 8267e1b890
commit ca29ee3e0f
3 changed files with 96 additions and 39 deletions

View file

@ -5,7 +5,7 @@
use std::sync::Arc;
use tokio::sync::{watch, mpsc};
use tokio::sync::{watch, mpsc, Mutex, RwLock, TryLockError};
use tonic::async_trait;
use crate::errors::IgnorableError;
@ -32,6 +32,7 @@ use super::TextChange;
#[derive(Debug, Clone)]
pub struct BufferController {
content: watch::Receiver<String>,
seen: Arc<RwLock<String>>,
operations: mpsc::UnboundedSender<TextChange>,
_stop: Arc<StopOnDrop>, // just exist
}
@ -42,7 +43,11 @@ impl BufferController {
operations: mpsc::UnboundedSender<TextChange>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
BufferController { content, operations, _stop: Arc::new(StopOnDrop(stop)) }
BufferController {
content, operations,
_stop: Arc::new(StopOnDrop(stop)),
seen: Arc::new(RwLock::new("".into())),
}
}
}
@ -56,19 +61,56 @@ impl Drop for StopOnDrop {
}
#[async_trait]
impl Controller<String> for BufferController {
impl Controller<TextChange> for BufferController {
type Input = TextChange;
async fn poll(&self) -> Result<(), Error> {
Ok(self.content.clone().changed().await?)
let mut poller = self.content.clone();
loop {
poller.changed().await?;
let seen = self.seen.read().await.clone();
if *poller.borrow() != seen {
break
}
}
Ok(())
}
fn try_recv(&self) -> Result<Option<String>, Error> {
Ok(Some(self.content.borrow().clone()))
fn try_recv(&self) -> Result<Option<TextChange>, Error> {
let cur = match self.seen.try_read() {
Err(e) => {
tracing::error!("try_recv invoked while being mutated: {}", e);
return Ok(None);
},
Ok(x) => x.clone(),
};
if *self.content.borrow() != cur {
match self.seen.try_write() {
Err(e) => {
tracing::error!("try_recv mutating while being mutated: {}", e);
return Ok(None);
},
Ok(mut w) => {
*w = self.content.borrow().clone();
// TODO it's not the whole buffer that changed
return Ok(Some(TextChange {
span: 0..cur.len(),
content: self.content.borrow().clone(),
after: "".to_string(),
}));
}
}
}
return Ok(None);
}
async fn recv(&self) -> Result<String, Error> {
Ok(self.content.borrow().clone())
async fn recv(&self) -> Result<TextChange, Error> {
self.poll().await?;
match self.try_recv()? {
Some(x) => Ok(x),
None => Err(crate::Error::Filler { message: "wtfff".into() }),
}
}
/// enqueue an opseq for processing

View file

@ -8,6 +8,7 @@ use tonic::{async_trait, Streaming};
use woot::crdt::{Op, CRDT, TextEditor};
use woot::woot::Woot;
use crate::errors::IgnorableError;
use crate::proto::{OperationRequest, RawOp};
use crate::proto::buffer_client::BufferClient;
use crate::api::controller::ControllerWorker;
@ -29,8 +30,8 @@ pub(crate) struct BufferControllerWorker {
}
impl BufferControllerWorker {
pub fn new(uid: String, buffer: &str, path: &str) -> Self {
let (txt_tx, txt_rx) = watch::channel(buffer.to_string());
pub fn new(uid: String, path: &str) -> Self {
let (txt_tx, txt_rx) = watch::channel("".to_string());
let (op_tx, op_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel();
let mut hasher = DefaultHasher::new();
@ -42,7 +43,7 @@ impl BufferControllerWorker {
operations: op_rx,
receiver: txt_rx,
sender: op_tx,
buffer: Woot::new(site_id, buffer), // TODO initialize with buffer!
buffer: Woot::new(site_id, ""), // TODO initialize with buffer!
path: path.to_string(),
stop: end_rx,
stop_control: end_tx,
@ -64,7 +65,7 @@ impl BufferControllerWorker {
}
#[async_trait]
impl ControllerWorker<String> for BufferControllerWorker {
impl ControllerWorker<TextChange> for BufferControllerWorker {
type Controller = BufferController;
type Tx = BufferClient<Channel>;
type Rx = Streaming<RawOp>;
@ -90,29 +91,43 @@ impl ControllerWorker<String> for BufferControllerWorker {
res = self.operations.recv() => match res {
None => break,
Some(change) => {
let span = &self.buffer.view()[change.span.clone()];
let diff = TextDiff::from_chars(span, &change.content);
match self.buffer.view().get(change.span.clone()) {
None => tracing::error!("received illegal span from client"),
Some(span) => {
let diff = TextDiff::from_chars(span, &change.content);
let mut i = 0;
let mut ops = Vec::new();
for diff in diff.iter_all_changes() {
match diff.tag() {
ChangeTag::Equal => i += 1,
ChangeTag::Delete => ops.push(self.buffer.delete(change.span.start + i).unwrap()),
ChangeTag::Insert => {
for c in diff.value().chars() {
ops.push(self.buffer.insert(change.span.start + i, c).unwrap());
i += 1;
let mut i = 0;
let mut ops = Vec::new();
for diff in diff.iter_all_changes() {
match diff.tag() {
ChangeTag::Equal => i += 1,
ChangeTag::Delete => match self.buffer.delete(change.span.start + i) {
Ok(op) => ops.push(op),
Err(e) => tracing::error!("could not apply deletion: {}", e),
},
ChangeTag::Insert => {
for c in diff.value().chars() {
match self.buffer.insert(change.span.start + i, c) {
Ok(op) => {
ops.push(op);
i += 1;
},
Err(e) => tracing::error!("could not apply insertion: {}", e),
}
}
},
}
},
}
}
}
for op in ops {
match self.send_op(&mut tx, &op).await {
Ok(()) => self.buffer.merge(op),
Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e),
}
for op in ops {
match self.send_op(&mut tx, &op).await {
Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e),
Ok(()) => {
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
},
}
}
},
}
}
},
@ -121,10 +136,12 @@ impl ControllerWorker<String> for BufferControllerWorker {
res = rx.message() => match res {
Err(_e) => break,
Ok(None) => break,
Ok(Some(change)) => {
let op : Op = serde_json::from_str(&change.opseq).unwrap();
self.buffer.merge(op);
self.content.send(self.buffer.view()).unwrap();
Ok(Some(change)) => match serde_json::from_str::<Op>(&change.opseq) {
Ok(op) => {
self.buffer.merge(op);
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
},
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
},
},
}

View file

@ -132,11 +132,9 @@ impl Client {
path: path.to_string(), user: self.id.clone(), content: None
};
let content = client.sync(req.clone()).await?.into_inner().content;
let stream = client.attach(req).await?.into_inner();
let controller = BufferControllerWorker::new(self.id.clone(), &content, path);
let controller = BufferControllerWorker::new(self.id.clone(), path);
let handler = Arc::new(controller.subscribe());
let _path = path.to_string();