merge: branch 'betterlib' into dev

move features and tools into codemp library, trying to decouple stuff
from nvim. also implemented a proper outgoing queue and operation
composing, making connections more reliable
This commit is contained in:
əlemi 2023-04-19 04:19:59 +02:00
commit 7245674ddf
9 changed files with 427 additions and 459 deletions

View file

@ -49,13 +49,13 @@ local function hook_callbacks(path, buffer)
{ {
callback = function(args) callback = function(args)
local cursor = vim.api.nvim_win_get_cursor(0) local cursor = vim.api.nvim_win_get_cursor(0)
pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors
if cursor[1] == last_line then if cursor[1] == last_line then
return return
end end
last_line = cursor[1] last_line = cursor[1]
local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false) local lines = vim.api.nvim_buf_get_lines(args.buf, 0, -1, false)
pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors pcall(M.replace, path, vim.fn.join(lines, "\n")) -- TODO log errors
pcall(M.cursor, path, cursor[1], cursor[2]) -- TODO log errors
end, end,
buffer = buffer, buffer = buffer,
group = codemp_autocmds, group = codemp_autocmds,
@ -111,6 +111,7 @@ vim.api.nvim_create_user_command('Connect',
-- print(vim.fn.join(data, "\n")) -- print(vim.fn.join(data, "\n"))
end, end,
stderr_buffered = false, stderr_buffered = false,
env = { RUST_BACKTRACE = 1 }
} }
) )
if M.jobid <= 0 then if M.jobid <= 0 then

View file

@ -1,6 +1,8 @@
use std::{net::TcpStream, sync::Mutex}; use std::sync::Arc;
use std::{net::TcpStream, sync::Mutex, collections::BTreeMap};
use codemp::client::CodempClient; use codemp::operation::{OperationController, OperationFactory};
use codemp::{client::CodempClient, operation::OperationProcessor};
use codemp::proto::buffer_client::BufferClient; use codemp::proto::buffer_client::BufferClient;
use rmpv::Value; use rmpv::Value;
@ -9,12 +11,12 @@ use tokio::io::Stdout;
use clap::Parser; use clap::Parser;
use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim}; use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim};
use tonic::async_trait;
use tracing::{error, warn, debug, info}; use tracing::{error, warn, debug, info};
#[derive(Clone)] #[derive(Clone)]
struct NeovimHandler { struct NeovimHandler {
client: CodempClient, client: CodempClient,
factories: Arc<Mutex<BTreeMap<String, Arc<OperationController>>>>,
} }
fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> { fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> {
@ -33,7 +35,13 @@ fn default_zero_number(args: &Vec<Value>, index: usize) -> i64 {
nullable_optional_number(args, index).unwrap_or(0) nullable_optional_number(args, index).unwrap_or(0)
} }
#[async_trait] impl NeovimHandler {
fn buffer_controller(&self, path: &String) -> Option<Arc<OperationController>> {
Some(self.factories.lock().unwrap().get(path)?.clone())
}
}
#[tonic::async_trait]
impl Handler for NeovimHandler { impl Handler for NeovimHandler {
type Writer = Compat<Stdout>; type Writer = Compat<Stdout>;
@ -69,16 +77,18 @@ impl Handler for NeovimHandler {
} }
let path = default_empty_str(&args, 0); let path = default_empty_str(&args, 0);
let txt = default_empty_str(&args, 1); let txt = default_empty_str(&args, 1);
let pos = default_zero_number(&args, 2) as u64; let mut pos = default_zero_number(&args, 2) as i64;
let mut c = self.client.clone();
match c.insert(path, txt, pos).await { if pos <= 0 { pos = 0 } // TODO wtf vim??
Ok(res) => {
match res { match self.buffer_controller(&path) {
true => Ok(Value::Nil), None => Err(Value::from("no controller for given path")),
false => Err(Value::from("rejected")), Some(controller) => {
match controller.apply(controller.insert(&txt, pos as u64)).await {
Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
Ok(_res) => Ok(Value::Nil),
} }
}, }
Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
} }
}, },
@ -90,13 +100,12 @@ impl Handler for NeovimHandler {
let pos = default_zero_number(&args, 1) as u64; let pos = default_zero_number(&args, 1) as u64;
let count = default_zero_number(&args, 2) as u64; let count = default_zero_number(&args, 2) as u64;
let mut c = self.client.clone(); match self.buffer_controller(&path) {
match c.delete(path, pos, count).await { None => Err(Value::from("no controller for given path")),
Ok(res) => match res { Some(controller) => match controller.apply(controller.delete(pos, count)).await {
true => Ok(Value::Nil), Err(e) => Err(Value::from(format!("could not send delete: {}", e))),
false => Err(Value::from("rejected")), Ok(_res) => Ok(Value::Nil),
}, }
Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
} }
}, },
@ -107,13 +116,12 @@ impl Handler for NeovimHandler {
let path = default_empty_str(&args, 0); let path = default_empty_str(&args, 0);
let txt = default_empty_str(&args, 1); let txt = default_empty_str(&args, 1);
let mut c = self.client.clone(); match self.buffer_controller(&path) {
match c.replace(path, txt).await { None => Err(Value::from("no controller for given path")),
Ok(res) => match res { Some(controller) => match controller.apply(controller.replace(&txt)).await {
true => Ok(Value::Nil), Err(e) => Err(Value::from(format!("could not send replace: {}", e))),
false => Err(Value::from("rejected")), Ok(_res) => Ok(Value::Nil),
}, }
Err(e) => Err(Value::from(format!("could not send replace: {}", e))),
} }
}, },
@ -129,63 +137,72 @@ impl Handler for NeovimHandler {
let mut c = self.client.clone(); let mut c = self.client.clone();
let buf = buffer.clone(); match c.attach(path.clone()).await {
match c.attach(path, move |x| {
let lines : Vec<String> = x.split("\n").map(|x| x.to_string()).collect();
let b = buf.clone();
tokio::spawn(async move {
if let Err(e) = b.set_lines(0, -1, false, lines).await {
error!("could not update buffer: {}", e);
}
});
}).await {
Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))), Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))),
Ok(content) => { Ok(controller) => {
let lines : Vec<String> = content.split("\n").map(|x| x.to_string()).collect(); let _controller = controller.clone();
if let Err(e) = buffer.set_lines(0, -1, false, lines).await { let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect();
error!("could not update buffer: {}", e); match buffer.set_lines(0, -1, false, lines).await {
Err(e) => Err(Value::from(format!("could not sync buffer: {}", e))),
Ok(()) => {
tokio::spawn(async move {
loop {
_controller.wait().await;
let lines : Vec<String> = _controller.content().split("\n").map(|x| x.to_string()).collect();
if let Err(e) = buffer.set_lines(0, -1, false, lines).await {
error!("could not update buffer: {}", e);
}
}
});
self.factories.lock().unwrap().insert(path, controller);
Ok(Value::Nil)
}
} }
Ok(Value::Nil)
}, },
} }
}, },
"detach" => { "detach" => {
if args.len() < 1 { Err(Value::from("unimplemented! try with :q!"))
return Err(Value::from("no path given")); // if args.len() < 1 {
} // return Err(Value::from("no path given"));
let path = default_empty_str(&args, 0); // }
let mut c = self.client.clone(); // let path = default_empty_str(&args, 0);
c.detach(path); // let mut c = self.client.clone();
Ok(Value::Nil) // c.detach(path);
// Ok(Value::Nil)
}, },
"listen" => { "listen" => {
if args.len() < 1 {
return Err(Value::from("no path given"));
}
let path = default_empty_str(&args, 0);
let mut c = self.client.clone();
let ns = nvim.create_namespace("Cursor").await let ns = nvim.create_namespace("Cursor").await
.map_err(|e| Value::from(format!("could not create namespace: {}", e)))?; .map_err(|e| Value::from(format!("could not create namespace: {}", e)))?;
let buf = nvim.get_current_buf().await let buf = nvim.get_current_buf().await
.map_err(|e| Value::from(format!("could not get current buf: {}", e)))?; .map_err(|e| Value::from(format!("could not get current buf: {}", e)))?;
match c.listen(path, move |cur| { let mut c = self.client.clone();
let _b = buf.clone(); match c.listen().await {
tokio::spawn(async move {
if let Err(e) = _b.clear_namespace(ns, 0, -1).await {
error!("could not clear previous cursor highlight: {}", e);
}
if let Err(e) = _b.add_highlight(ns, "ErrorMsg", cur.row-1, cur.col, cur.col+1).await {
error!("could not create highlight for cursor: {}", e);
}
});
}).await {
Ok(()) => Ok(Value::Nil),
Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))), Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))),
Ok(cursor) => {
let mut sub = cursor.sub();
debug!("spawning cursor processing worker");
tokio::spawn(async move {
loop {
match sub.recv().await {
Err(e) => return error!("error receiving cursor update from controller: {}", e),
Ok((_usr, cur)) => {
if let Err(e) = buf.clear_namespace(ns, 0, -1).await {
error!("could not clear previous cursor highlight: {}", e);
}
if let Err(e) = buf.add_highlight(ns, "ErrorMsg", cur.start.row-1, cur.start.col, cur.start.col+1).await {
error!("could not create highlight for cursor: {}", e);
}
}
}
}
});
Ok(Value::Nil)
},
} }
}, },
@ -199,8 +216,8 @@ impl Handler for NeovimHandler {
let mut c = self.client.clone(); let mut c = self.client.clone();
match c.cursor(path, row, col).await { match c.cursor(path, row, col).await {
Ok(()) => Ok(Value::Nil), Ok(_) => Ok(Value::Nil),
Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))), Err(e) => Err(Value:: from(format!("could not update cursor: {}", e))),
} }
}, },
@ -259,6 +276,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let handler: NeovimHandler = NeovimHandler { let handler: NeovimHandler = NeovimHandler {
client: client.into(), client: client.into(),
factories: Arc::new(Mutex::new(BTreeMap::new())),
}; };
let (_nvim, io_handler) = create::new_parent(handler).await; let (_nvim, io_handler) = create::new_parent(handler).await;

View file

@ -1,194 +1,138 @@
/// TODO better name for this file use std::sync::Arc;
use std::{sync::{Arc, RwLock}, collections::BTreeMap}; use operational_transform::OperationSeq;
use tracing::{error, warn, info}; use tonic::{transport::Channel, Status};
use tracing::{error, warn};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
opfactory::AsyncFactory, cursor::{CursorController, CursorStorage},
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov}, operation::{OperationController, OperationProcessor},
tonic::{transport::Channel, Status, Streaming}, proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, CursorMov},
}; };
pub type FactoryStore = Arc<RwLock<BTreeMap<String, Arc<AsyncFactory>>>>;
impl From::<BufferClient<Channel>> for CodempClient {
fn from(x: BufferClient<Channel>) -> CodempClient {
CodempClient {
id: Uuid::new_v4(),
client:x,
factories: Arc::new(RwLock::new(BTreeMap::new())),
}
}
}
#[derive(Clone)] #[derive(Clone)]
pub struct CodempClient { pub struct CodempClient {
id: Uuid, id: String,
client: BufferClient<Channel>, client: BufferClient<Channel>,
factories: FactoryStore, }
impl From::<BufferClient<Channel>> for CodempClient {
fn from(value: BufferClient<Channel>) -> Self {
CodempClient { id: Uuid::new_v4().to_string(), client: value }
}
} }
impl CodempClient { impl CodempClient {
fn get_factory(&self, path: &String) -> Result<Arc<AsyncFactory>, Status> { pub fn new(id: String, client: BufferClient<Channel>) -> Self {
match self.factories.read().unwrap().get(path) { CodempClient { id, client }
Some(f) => Ok(f.clone()),
None => Err(Status::not_found("no active buffer for given path")),
}
}
pub fn add_factory(&self, path: String, factory:Arc<AsyncFactory>) {
self.factories.write().unwrap().insert(path, factory);
} }
pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> { pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
let req = BufferPayload { let req = BufferPayload {
path: path.clone(), path, content,
content: content.clone(), user: self.id.clone(),
user: self.id.to_string(),
}; };
let res = self.client.create(req).await?.into_inner(); let res = self.client.create(req).await?;
Ok(res.accepted) Ok(res.into_inner().accepted)
} }
pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result<bool, Status> { pub async fn listen(&mut self) -> Result<Arc<CursorController>, Status> {
let factory = self.get_factory(&path)?;
match factory.insert(txt, pos).await {
Err(e) => Err(Status::internal(format!("invalid operation: {}", e))),
Ok(op) => {
let req = OperationRequest {
path,
hash: "".into(),
user: self.id.to_string(),
opseq: serde_json::to_string(&op)
.map_err(|_| Status::invalid_argument("could not serialize opseq"))?,
};
let res = self.client.edit(req).await?.into_inner();
if let Err(e) = factory.ack(op.clone()).await {
error!("could not ack op '{:?}' : {}", op, e);
}
Ok(res.accepted)
},
}
}
pub async fn delete(&mut self, path: String, pos: u64, count: u64) -> Result<bool, Status> {
let factory = self.get_factory(&path)?;
match factory.delete(pos, count).await {
Err(e) => Err(Status::internal(format!("invalid operation: {}", e))),
Ok(op) => {
let req = OperationRequest {
path,
hash: "".into(),
user: self.id.to_string(),
opseq: serde_json::to_string(&op)
.map_err(|_| Status::invalid_argument("could not serialize opseq"))?,
};
let res = self.client.edit(req).await?.into_inner();
if let Err(e) = factory.ack(op.clone()).await {
error!("could not ack op '{:?}' : {}", op, e);
}
Ok(res.accepted)
},
}
}
pub async fn replace(&mut self, path: String, txt: String) -> Result<bool, Status> {
let factory = self.get_factory(&path)?;
match factory.replace(txt).await {
Err(e) => Err(Status::internal(format!("invalid operation: {}", e))),
Ok(op) => {
let req = OperationRequest {
path,
hash: "".into(),
user: self.id.to_string(),
opseq: serde_json::to_string(&op)
.map_err(|_| Status::invalid_argument("could not serialize opseq"))?,
};
let res = self.client.edit(req).await?.into_inner();
if let Err(e) = factory.ack(op.clone()).await {
error!("could not ack op '{:?}' : {}", op, e);
}
Ok(res.accepted)
},
}
}
pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<(), Status> {
let req = CursorMov {
path, user: self.id.to_string(),
row, col,
};
let _res = self.client.cursor(req).await?.into_inner();
Ok(())
}
pub async fn listen<F>(&mut self, path: String, callback: F) -> Result<(), Status>
where F : Fn(CursorMov) -> () + Send + 'static {
let req = BufferPayload { let req = BufferPayload {
path, path: "".into(),
content: None, content: None,
user: self.id.to_string(), user: self.id.clone(),
}; };
let mut stream = self.client.listen(req).await?.into_inner(); let mut stream = self.client.listen(req).await?.into_inner();
let controller = Arc::new(CursorController::new());
let _controller = controller.clone();
tokio::spawn(async move { tokio::spawn(async move {
// TODO catch some errors loop {
while let Ok(Some(x)) = stream.message().await { match stream.message().await {
callback(x) Err(e) => break error!("error receiving cursor: {}", e),
Ok(None) => break,
Ok(Some(x)) => { _controller.update(x); },
}
} }
}); });
Ok(())
Ok(controller)
} }
pub async fn attach<F>(&mut self, path: String, callback: F) -> Result<String, Status> pub async fn attach(&mut self, path: String) -> Result<Arc<OperationController>, Status> {
where F : Fn(String) -> () + Send + 'static {
let content = self.sync(path.clone()).await?;
let factory = Arc::new(AsyncFactory::new(Some(content.clone())));
self.add_factory(path.clone(), factory.clone());
let req = BufferPayload { let req = BufferPayload {
path, path: path.clone(),
content: None, content: None,
user: self.id.to_string(), user: self.id.clone(),
}; };
let stream = self.client.attach(req).await?.into_inner();
tokio::spawn(async move { Self::worker(stream, factory, callback).await } );
Ok(content)
}
pub fn detach(&mut self, path: String) { let content = self.client.sync(req.clone())
self.factories.write().unwrap().remove(&path); .await?
info!("|| detached from buffer"); .into_inner()
} .content;
async fn sync(&mut self, path: String) -> Result<String, Status> { let mut stream = self.client.attach(req).await?.into_inner();
let res = self.client.sync(
BufferPayload { let factory = Arc::new(OperationController::new(content.unwrap_or("".into())));
path, content: None, user: self.id.to_string(),
let _factory = factory.clone();
tokio::spawn(async move {
loop {
match stream.message().await {
Err(e) => break error!("error receiving update: {}", e),
Ok(None) => break, // clean exit
Ok(Some(x)) => match serde_json::from_str::<OperationSeq>(&x.opseq) {
Err(e) => break error!("error deserializing opseq: {}", e),
Ok(v) => match _factory.process(v).await {
Err(e) => break error!("could not apply operation from server: {}", e),
Ok(_txt) => {
// send event containing where the change happened
}
}
},
}
} }
).await?; });
Ok(res.into_inner().content.unwrap_or("".into()))
let mut _client = self.client.clone();
let _uid = self.id.clone();
let _factory = factory.clone();
let _path = path.clone();
tokio::spawn(async move {
while let Some(op) = _factory.poll().await {
let req = OperationRequest {
hash: "".into(),
opseq: serde_json::to_string(&op).unwrap(),
path: _path.clone(),
user: _uid.clone(),
};
match _client.edit(req).await {
Ok(res) => match res.into_inner().accepted {
true => { _factory.ack().await; },
false => {
warn!("server rejected operation, retrying in 1s");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
},
Err(e) => error!("could not send edit: {}", e),
}
}
});
Ok(factory)
} }
async fn worker<F>(mut stream: Streaming<RawOp>, factory: Arc<AsyncFactory>, callback: F) pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<bool, Status> {
where F : Fn(String) -> () { let req = CursorMov {
info!("|> buffer worker started"); path, row, col,
loop { user: self.id.clone(),
match stream.message().await { };
Err(e) => break error!("error receiving change: {}", e), let res = self.client.cursor(req).await?.into_inner();
Ok(v) => match v { Ok(res.accepted)
None => break warn!("stream closed"),
Some(operation) => match serde_json::from_str(&operation.opseq) {
Err(e) => break error!("could not deserialize opseq: {}", e),
Ok(op) => match factory.process(op).await {
Err(e) => break error!("desynched: {}", e),
Ok(x) => callback(x),
},
}
},
}
}
info!("[] buffer worker stopped");
} }
} }

87
src/lib/cursor.rs Normal file
View file

@ -0,0 +1,87 @@
use std::{collections::HashMap, sync::Mutex};
use tokio::sync::broadcast;
use tracing::{info, error, debug, warn};
use crate::proto::CursorMov;
/// Note that this differs from any hashmap in its put method: no &mut!
pub trait CursorStorage {
fn get(&self, id: &String) -> Option<Cursor>;
fn put(&self, id: String, val: Cursor);
fn update(&self, event: CursorMov) -> Option<Cursor> {
let mut cur = self.get(&event.user)?;
cur.buffer = event.path;
cur.start = (event.row, event.col).into();
self.put(event.user, cur.clone());
Some(cur)
}
}
#[derive(Copy, Clone, Debug, Default)]
pub struct Position {
pub row: i64,
pub col: i64,
}
impl From::<(i64, i64)> for Position {
fn from((row, col): (i64, i64)) -> Self {
Position { row, col }
}
}
#[derive(Clone, Debug, Default)]
pub struct Cursor {
pub buffer: String,
pub start: Position,
pub end: Position,
}
pub struct CursorController {
users: Mutex<HashMap<String, Cursor>>,
bus: broadcast::Sender<(String, Cursor)>,
_bus_keepalive: Mutex<broadcast::Receiver<(String, Cursor)>>,
}
impl CursorController {
pub fn new() -> Self {
let (tx, _rx) = broadcast::channel(64);
CursorController {
users: Mutex::new(HashMap::new()),
bus: tx,
_bus_keepalive: Mutex::new(_rx),
}
}
pub fn sub(&self) -> broadcast::Receiver<(String, Cursor)> {
self.bus.subscribe()
}
}
impl CursorStorage for CursorController {
fn update(&self, event: CursorMov) -> Option<Cursor> {
debug!("processing cursor event: {:?}", event);
let mut cur = self.get(&event.user).unwrap_or(Cursor::default());
cur.buffer = event.path;
cur.start = (event.row, event.col).into();
cur.end = (event.row, event.col).into();
self.put(event.user.clone(), cur.clone());
if let Err(e) = self.bus.send((event.user, cur.clone())) {
error!("could not broadcast cursor event: {}", e);
} else { // this is because once there are no receivers, nothing else can be sent
if let Err(e) = self._bus_keepalive.lock().unwrap().try_recv() {
warn!("could not consume event: {}", e);
}
}
Some(cur)
}
fn get(&self, id: &String) -> Option<Cursor> {
Some(self.users.lock().unwrap().get(id)?.clone())
}
fn put(&self, id: String, val: Cursor) {
self.users.lock().unwrap().insert(id, val);
}
}

View file

@ -1,6 +1,7 @@
pub mod proto; pub mod proto;
pub mod opfactory;
pub mod client; pub mod client;
pub mod operation;
pub mod cursor;
pub use tonic; pub use tonic;
pub use tokio; pub use tokio;

View file

@ -0,0 +1,53 @@
use operational_transform::OperationSeq;
use similar::{TextDiff, ChangeTag};
pub trait OperationFactory {
fn content(&self) -> String;
fn replace(&self, txt: &str) -> OperationSeq {
let mut out = OperationSeq::default();
let content = self.content();
if content == txt {
return out; // TODO this won't work, should we return a noop instead?
}
let diff = TextDiff::from_chars(content.as_str(), txt);
for change in diff.iter_all_changes() {
match change.tag() {
ChangeTag::Equal => out.retain(1),
ChangeTag::Delete => out.delete(1),
ChangeTag::Insert => out.insert(change.value()),
}
}
out
}
fn insert(&self, txt: &str, pos: u64) -> OperationSeq {
let mut out = OperationSeq::default();
let total = self.content().len() as u64;
out.retain(pos);
out.insert(txt);
out.retain(total - pos);
out
}
fn delete(&self, pos: u64, count: u64) -> OperationSeq {
let mut out = OperationSeq::default();
let len = self.content().len() as u64;
out.retain(pos - count);
out.delete(count);
out.retain(len - pos);
out
}
fn cancel(&self, pos: u64, count: u64) -> OperationSeq {
let mut out = OperationSeq::default();
let len = self.content().len() as u64;
out.retain(pos);
out.delete(count);
out.retain(len - (pos+count));
out
}
}

5
src/lib/operation/mod.rs Normal file
View file

@ -0,0 +1,5 @@
pub mod factory;
pub mod processor;
pub use processor::{OperationController, OperationProcessor};
pub use factory::OperationFactory;

View file

@ -0,0 +1,93 @@
use std::{sync::Mutex, collections::VecDeque};
use operational_transform::{OperationSeq, OTError};
use tokio::sync::watch;
use crate::operation::factory::OperationFactory;
#[tonic::async_trait]
pub trait OperationProcessor : OperationFactory {
async fn apply(&self, op: OperationSeq) -> Result<String, OTError>;
async fn process(&self, op: OperationSeq) -> Result<String, OTError>;
async fn poll(&self) -> Option<OperationSeq>;
async fn ack(&self) -> Option<OperationSeq>;
async fn wait(&self);
}
pub struct OperationController {
text: Mutex<String>,
queue: Mutex<VecDeque<OperationSeq>>,
last: Mutex<watch::Receiver<OperationSeq>>,
notifier: watch::Sender<OperationSeq>,
changed: Mutex<watch::Receiver<()>>,
changed_notifier: watch::Sender<()>,
}
impl OperationController {
pub fn new(content: String) -> Self {
let (tx, rx) = watch::channel(OperationSeq::default());
let (done, wait) = watch::channel(());
OperationController {
text: Mutex::new(content),
queue: Mutex::new(VecDeque::new()),
last: Mutex::new(rx),
notifier: tx,
changed: Mutex::new(wait),
changed_notifier: done,
}
}
}
impl OperationFactory for OperationController {
fn content(&self) -> String {
self.text.lock().unwrap().clone()
}
}
#[tonic::async_trait]
impl OperationProcessor for OperationController {
async fn apply(&self, op: OperationSeq) -> Result<String, OTError> {
let txt = self.content();
let res = op.apply(&txt)?;
*self.text.lock().unwrap() = res.clone();
self.queue.lock().unwrap().push_back(op.clone());
self.notifier.send(op);
Ok(res)
}
async fn wait(&self) {
let mut blocker = self.changed.lock().unwrap().clone();
blocker.changed().await;
blocker.changed().await;
}
async fn process(&self, mut op: OperationSeq) -> Result<String, OTError> {
let mut queue = self.queue.lock().unwrap();
for el in queue.iter_mut() {
(op, *el) = op.transform(el)?;
}
let txt = self.content();
let res = op.apply(&txt)?;
*self.text.lock().unwrap() = res.clone();
self.changed_notifier.send(());
Ok(res)
}
async fn poll(&self) -> Option<OperationSeq> {
let len = self.queue.lock().unwrap().len();
if len <= 0 {
let mut recv = self.last.lock().unwrap().clone();
// TODO this is not 100% reliable
recv.changed().await; // acknowledge current state
recv.changed().await; // wait for a change in state
}
Some(self.queue.lock().unwrap().get(0)?.clone())
}
async fn ack(&self) -> Option<OperationSeq> {
self.queue.lock().unwrap().pop_front()
}
}

View file

@ -1,234 +0,0 @@
use std::collections::VecDeque;
use operational_transform::{OperationSeq, OTError};
use similar::TextDiff;
use tokio::sync::{mpsc, watch, oneshot};
use tracing::{error, warn};
#[derive(Clone)]
pub struct OperationFactory {
content: String,
queue: VecDeque<OperationSeq>,
}
impl OperationFactory {
pub fn new(init: Option<String>) -> Self {
OperationFactory {
content: init.unwrap_or(String::new()),
queue: VecDeque::new(),
}
}
fn apply(&mut self, op: OperationSeq) -> Result<OperationSeq, OTError> {
if op.is_noop() { return Err(OTError) }
self.content = op.apply(&self.content)?;
self.queue.push_back(op.clone());
Ok(op)
}
// TODO remove the need for this
pub fn content(&self) -> String {
self.content.clone()
}
pub fn check(&self, txt: &str) -> bool {
self.content == txt
}
pub fn replace(&mut self, txt: &str) -> Result<OperationSeq, OTError> {
let mut out = OperationSeq::default();
if self.content == txt { // TODO throw and error rather than wasting everyone's resources
return Err(OTError); // nothing to do
}
let diff = TextDiff::from_chars(self.content.as_str(), txt);
for change in diff.iter_all_changes() {
match change.tag() {
similar::ChangeTag::Equal => out.retain(1),
similar::ChangeTag::Delete => out.delete(1),
similar::ChangeTag::Insert => out.insert(change.value()),
}
}
self.content = out.apply(&self.content)?;
Ok(out)
}
pub fn insert(&mut self, txt: &str, pos: u64) -> Result<OperationSeq, OTError> {
let mut out = OperationSeq::default();
let total = self.content.len() as u64;
out.retain(pos);
out.insert(txt);
out.retain(total - pos);
Ok(self.apply(out)?)
}
pub fn delete(&mut self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
let mut out = OperationSeq::default();
let len = self.content.len() as u64;
out.retain(pos - count);
out.delete(count);
out.retain(len - pos);
Ok(self.apply(out)?)
}
pub fn cancel(&mut self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
let mut out = OperationSeq::default();
let len = self.content.len() as u64;
out.retain(pos);
out.delete(count);
out.retain(len - (pos+count));
Ok(self.apply(out)?)
}
pub fn ack(&mut self, op: OperationSeq) -> Result<(), OTError> { // TODO use a different error?
// TODO is manually iterating from behind worth the manual search boilerplate?
for (i, o) in self.queue.iter().enumerate().rev() {
if o == &op {
self.queue.remove(i);
return Ok(());
}
}
warn!("could not ack op {:?} from {:?}", op, self.queue);
Err(OTError)
}
pub fn process(&mut self, mut op: OperationSeq) -> Result<String, OTError> {
for o in self.queue.iter_mut() {
(op, *o) = op.transform(o)?;
}
self.content = op.apply(&self.content)?;
Ok(self.content.clone())
}
}
pub struct AsyncFactory {
run: watch::Sender<bool>,
ops: mpsc::Sender<OpMsg>,
#[allow(unused)] // TODO is this necessary?
content: watch::Receiver<String>,
}
impl Drop for AsyncFactory {
fn drop(&mut self) {
self.run.send(false).unwrap_or(());
}
}
impl AsyncFactory {
pub fn new(init: Option<String>) -> Self {
let (run_tx, run_rx) = watch::channel(true);
let (ops_tx, ops_rx) = mpsc::channel(64); // TODO hardcoded size
let (txt_tx, txt_rx) = watch::channel("".into());
let worker = AsyncFactoryWorker {
factory: OperationFactory::new(init),
ops: ops_rx,
run: run_rx,
content: txt_tx,
};
tokio::spawn(async move { worker.work().await });
AsyncFactory { run: run_tx, ops: ops_tx, content: txt_rx }
}
pub async fn insert(&self, txt: String, pos: u64) -> Result<OperationSeq, OTError> {
let (tx, rx) = oneshot::channel();
self.ops.send(OpMsg::Exec(OpWrapper::Insert(txt, pos), tx)).await.map_err(|_| OTError)?;
rx.await.map_err(|_| OTError)?
}
pub async fn delete(&self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
let (tx, rx) = oneshot::channel();
self.ops.send(OpMsg::Exec(OpWrapper::Delete(pos, count), tx)).await.map_err(|_| OTError)?;
rx.await.map_err(|_| OTError)?
}
pub async fn cancel(&self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
let (tx, rx) = oneshot::channel();
self.ops.send(OpMsg::Exec(OpWrapper::Cancel(pos, count), tx)).await.map_err(|_| OTError)?;
rx.await.map_err(|_| OTError)?
}
pub async fn replace(&self, txt: String) -> Result<OperationSeq, OTError> {
let (tx, rx) = oneshot::channel();
self.ops.send(OpMsg::Exec(OpWrapper::Replace(txt), tx)).await.map_err(|_| OTError)?;
rx.await.map_err(|_| OTError)?
}
pub async fn process(&self, opseq: OperationSeq) -> Result<String, OTError> {
let (tx, rx) = oneshot::channel();
self.ops.send(OpMsg::Process(opseq, tx)).await.map_err(|_| OTError)?;
rx.await.map_err(|_| OTError)?
}
pub async fn ack(&self, opseq: OperationSeq) -> Result<(), OTError> {
let (tx, rx) = oneshot::channel();
self.ops.send(OpMsg::Ack(opseq, tx)).await.map_err(|_| OTError)?;
rx.await.map_err(|_| OTError)?
}
}
#[derive(Debug)]
enum OpMsg {
Exec(OpWrapper, oneshot::Sender<Result<OperationSeq, OTError>>),
Process(OperationSeq, oneshot::Sender<Result<String, OTError>>),
Ack(OperationSeq, oneshot::Sender<Result<(), OTError>>),
}
#[derive(Debug)]
enum OpWrapper {
Insert(String, u64),
Delete(u64, u64),
Cancel(u64, u64),
Replace(String),
}
struct AsyncFactoryWorker {
factory: OperationFactory,
ops: mpsc::Receiver<OpMsg>,
run: watch::Receiver<bool>,
content: watch::Sender<String>
}
impl AsyncFactoryWorker {
async fn work(mut self) {
while *self.run.borrow() {
tokio::select! { // periodically check run so that we stop cleanly
recv = self.ops.recv() => {
match recv {
Some(msg) => {
match msg {
OpMsg::Exec(op, tx) => tx.send(self.exec(op)).unwrap_or(()),
OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()),
OpMsg::Ack(opseq, tx) => tx.send(self.factory.ack(opseq)).unwrap_or(()),
}
if let Err(e) = self.content.send(self.factory.content()) {
error!("error updating content: {}", e);
break;
}
},
None => break,
}
},
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {},
};
}
}
fn exec(&mut self, op: OpWrapper) -> Result<OperationSeq, OTError> {
match op {
OpWrapper::Insert(txt, pos) => Ok(self.factory.insert(&txt, pos)?),
OpWrapper::Delete(pos, count) => Ok(self.factory.delete(pos, count)?),
OpWrapper::Cancel(pos, count) => Ok(self.factory.cancel(pos, count)?),
OpWrapper::Replace(txt) => Ok(self.factory.replace(&txt)?),
}
}
}