feat: updated to new glue, magic.

fix: added sublime junk to gitignore
This commit is contained in:
Camillo Schenone 2024-08-05 22:44:46 +02:00
parent b75caaf959
commit f9784e961d
8 changed files with 320 additions and 267 deletions

1
.gitignore vendored
View file

@ -2,6 +2,7 @@
Cargo.lock
.cargo
.vscode/
*.sublime-*
# js
node_modules/

View file

@ -27,26 +27,32 @@ tokio-stream = { version = "0.1" }
serde = { version = "1.0.193", features = ["derive"] }
dashmap = { version = "5.5.3" }
postcard = { version = "1.0.8" }
# glue (multiple)
lazy_static = { version = "1.4.0", optional = true }
tracing-subscriber = { version = "0.3.18", optional = true }
# glue (java)
jni = { version = "0.21.1", features = ["invocation"], optional = true }
jni-sys = { version = "0.3.0", optional = true }
rifgen = { git = "https://github.com/Kofituo/rifgen.git", rev = "d27d9785b2febcf5527f1deb6a846be5d583f7d7", optional = true }
log = { version = "0.4.21", optional = true }
# glue (lua)
mlua = { version = "0.9.6", features = ["module", "luajit", "send"], optional = true }
thiserror = { version = "1.0.57", optional = true }
derive_more = { version = "0.99.17", optional = true }
# glue (js)
rmpv = { version = "1", optional = true }
napi = { version = "2", features = ["full"], optional = true }
napi-derive = { version="2", optional = true}
futures = { version = "0.3.28", optional = true }
# glue (python)
pyo3 = { version = "0.20", features = ["extension-module"], optional = true}
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"], optional = true }
[build-dependencies]
# glue (java)
flapigen = { version = "0.6.0", optional = true }

View file

@ -3,7 +3,11 @@
//! an editor-friendly representation of a text change in a buffer
//! to easily interface with codemp from various editors
use crate::woot::{WootResult, woot::Woot, crdt::{TextEditor, CRDT}};
use crate::woot::{
crdt::{TextEditor, CRDT},
woot::Woot,
WootResult,
};
/// an atomic and orderable operation
///
@ -27,6 +31,7 @@ pub struct Op(pub(crate) woot::crdt::Op);
/// `TextChange { span: 3..4, content: "".into() }`
///
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct TextChange {
/// range of text change, as char indexes in buffer previous state
pub span: std::ops::Range<usize>,
@ -49,7 +54,7 @@ impl TextChange {
} else {
end += len
}
},
}
_ => {
end = 0;
from_beginning = false;
@ -68,7 +73,9 @@ impl TextChange {
/// consume the [TextChange], transforming it into a Vec of [Op]
pub fn transform(self, woot: &Woot) -> WootResult<Vec<Op>> {
let mut out = Vec::new();
if self.is_empty() { return Ok(out); } // no-op
if self.is_empty() {
return Ok(out);
} // no-op
let view = woot.view();
let Some(span) = view.get(self.span.clone()) else {
return Err(crate::woot::WootError::OutOfBounds);
@ -76,17 +83,21 @@ impl TextChange {
let diff = similar::TextDiff::from_chars(span, &self.content);
for (i, diff) in diff.iter_all_changes().enumerate() {
match diff.tag() {
similar::ChangeTag::Equal => {},
similar::ChangeTag::Equal => {}
similar::ChangeTag::Delete => match woot.delete_one(self.span.start + i) {
Err(e) => tracing::error!("could not create deletion: {}", e),
Ok(op) => out.push(Op(op)),
},
similar::ChangeTag::Insert => {
match woot.insert(self.span.start + i, diff.value()) {
Ok(ops) => for op in ops { out.push(Op(op)) },
Ok(ops) => {
for op in ops {
out.push(Op(op))
}
}
Err(e) => tracing::error!("could not create insertion: {}", e),
}
},
}
}
}
Ok(out)
@ -131,7 +142,7 @@ mod tests {
fn textchange_diff_works_for_deletions() {
let change = super::TextChange::from_diff(
"sphinx of black quartz, judge my vow",
"sphinx of quartz, judge my vow"
"sphinx of quartz, judge my vow",
);
assert_eq!(change.span, 10..16);
assert_eq!(change.content, "");
@ -141,7 +152,7 @@ mod tests {
fn textchange_diff_works_for_insertions() {
let change = super::TextChange::from_diff(
"sphinx of quartz, judge my vow",
"sphinx of black quartz, judge my vow"
"sphinx of black quartz, judge my vow",
);
assert_eq!(change.span, 10..10);
assert_eq!(change.content, "black ");
@ -151,7 +162,7 @@ mod tests {
fn textchange_diff_works_for_changes() {
let change = super::TextChange::from_diff(
"sphinx of black quartz, judge my vow",
"sphinx who watches the desert, judge my vow"
"sphinx who watches the desert, judge my vow",
);
assert_eq!(change.span, 7..22);
assert_eq!(change.content, "who watches the desert");
@ -159,30 +170,45 @@ mod tests {
#[test]
fn textchange_apply_works_for_insertions() {
let change = super::TextChange { span: 5..5, content: " cruel".to_string() };
let change = super::TextChange {
span: 5..5,
content: " cruel".to_string(),
};
let result = change.apply("hello world!");
assert_eq!(result, "hello cruel world!");
}
#[test]
fn textchange_apply_works_for_deletions() {
let change = super::TextChange { span: 5..11, content: "".to_string() };
let change = super::TextChange {
span: 5..11,
content: "".to_string(),
};
let result = change.apply("hello cruel world!");
assert_eq!(result, "hello world!");
}
#[test]
fn textchange_apply_works_for_replacements() {
let change = super::TextChange { span: 5..11, content: " not very pleasant".to_string() };
let change = super::TextChange {
span: 5..11,
content: " not very pleasant".to_string(),
};
let result = change.apply("hello cruel world!");
assert_eq!(result, "hello not very pleasant world!");
}
#[test]
fn textchange_apply_never_panics() {
let change = super::TextChange { span: 100..110, content: "a very long string \n which totally matters".to_string() };
let change = super::TextChange {
span: 100..110,
content: "a very long string \n which totally matters".to_string(),
};
let result = change.apply("a short text");
assert_eq!(result, "a short texta very long string \n which totally matters");
assert_eq!(
result,
"a short texta very long string \n which totally matters"
);
}
#[test]
@ -193,7 +219,10 @@ mod tests {
#[test]
fn empty_textchange_doesnt_alter_buffer() {
let change = super::TextChange { span: 42..42, content: "".to_string() };
let change = super::TextChange {
span: 42..42,
content: "".to_string(),
};
let result = change.apply("some important text");
assert_eq!(result, "some important text");
}

View file

@ -3,11 +3,13 @@
//! represents the position of an user's cursor, with
//! information about their identity
use uuid::Uuid;
use codemp_proto as proto;
// use pyo3::prelude::*;
use uuid::Uuid;
/// user cursor position in a buffer
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct Cursor {
/// range of text change, as char indexes in buffer previous state
pub start: (i32, i32),
@ -16,7 +18,6 @@ pub struct Cursor {
pub user: Option<Uuid>,
}
impl From<proto::cursor::CursorPosition> for Cursor {
fn from(value: proto::cursor::CursorPosition) -> Self {
Self {
@ -32,8 +33,14 @@ impl From<Cursor> for proto::cursor::CursorPosition {
fn from(value: Cursor) -> Self {
Self {
buffer: proto::files::BufferNode { path: value.buffer },
start: proto::cursor::RowCol { row: value.start.0, col: value.start.1 },
end: proto::cursor::RowCol { row: value.end.0, col: value.end.1 },
start: proto::cursor::RowCol {
row: value.start.0,
col: value.start.1,
},
end: proto::cursor::RowCol {
row: value.end.0,
col: value.end.1,
},
}
}
}
@ -52,12 +59,20 @@ impl From<proto::cursor::CursorEvent> for Cursor {
impl From<Cursor> for proto::cursor::CursorEvent {
fn from(value: Cursor) -> Self {
Self {
user: proto::common::Identity { id: value.user.unwrap_or_default().to_string() },
user: proto::common::Identity {
id: value.user.unwrap_or_default().to_string(),
},
position: proto::cursor::CursorPosition {
buffer: proto::files::BufferNode { path: value.buffer },
start: proto::cursor::RowCol { row: value.start.0, col: value.start.1 },
end: proto::cursor::RowCol { row: value.end.0, col: value.end.1 },
}
start: proto::cursor::RowCol {
row: value.start.0,
col: value.start.1,
},
end: proto::cursor::RowCol {
row: value.end.0,
col: value.end.1,
},
},
}
}
}

View file

@ -2,15 +2,14 @@
//!
//! a controller implementation for buffer actions
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::{watch, mpsc};
use tokio::sync::{mpsc, watch};
use tonic::async_trait;
use crate::errors::IgnorableError;
use crate::api::Controller;
use crate::errors::IgnorableError;
use crate::api::TextChange;
@ -22,6 +21,7 @@ use crate::api::TextChange;
///
/// upon dropping this handle will stop the associated worker
#[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct BufferController(Arc<BufferControllerInner>);
#[derive(Debug)]
@ -42,14 +42,14 @@ impl BufferController {
poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
Self(Arc::new(
BufferControllerInner {
Self(Arc::new(BufferControllerInner {
name,
content, operations, poller,
content,
operations,
poller,
seen: StatusCheck::default(),
_stop: Arc::new(StopOnDrop(stop)),
}
))
}))
}
/// unique identifier of buffer
@ -69,7 +69,9 @@ struct StopOnDrop(mpsc::UnboundedSender<()>);
impl Drop for StopOnDrop {
fn drop(&mut self) {
self.0.send(()).unwrap_or_warn("could not send stop message to worker");
self.0
.send(())
.unwrap_or_warn("could not send stop message to worker");
}
}
@ -85,7 +87,8 @@ impl Controller<TextChange> for BufferController {
}
let (tx, rx) = oneshot::channel::<()>();
self.0.poller.send(tx)?;
rx.await.map_err(|_| crate::Error::Channel { send: false })?;
rx.await
.map_err(|_| crate::Error::Channel { send: false })?;
Ok(())
}
@ -121,19 +124,22 @@ impl Controller<TextChange> for BufferController {
}
#[derive(Debug, Clone)]
struct StatusCheck<T : Clone> {
struct StatusCheck<T: Clone> {
state: watch::Receiver<T>,
updater: Arc<watch::Sender<T>>,
}
impl<T : Clone + Default> Default for StatusCheck<T> {
impl<T: Clone + Default> Default for StatusCheck<T> {
fn default() -> Self {
let (tx, rx) = watch::channel(T::default());
StatusCheck { state: rx, updater: Arc::new(tx) }
StatusCheck {
state: rx,
updater: Arc::new(tx),
}
}
}
impl<T : Clone> StatusCheck<T> {
impl<T: Clone> StatusCheck<T> {
fn update(&self, state: T) -> T {
self.updater.send_replace(state)
}

View file

@ -1,13 +1,21 @@
//! ### controller
//!
//! a controller implementation for cursor actions
use std::sync::Arc;
use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch};
use tokio::sync::{
broadcast::{
self,
error::{RecvError, TryRecvError},
},
mpsc, watch, Mutex,
};
use tonic::async_trait;
use crate::{api::{Cursor, Controller}, errors::IgnorableError};
use crate::{
api::{Controller, Cursor},
errors::IgnorableError,
};
use codemp_proto::cursor::{CursorEvent, CursorPosition};
/// the cursor controller implementation
///
@ -17,10 +25,11 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition};
/// * a mutex over a stream of inbound cursor events
/// * a channel to stop the associated worker
///
/// for each controller a worker exists, managing outgoing and inbound event queues
/// for each controller a worker exists , managing outgoing and inbound event queues
///
/// upon dropping this handle will stop the associated worker
#[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct CursorController(Arc<CursorControllerInner>);
#[derive(Debug)]
@ -33,7 +42,10 @@ struct CursorControllerInner {
impl Drop for CursorController {
fn drop(&mut self) {
self.0.stop.send(()).unwrap_or_warn("could not stop cursor actor")
self.0
.stop
.send(())
.unwrap_or_warn("could not stop cursor actor")
}
}
@ -44,9 +56,12 @@ impl CursorController {
stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>,
) -> Self {
Self(Arc::new(
CursorControllerInner { op, last_op, stream, stop }
))
Self(Arc::new(CursorControllerInner {
op,
last_op,
stream,
stop,
}))
}
}
@ -73,7 +88,7 @@ impl Controller<Cursor> for CursorController {
Err(TryRecvError::Lagged(n)) => {
tracing::warn!("cursor channel lagged, skipping {} events", n);
Ok(stream.try_recv().map(|x| x.into()).ok())
},
}
}
}
@ -87,7 +102,11 @@ impl Controller<Cursor> for CursorController {
Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }),
Err(RecvError::Lagged(n)) => {
tracing::error!("cursor channel lagged behind, skipping {} events", n);
Ok(stream.recv().await.expect("could not receive after lagging").into())
Ok(stream
.recv()
.await
.expect("could not receive after lagging")
.into())
}
}
}
@ -96,5 +115,4 @@ impl Controller<Cursor> for CursorController {
async fn poll(&self) -> crate::Result<()> {
Ok(self.0.last_op.lock().await.changed().await?)
}
}

View file

@ -4,12 +4,7 @@ use tokio::sync::{mpsc, Mutex, RwLock};
use tracing;
use tracing_subscriber;
use crate::errors::Error as CodempError;
use crate::prelude::*;
use codemp_proto::{
common::Identity, cursor::CursorEvent as CodempCursorEvent,
cursor::CursorPosition as CodempCursorPosition, files::BufferNode,
};
use pyo3::{
exceptions::{PyBaseException, PyConnectionError, PyRuntimeError},
@ -101,12 +96,14 @@ fn init_logger(py: Python<'_>, debug: Option<bool>) -> PyResult<Py<PyLogger>> {
.with_line_number(false)
.with_source_location(false)
.compact();
tracing_subscriber::fmt()
let _ = tracing_subscriber::fmt()
.with_ansi(false)
.event_format(format)
.with_max_level(level)
.with_writer(std::sync::Mutex::new(LoggerProducer(tx)))
.init();
.try_init();
Ok(Py::new(py, PyLogger(Arc::new(Mutex::new(rx))))?)
}
@ -171,7 +168,7 @@ impl PyClient {
return Err(PyConnectionError::new_err("Connect to a server first."));
};
let workspace: PyWorkspace = cli
let workspace: CodempWorkspace = cli
.as_mut()
.unwrap()
.join_workspace(workspace.as_str())
@ -193,10 +190,10 @@ impl PyClient {
};
let Some(ws) = cli.as_ref().unwrap().get_workspace(id.as_str()) else {
return Ok(None)
return Ok(None);
};
Python::with_gil(|py| Ok(Some(Py::new(py, PyWorkspace(ws))?)))
Python::with_gil(|py| Ok(Some(Py::new(py, ws)?)))
})
}
@ -217,20 +214,11 @@ impl PyClient {
}
}
#[pyclass]
struct PyWorkspace(Arc<CodempWorkspace>);
impl From<Arc<CodempWorkspace>> for PyWorkspace {
fn from(value: Arc<CodempWorkspace>) -> Self {
PyWorkspace(value)
}
}
#[pymethods]
impl PyWorkspace {
impl CodempWorkspace {
// join a workspace
fn create<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.0.clone();
fn pycreate<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
ws.create(path.as_str()).await?;
@ -238,17 +226,17 @@ impl PyWorkspace {
})
}
fn attach<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.0.clone();
fn pyattach<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let buffctl: PyBufferController = ws.attach(path.as_str()).await?.into();
let buffctl: CodempBufferController = ws.attach(path.as_str()).await?.into();
Python::with_gil(|py| Ok(Py::new(py, buffctl)?))
})
}
fn fetch_buffers<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let ws = self.0.clone();
fn pyfetch_buffers<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
ws.fetch_buffers().await?;
@ -256,8 +244,8 @@ impl PyWorkspace {
})
}
fn fetch_users<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let ws = self.0.clone();
fn pyfetch_users<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
ws.fetch_users().await?;
@ -265,23 +253,23 @@ impl PyWorkspace {
})
}
fn list_buffer_users<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.0.clone();
fn pylist_buffer_users<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let usrlist: Vec<PyId> = ws
let usrlist: Vec<String> = ws
.list_buffer_users(path.as_str())
.await?
.into_iter()
.map(PyId::from)
.map(|e| e.id)
.collect();
Ok(usrlist)
})
}
fn delete<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.0.clone();
fn pydelete<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> {
let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
ws.delete(path.as_str()).await?;
@ -289,222 +277,152 @@ impl PyWorkspace {
})
}
fn id(&self, py: Python<'_>) -> Py<PyString> {
PyString::new(py, self.0.id().as_str()).into()
fn pyid(&self, py: Python<'_>) -> Py<PyString> {
PyString::new(py, self.id().as_str()).into()
}
fn cursor(&self, py: Python<'_>) -> PyResult<Py<PyCursorController>> {
Ok(Py::new(py, PyCursorController::from(self.0.cursor()))?)
fn pycursor(&self, py: Python<'_>) -> PyResult<Py<CodempCursorController>> {
Ok(Py::new(py, CodempCursorController::from(self.cursor()))?)
}
fn buffer_by_name(
fn pybuffer_by_name(
&self,
py: Python<'_>,
path: String,
) -> PyResult<Option<Py<PyBufferController>>> {
let Some(bufctl) = self.0.buffer_by_name(path.as_str()) else {
return Ok(None)
) -> PyResult<Option<Py<CodempBufferController>>> {
let Some(bufctl) = self.buffer_by_name(path.as_str()) else {
return Ok(None);
};
Ok(Some(Py::new(py, PyBufferController::from(bufctl))?))
Ok(Some(Py::new(py, CodempBufferController::from(bufctl))?))
}
fn filetree(&self, py: Python<'_>) -> Py<PyList> {
PyList::new(py, self.0.filetree()).into_py(py)
fn pyfiletree(&self, py: Python<'_>) -> Py<PyList> {
PyList::new(py, self.filetree()).into_py(py)
}
}
/* ########################################################################### */
#[pyclass]
struct PyCursorController(Arc<CodempCursorController>);
impl From<Arc<CodempCursorController>> for PyCursorController {
fn from(value: Arc<CodempCursorController>) -> Self {
PyCursorController(value)
}
}
#[pymethods]
impl PyCursorController {
fn send<'a>(&'a self, path: String, start: (i32, i32), end: (i32, i32)) -> PyResult<()> {
let pos = CodempCursorPosition {
buffer: BufferNode { path },
impl CodempCursorController {
fn pysend<'a>(&'a self, path: String, start: (i32, i32), end: (i32, i32)) -> PyResult<()> {
let pos = CodempCursor {
start: start.into(),
end: end.into(),
buffer: path,
user: None,
};
Ok(self.0.send(pos)?)
Ok(self.send(pos)?)
}
fn try_recv(&self, py: Python<'_>) -> PyResult<PyObject> {
match self.0.try_recv()? {
fn pytry_recv(&self, py: Python<'_>) -> PyResult<PyObject> {
match self.try_recv()? {
Some(cur_event) => {
let evt = PyCursorEvent::from(cur_event);
let evt = CodempCursor::from(cur_event);
Ok(evt.into_py(py))
}
None => Ok(py.None()),
}
}
fn recv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.0.clone();
fn pyrecv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let cur_event: PyCursorEvent = rc.recv().await?.into();
let cur_event: CodempCursor = rc.recv().await?.into();
Python::with_gil(|py| Ok(Py::new(py, cur_event)?))
})
}
fn poll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.0.clone();
fn pypoll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { Ok(rc.poll().await?) })
}
}
#[pyclass]
struct PyBufferController(Arc<CodempBufferController>);
impl From<Arc<CodempBufferController>> for PyBufferController {
fn from(value: Arc<CodempBufferController>) -> Self {
PyBufferController(value)
}
}
#[pymethods]
impl PyBufferController {
fn content<'a>(&self, py: Python<'a>) -> &'a PyString {
PyString::new(py, self.0.content().as_str())
impl CodempBufferController {
fn pycontent<'a>(&self, py: Python<'a>) -> &'a PyString {
PyString::new(py, self.content().as_str())
}
fn send(&self, start: usize, end: usize, txt: String) -> PyResult<()> {
fn pysend(&self, start: usize, end: usize, txt: String) -> PyResult<()> {
let op = CodempTextChange {
span: start..end,
content: txt.into(),
};
Ok(self.0.send(op)?)
Ok(self.send(op)?)
}
fn try_recv(&self, py: Python<'_>) -> PyResult<PyObject> {
match self.0.try_recv()? {
fn pytry_recv(&self, py: Python<'_>) -> PyResult<PyObject> {
match self.try_recv()? {
Some(txt_change) => {
let evt = PyTextChange::from(txt_change);
let evt = CodempTextChange::from(txt_change);
Ok(evt.into_py(py))
}
None => Ok(py.None()),
}
}
fn recv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.0.clone();
fn pyrecv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move {
let txt_change: PyTextChange = rc.recv().await?.into();
let txt_change: CodempTextChange = rc.recv().await?.into();
Python::with_gil(|py| Ok(Py::new(py, txt_change)?))
})
}
fn poll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.0.clone();
fn pypoll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> {
let rc = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { Ok(rc.poll().await?) })
}
}
/* ---------- Type Wrappers ----------*/
// All these objects are not meant to be handled rust side.
// Just to be sent to the python heap.
#[pyclass]
struct PyId {
#[pyo3(get, set)]
id: String,
}
impl From<Identity> for PyId {
fn from(value: Identity) -> Self {
PyId { id: value.id }
}
}
#[pyclass]
struct PyCursorEvent {
#[pyo3(get, set)]
user: String,
#[pyo3(get, set)]
buffer: String,
#[pyo3(get, set)]
start: (i32, i32),
#[pyo3(get, set)]
end: (i32, i32),
}
impl From<CodempCursorEvent> for PyCursorEvent {
fn from(value: CodempCursorEvent) -> Self {
// todo, handle this optional better?
let pos = value.position;
PyCursorEvent {
user: value.user.id,
buffer: pos.buffer.path,
start: pos.start.into(),
end: pos.end.into(),
}
}
}
#[pyclass]
struct PyTextChange(CodempTextChange);
impl From<CodempTextChange> for PyTextChange {
fn from(value: CodempTextChange) -> Self {
PyTextChange(value)
}
}
#[pymethods]
impl PyTextChange {
impl CodempTextChange {
#[getter]
fn start_incl(&self) -> PyResult<usize> {
Ok(self.0.span.start)
fn pystart_incl(&self) -> PyResult<usize> {
Ok(self.span.start)
}
#[getter]
fn end_excl(&self) -> PyResult<usize> {
Ok(self.0.span.end)
fn pyend_excl(&self) -> PyResult<usize> {
Ok(self.span.end)
}
#[getter]
fn content(&self) -> PyResult<String> {
Ok(self.0.content.clone())
fn pycontent(&self) -> PyResult<String> {
Ok(self.content.clone())
}
fn is_deletion(&self) -> bool {
self.0.is_deletion()
fn pyis_deletion(&self) -> bool {
self.is_deletion()
}
fn is_addition(&self) -> bool {
self.0.is_addition()
fn pyis_addition(&self) -> bool {
self.is_addition()
}
fn is_empty(&self) -> bool {
self.0.is_empty()
fn pyis_empty(&self) -> bool {
self.is_empty()
}
fn apply(&self, txt: &str) -> String {
self.0.apply(txt)
fn pyapply(&self, txt: &str) -> String {
self.apply(txt)
}
#[classmethod]
fn from_diff(_cls: &PyType, before: &str, after: &str) -> PyTextChange {
PyTextChange(CodempTextChange::from_diff(before, after))
fn pyfrom_diff(_cls: &PyType, before: &str, after: &str) -> CodempTextChange {
CodempTextChange::from_diff(before, after)
}
#[classmethod]
fn index_to_rowcol(_cls: &PyType, txt: &str, index: usize) -> (i32, i32) {
fn pyindex_to_rowcol(_cls: &PyType, txt: &str, index: usize) -> (i32, i32) {
CodempTextChange::index_to_rowcol(txt, index).into()
}
}
@ -515,14 +433,14 @@ fn codemp(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(codemp_init, m)?)?;
m.add_function(wrap_pyfunction!(init_logger, m)?)?;
m.add_class::<PyClient>()?;
m.add_class::<PyWorkspace>()?;
m.add_class::<PyCursorController>()?;
m.add_class::<PyBufferController>()?;
m.add_class::<PyLogger>()?;
m.add_class::<CodempWorkspace>()?;
m.add_class::<CodempCursorController>()?;
m.add_class::<CodempBufferController>()?;
m.add_class::<PyId>()?;
m.add_class::<PyCursorEvent>()?;
m.add_class::<PyTextChange>()?;
// m.add_class::<PyId>()?;
m.add_class::<CodempCursor>()?;
m.add_class::<CodempTextChange>()?;
Ok(())
}

View file

@ -1,20 +1,34 @@
use crate::{
api::controller::ControllerWorker,
buffer::{self, worker::BufferWorker},
client::Services,
cursor,
};
use codemp_proto::{
auth::Token,
common::{Empty, Identity},
files::BufferNode,
workspace::{
workspace_event::{
Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave,
},
WorkspaceEvent,
},
};
use dashmap::{DashMap, DashSet};
use std::{collections::BTreeSet, sync::Arc};
use tokio::sync::mpsc;
use dashmap::{DashMap, DashSet};
use tonic::Streaming;
use uuid::Uuid;
use crate::{
api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor,
};
use codemp_proto::{auth::Token, common::{Identity, Empty}, files::BufferNode, workspace::{WorkspaceEvent, workspace_event::{Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave}}};
//TODO may contain more info in the future
#[derive(Debug, Clone)]
pub struct UserInfo {
pub uuid: Uuid
pub uuid: Uuid,
}
#[derive(Debug, Clone)]
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub struct Workspace(Arc<WorkspaceInner>);
#[derive(Debug)]
@ -26,7 +40,7 @@ struct WorkspaceInner {
buffers: Arc<DashMap<String, buffer::Controller>>,
pub(crate) filetree: Arc<DashSet<String>>,
pub(crate) users: Arc<DashMap<Uuid, UserInfo>>,
services: Arc<Services>
services: Arc<Services>,
}
impl Workspace {
@ -36,7 +50,7 @@ impl Workspace {
user_id: Uuid,
token: Arc<tokio::sync::watch::Sender<Token>>,
cursor: cursor::Controller,
services: Arc<Services>
services: Arc<Services>,
) -> Self {
Self(Arc::new(WorkspaceInner {
id,
@ -46,7 +60,7 @@ impl Workspace {
buffers: Arc::new(DashMap::default()),
filetree: Arc::new(DashSet::default()),
users: Arc::new(DashMap::default()),
services
services,
}))
}
@ -59,13 +73,26 @@ impl Workspace {
match stream.message().await {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
Ok(None) => break tracing::info!("leaving workspace {}", name),
Ok(Some(WorkspaceEvent { event: None })) => tracing::warn!("workspace {} received empty event", name),
Ok(Some(WorkspaceEvent { event: None })) => {
tracing::warn!("workspace {} received empty event", name)
}
Ok(Some(WorkspaceEvent { event: Some(ev) })) => match ev {
WorkspaceEventInner::Join(UserJoin { user }) => { users.insert(user.clone().into(), UserInfo { uuid: user.into() }); },
WorkspaceEventInner::Leave(UserLeave { user }) => { users.remove(&user.into()); },
WorkspaceEventInner::Create(FileCreate { path }) => { filetree.insert(path); },
WorkspaceEventInner::Rename(FileRename { before, after }) => { filetree.remove(&before); filetree.insert(after); },
WorkspaceEventInner::Delete(FileDelete { path }) => { filetree.remove(&path); },
WorkspaceEventInner::Join(UserJoin { user }) => {
users.insert(user.clone().into(), UserInfo { uuid: user.into() });
}
WorkspaceEventInner::Leave(UserLeave { user }) => {
users.remove(&user.into());
}
WorkspaceEventInner::Create(FileCreate { path }) => {
filetree.insert(path);
}
WorkspaceEventInner::Rename(FileRename { before, after }) => {
filetree.remove(&before);
filetree.insert(after);
}
WorkspaceEventInner::Delete(FileDelete { path }) => {
filetree.remove(&path);
}
},
}
}
@ -75,9 +102,11 @@ impl Workspace {
/// create a new buffer in current workspace
pub async fn create(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone();
workspace_client.create_buffer(
tonic::Request::new(BufferNode { path: path.to_string() })
).await?;
workspace_client
.create_buffer(tonic::Request::new(BufferNode {
path: path.to_string(),
}))
.await?;
// add to filetree
self.0.filetree.insert(path.to_string());
@ -94,14 +123,27 @@ impl Workspace {
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
pub async fn attach(&self, path: &str) -> crate::Result<buffer::Controller> {
let mut worskspace_client = self.0.services.workspace.clone();
let request = tonic::Request::new(BufferNode { path: path.to_string() });
let request = tonic::Request::new(BufferNode {
path: path.to_string(),
});
let credentials = worskspace_client.access_buffer(request).await?.into_inner();
self.0.token.send(credentials.token)?;
let (tx, rx) = mpsc::channel(256);
let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(credentials.id.id).expect("could not represent path as byte sequence"));
let stream = self.0.services.buffer.clone().attach(req).await?.into_inner();
req.metadata_mut().insert(
"path",
tonic::metadata::MetadataValue::try_from(credentials.id.id)
.expect("could not represent path as byte sequence"),
);
let stream = self
.0
.services
.buffer
.clone()
.attach(req)
.await?
.into_inner();
let worker = BufferWorker::new(self.0.user_id, path);
let controller = worker.subscribe();
@ -119,9 +161,11 @@ impl Workspace {
/// fetch a list of all buffers in a workspace
pub async fn fetch_buffers(&self) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone();
let buffers = workspace_client.list_buffers(
tonic::Request::new(Empty {})
).await?.into_inner().buffers;
let buffers = workspace_client
.list_buffers(tonic::Request::new(Empty {}))
.await?
.into_inner()
.buffers;
self.0.filetree.clear();
for b in buffers {
@ -134,9 +178,15 @@ impl Workspace {
/// fetch a list of all users in a workspace
pub async fn fetch_users(&self) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone();
let users = BTreeSet::from_iter(workspace_client.list_users(
tonic::Request::new(Empty {})
).await?.into_inner().users.into_iter().map(Uuid::from));
let users = BTreeSet::from_iter(
workspace_client
.list_users(tonic::Request::new(Empty {}))
.await?
.into_inner()
.users
.into_iter()
.map(Uuid::from),
);
self.0.users.clear();
for u in users {
@ -151,9 +201,13 @@ impl Workspace {
/// TODO: discuss implementation details
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> {
let mut workspace_client = self.0.services.workspace.clone();
let buffer_users = workspace_client.list_buffer_users(
tonic::Request::new(BufferNode { path: path.to_string() })
).await?.into_inner().users;
let buffer_users = workspace_client
.list_buffer_users(tonic::Request::new(BufferNode {
path: path.to_string(),
}))
.await?
.into_inner()
.users;
Ok(buffer_users)
}
@ -161,9 +215,11 @@ impl Workspace {
/// delete a buffer
pub async fn delete(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.0.services.workspace.clone();
workspace_client.delete_buffer(
tonic::Request::new(BufferNode { path: path.to_string() })
).await?;
workspace_client
.delete_buffer(tonic::Request::new(BufferNode {
path: path.to_string(),
}))
.await?;
self.0.filetree.remove(path);
@ -171,10 +227,14 @@ impl Workspace {
}
/// get the id of the workspace
pub fn id(&self) -> String { self.0.id.clone() }
pub fn id(&self) -> String {
self.0.id.clone()
}
/// return a reference to current cursor controller, if currently in a workspace
pub fn cursor(&self) -> cursor::Controller { self.0.cursor.clone() }
pub fn cursor(&self) -> cursor::Controller {
self.0.cursor.clone()
}
/// get a new reference to a buffer controller, if any is active to given path
pub fn buffer_by_name(&self, path: &str) -> Option<buffer::Controller> {