diff --git a/.gitignore b/.gitignore index 56c86d3..a22c944 100644 --- a/.gitignore +++ b/.gitignore @@ -2,9 +2,10 @@ Cargo.lock .cargo .vscode/ +*.sublime-* # js node_modules/ package-lock.json index.d.ts -index.node \ No newline at end of file +index.node diff --git a/Cargo.toml b/Cargo.toml index 2988964..c5f1c91 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,26 +27,32 @@ tokio-stream = { version = "0.1" } serde = { version = "1.0.193", features = ["derive"] } dashmap = { version = "5.5.3" } postcard = { version = "1.0.8" } + # glue (multiple) lazy_static = { version = "1.4.0", optional = true } tracing-subscriber = { version = "0.3.18", optional = true } + # glue (java) jni = { version = "0.21.1", features = ["invocation"], optional = true } jni-sys = { version = "0.3.0", optional = true } rifgen = { git = "https://github.com/Kofituo/rifgen.git", rev = "d27d9785b2febcf5527f1deb6a846be5d583f7d7", optional = true } log = { version = "0.4.21", optional = true } + # glue (lua) mlua = { version = "0.9.6", features = ["module", "luajit", "send"], optional = true } thiserror = { version = "1.0.57", optional = true } derive_more = { version = "0.99.17", optional = true } + # glue (js) rmpv = { version = "1", optional = true } napi = { version = "2", features = ["full"], optional = true } napi-derive = { version="2", optional = true} futures = { version = "0.3.28", optional = true } + # glue (python) pyo3 = { version = "0.20", features = ["extension-module"], optional = true} pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"], optional = true } + [build-dependencies] # glue (java) flapigen = { version = "0.6.0", optional = true } diff --git a/src/api/change.rs b/src/api/change.rs index 9575614..19d792a 100644 --- a/src/api/change.rs +++ b/src/api/change.rs @@ -3,7 +3,11 @@ //! an editor-friendly representation of a text change in a buffer //! to easily interface with codemp from various editors -use crate::woot::{WootResult, woot::Woot, crdt::{TextEditor, CRDT}}; +use crate::woot::{ + crdt::{TextEditor, CRDT}, + woot::Woot, + WootResult, +}; /// an atomic and orderable operation /// @@ -17,7 +21,7 @@ pub struct Op(pub(crate) woot::crdt::Op); /// replaced to it, allowing to represent any combination of deletions, insertions or replacements /// /// bulk and widespread operations will result in a TextChange effectively sending the whole new -/// buffer, but small changes are efficient and easy to create or apply +/// buffer, but small changes are efficient and easy to create or apply /// /// ### examples /// to insert 'a' after 4th character we should send a @@ -27,6 +31,7 @@ pub struct Op(pub(crate) woot::crdt::Op); /// `TextChange { span: 3..4, content: "".into() }` /// #[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "python", pyo3::pyclass)] pub struct TextChange { /// range of text change, as char indexes in buffer previous state pub span: std::ops::Range, @@ -49,7 +54,7 @@ impl TextChange { } else { end += len } - }, + } _ => { end = 0; from_beginning = false; @@ -68,7 +73,9 @@ impl TextChange { /// consume the [TextChange], transforming it into a Vec of [Op] pub fn transform(self, woot: &Woot) -> WootResult> { let mut out = Vec::new(); - if self.is_empty() { return Ok(out); } // no-op + if self.is_empty() { + return Ok(out); + } // no-op let view = woot.view(); let Some(span) = view.get(self.span.clone()) else { return Err(crate::woot::WootError::OutOfBounds); @@ -76,17 +83,21 @@ impl TextChange { let diff = similar::TextDiff::from_chars(span, &self.content); for (i, diff) in diff.iter_all_changes().enumerate() { match diff.tag() { - similar::ChangeTag::Equal => {}, + similar::ChangeTag::Equal => {} similar::ChangeTag::Delete => match woot.delete_one(self.span.start + i) { Err(e) => tracing::error!("could not create deletion: {}", e), Ok(op) => out.push(Op(op)), }, similar::ChangeTag::Insert => { match woot.insert(self.span.start + i, diff.value()) { - Ok(ops) => for op in ops { out.push(Op(op)) }, + Ok(ops) => { + for op in ops { + out.push(Op(op)) + } + } Err(e) => tracing::error!("could not create insertion: {}", e), } - }, + } } } Ok(out) @@ -131,7 +142,7 @@ mod tests { fn textchange_diff_works_for_deletions() { let change = super::TextChange::from_diff( "sphinx of black quartz, judge my vow", - "sphinx of quartz, judge my vow" + "sphinx of quartz, judge my vow", ); assert_eq!(change.span, 10..16); assert_eq!(change.content, ""); @@ -141,7 +152,7 @@ mod tests { fn textchange_diff_works_for_insertions() { let change = super::TextChange::from_diff( "sphinx of quartz, judge my vow", - "sphinx of black quartz, judge my vow" + "sphinx of black quartz, judge my vow", ); assert_eq!(change.span, 10..10); assert_eq!(change.content, "black "); @@ -151,7 +162,7 @@ mod tests { fn textchange_diff_works_for_changes() { let change = super::TextChange::from_diff( "sphinx of black quartz, judge my vow", - "sphinx who watches the desert, judge my vow" + "sphinx who watches the desert, judge my vow", ); assert_eq!(change.span, 7..22); assert_eq!(change.content, "who watches the desert"); @@ -159,30 +170,45 @@ mod tests { #[test] fn textchange_apply_works_for_insertions() { - let change = super::TextChange { span: 5..5, content: " cruel".to_string() }; + let change = super::TextChange { + span: 5..5, + content: " cruel".to_string(), + }; let result = change.apply("hello world!"); assert_eq!(result, "hello cruel world!"); } #[test] fn textchange_apply_works_for_deletions() { - let change = super::TextChange { span: 5..11, content: "".to_string() }; + let change = super::TextChange { + span: 5..11, + content: "".to_string(), + }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello world!"); } #[test] fn textchange_apply_works_for_replacements() { - let change = super::TextChange { span: 5..11, content: " not very pleasant".to_string() }; + let change = super::TextChange { + span: 5..11, + content: " not very pleasant".to_string(), + }; let result = change.apply("hello cruel world!"); assert_eq!(result, "hello not very pleasant world!"); } #[test] fn textchange_apply_never_panics() { - let change = super::TextChange { span: 100..110, content: "a very long string \n which totally matters".to_string() }; + let change = super::TextChange { + span: 100..110, + content: "a very long string \n which totally matters".to_string(), + }; let result = change.apply("a short text"); - assert_eq!(result, "a short texta very long string \n which totally matters"); + assert_eq!( + result, + "a short texta very long string \n which totally matters" + ); } #[test] @@ -190,10 +216,13 @@ mod tests { let change = super::TextChange::from_diff("same \n\n text", "same \n\n text"); assert!(change.is_empty()); } - + #[test] fn empty_textchange_doesnt_alter_buffer() { - let change = super::TextChange { span: 42..42, content: "".to_string() }; + let change = super::TextChange { + span: 42..42, + content: "".to_string(), + }; let result = change.apply("some important text"); assert_eq!(result, "some important text"); } diff --git a/src/api/cursor.rs b/src/api/cursor.rs index bc7963b..acffcd9 100644 --- a/src/api/cursor.rs +++ b/src/api/cursor.rs @@ -3,11 +3,13 @@ //! represents the position of an user's cursor, with //! information about their identity -use uuid::Uuid; use codemp_proto as proto; +// use pyo3::prelude::*; +use uuid::Uuid; /// user cursor position in a buffer #[derive(Clone, Debug, Default)] +#[cfg_attr(feature = "python", pyo3::pyclass)] pub struct Cursor { /// range of text change, as char indexes in buffer previous state pub start: (i32, i32), @@ -16,7 +18,6 @@ pub struct Cursor { pub user: Option, } - impl From for Cursor { fn from(value: proto::cursor::CursorPosition) -> Self { Self { @@ -32,8 +33,14 @@ impl From for proto::cursor::CursorPosition { fn from(value: Cursor) -> Self { Self { buffer: proto::files::BufferNode { path: value.buffer }, - start: proto::cursor::RowCol { row: value.start.0, col: value.start.1 }, - end: proto::cursor::RowCol { row: value.end.0, col: value.end.1 }, + start: proto::cursor::RowCol { + row: value.start.0, + col: value.start.1, + }, + end: proto::cursor::RowCol { + row: value.end.0, + col: value.end.1, + }, } } } @@ -52,12 +59,20 @@ impl From for Cursor { impl From for proto::cursor::CursorEvent { fn from(value: Cursor) -> Self { Self { - user: proto::common::Identity { id: value.user.unwrap_or_default().to_string() }, + user: proto::common::Identity { + id: value.user.unwrap_or_default().to_string(), + }, position: proto::cursor::CursorPosition { buffer: proto::files::BufferNode { path: value.buffer }, - start: proto::cursor::RowCol { row: value.start.0, col: value.start.1 }, - end: proto::cursor::RowCol { row: value.end.0, col: value.end.1 }, - } + start: proto::cursor::RowCol { + row: value.start.0, + col: value.start.1, + }, + end: proto::cursor::RowCol { + row: value.end.0, + col: value.end.1, + }, + }, } } } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index 8dd9085..129f492 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -1,27 +1,27 @@ //! ### controller -//! +//! //! a controller implementation for buffer actions - use std::sync::Arc; use tokio::sync::oneshot; -use tokio::sync::{watch, mpsc}; +use tokio::sync::{mpsc, watch}; use tonic::async_trait; -use crate::errors::IgnorableError; use crate::api::Controller; +use crate::errors::IgnorableError; use crate::api::TextChange; /// the buffer controller implementation /// /// for each controller a worker exists, managing outgoing and inbound -/// queues, transforming outbound delayed ops and applying remote changes +/// queues, transforming outbound delayed ops and applying remote changes /// to the local buffer /// /// upon dropping this handle will stop the associated worker #[derive(Debug, Clone)] +#[cfg_attr(feature = "python", pyo3::pyclass)] pub struct BufferController(Arc); #[derive(Debug)] @@ -42,14 +42,14 @@ impl BufferController { poller: mpsc::UnboundedSender>, stop: mpsc::UnboundedSender<()>, ) -> Self { - Self(Arc::new( - BufferControllerInner { - name, - content, operations, poller, - seen: StatusCheck::default(), - _stop: Arc::new(StopOnDrop(stop)), - } - )) + Self(Arc::new(BufferControllerInner { + name, + content, + operations, + poller, + seen: StatusCheck::default(), + _stop: Arc::new(StopOnDrop(stop)), + })) } /// unique identifier of buffer @@ -69,7 +69,9 @@ struct StopOnDrop(mpsc::UnboundedSender<()>); impl Drop for StopOnDrop { fn drop(&mut self) { - self.0.send(()).unwrap_or_warn("could not send stop message to worker"); + self.0 + .send(()) + .unwrap_or_warn("could not send stop message to worker"); } } @@ -85,7 +87,8 @@ impl Controller for BufferController { } let (tx, rx) = oneshot::channel::<()>(); self.0.poller.send(tx)?; - rx.await.map_err(|_| crate::Error::Channel { send: false })?; + rx.await + .map_err(|_| crate::Error::Channel { send: false })?; Ok(()) } @@ -121,19 +124,22 @@ impl Controller for BufferController { } #[derive(Debug, Clone)] -struct StatusCheck { +struct StatusCheck { state: watch::Receiver, updater: Arc>, } -impl Default for StatusCheck { +impl Default for StatusCheck { fn default() -> Self { let (tx, rx) = watch::channel(T::default()); - StatusCheck { state: rx, updater: Arc::new(tx) } + StatusCheck { + state: rx, + updater: Arc::new(tx), + } } } -impl StatusCheck { +impl StatusCheck { fn update(&self, state: T) -> T { self.updater.send_replace(state) } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index 651a758..320871a 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -1,13 +1,21 @@ //! ### controller -//! +//! //! a controller implementation for cursor actions - use std::sync::Arc; -use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; +use tokio::sync::{ + broadcast::{ + self, + error::{RecvError, TryRecvError}, + }, + mpsc, watch, Mutex, +}; use tonic::async_trait; -use crate::{api::{Cursor, Controller}, errors::IgnorableError}; +use crate::{ + api::{Controller, Cursor}, + errors::IgnorableError, +}; use codemp_proto::cursor::{CursorEvent, CursorPosition}; /// the cursor controller implementation /// @@ -17,10 +25,11 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition}; /// * a mutex over a stream of inbound cursor events /// * a channel to stop the associated worker /// -/// for each controller a worker exists, managing outgoing and inbound event queues +/// for each controller a worker exists , managing outgoing and inbound event queues /// /// upon dropping this handle will stop the associated worker #[derive(Debug, Clone)] +#[cfg_attr(feature = "python", pyo3::pyclass)] pub struct CursorController(Arc); #[derive(Debug)] @@ -33,7 +42,10 @@ struct CursorControllerInner { impl Drop for CursorController { fn drop(&mut self) { - self.0.stop.send(()).unwrap_or_warn("could not stop cursor actor") + self.0 + .stop + .send(()) + .unwrap_or_warn("could not stop cursor actor") } } @@ -44,9 +56,12 @@ impl CursorController { stream: Mutex>, stop: mpsc::UnboundedSender<()>, ) -> Self { - Self(Arc::new( - CursorControllerInner { op, last_op, stream, stop } - )) + Self(Arc::new(CursorControllerInner { + op, + last_op, + stream, + stop, + })) } } @@ -73,7 +88,7 @@ impl Controller for CursorController { Err(TryRecvError::Lagged(n)) => { tracing::warn!("cursor channel lagged, skipping {} events", n); Ok(stream.try_recv().map(|x| x.into()).ok()) - }, + } } } @@ -87,7 +102,11 @@ impl Controller for CursorController { Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }), Err(RecvError::Lagged(n)) => { tracing::error!("cursor channel lagged behind, skipping {} events", n); - Ok(stream.recv().await.expect("could not receive after lagging").into()) + Ok(stream + .recv() + .await + .expect("could not receive after lagging") + .into()) } } } @@ -96,5 +115,4 @@ impl Controller for CursorController { async fn poll(&self) -> crate::Result<()> { Ok(self.0.last_op.lock().await.changed().await?) } - } diff --git a/src/ffi/python.rs b/src/ffi/python.rs index 7d09d46..a7563e4 100644 --- a/src/ffi/python.rs +++ b/src/ffi/python.rs @@ -4,12 +4,7 @@ use tokio::sync::{mpsc, Mutex, RwLock}; use tracing; use tracing_subscriber; -use crate::errors::Error as CodempError; use crate::prelude::*; -use codemp_proto::{ - common::Identity, cursor::CursorEvent as CodempCursorEvent, - cursor::CursorPosition as CodempCursorPosition, files::BufferNode, -}; use pyo3::{ exceptions::{PyBaseException, PyConnectionError, PyRuntimeError}, @@ -101,12 +96,14 @@ fn init_logger(py: Python<'_>, debug: Option) -> PyResult> { .with_line_number(false) .with_source_location(false) .compact(); - tracing_subscriber::fmt() + + let _ = tracing_subscriber::fmt() .with_ansi(false) .event_format(format) .with_max_level(level) .with_writer(std::sync::Mutex::new(LoggerProducer(tx))) - .init(); + .try_init(); + Ok(Py::new(py, PyLogger(Arc::new(Mutex::new(rx))))?) } @@ -171,7 +168,7 @@ impl PyClient { return Err(PyConnectionError::new_err("Connect to a server first.")); }; - let workspace: PyWorkspace = cli + let workspace: CodempWorkspace = cli .as_mut() .unwrap() .join_workspace(workspace.as_str()) @@ -193,10 +190,10 @@ impl PyClient { }; let Some(ws) = cli.as_ref().unwrap().get_workspace(id.as_str()) else { - return Ok(None) - }; + return Ok(None); + }; - Python::with_gil(|py| Ok(Some(Py::new(py, PyWorkspace(ws))?))) + Python::with_gil(|py| Ok(Some(Py::new(py, ws)?))) }) } @@ -217,20 +214,11 @@ impl PyClient { } } -#[pyclass] -struct PyWorkspace(Arc); - -impl From> for PyWorkspace { - fn from(value: Arc) -> Self { - PyWorkspace(value) - } -} - #[pymethods] -impl PyWorkspace { +impl CodempWorkspace { // join a workspace - fn create<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { - let ws = self.0.clone(); + fn pycreate<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { + let ws = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { ws.create(path.as_str()).await?; @@ -238,17 +226,17 @@ impl PyWorkspace { }) } - fn attach<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { - let ws = self.0.clone(); + fn pyattach<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { + let ws = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { - let buffctl: PyBufferController = ws.attach(path.as_str()).await?.into(); + let buffctl: CodempBufferController = ws.attach(path.as_str()).await?.into(); Python::with_gil(|py| Ok(Py::new(py, buffctl)?)) }) } - fn fetch_buffers<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { - let ws = self.0.clone(); + fn pyfetch_buffers<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { + let ws = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { ws.fetch_buffers().await?; @@ -256,8 +244,8 @@ impl PyWorkspace { }) } - fn fetch_users<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { - let ws = self.0.clone(); + fn pyfetch_users<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { + let ws = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { ws.fetch_users().await?; @@ -265,23 +253,23 @@ impl PyWorkspace { }) } - fn list_buffer_users<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { - let ws = self.0.clone(); + fn pylist_buffer_users<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { + let ws = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { - let usrlist: Vec = ws + let usrlist: Vec = ws .list_buffer_users(path.as_str()) .await? .into_iter() - .map(PyId::from) + .map(|e| e.id) .collect(); Ok(usrlist) }) } - fn delete<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { - let ws = self.0.clone(); + fn pydelete<'a>(&'a self, py: Python<'a>, path: String) -> PyResult<&'a PyAny> { + let ws = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { ws.delete(path.as_str()).await?; @@ -289,222 +277,152 @@ impl PyWorkspace { }) } - fn id(&self, py: Python<'_>) -> Py { - PyString::new(py, self.0.id().as_str()).into() + fn pyid(&self, py: Python<'_>) -> Py { + PyString::new(py, self.id().as_str()).into() } - fn cursor(&self, py: Python<'_>) -> PyResult> { - Ok(Py::new(py, PyCursorController::from(self.0.cursor()))?) + fn pycursor(&self, py: Python<'_>) -> PyResult> { + Ok(Py::new(py, CodempCursorController::from(self.cursor()))?) } - fn buffer_by_name( + fn pybuffer_by_name( &self, py: Python<'_>, path: String, - ) -> PyResult>> { - let Some(bufctl) = self.0.buffer_by_name(path.as_str()) else { - return Ok(None) - }; + ) -> PyResult>> { + let Some(bufctl) = self.buffer_by_name(path.as_str()) else { + return Ok(None); + }; - Ok(Some(Py::new(py, PyBufferController::from(bufctl))?)) + Ok(Some(Py::new(py, CodempBufferController::from(bufctl))?)) } - fn filetree(&self, py: Python<'_>) -> Py { - PyList::new(py, self.0.filetree()).into_py(py) + fn pyfiletree(&self, py: Python<'_>) -> Py { + PyList::new(py, self.filetree()).into_py(py) } } /* ########################################################################### */ -#[pyclass] -struct PyCursorController(Arc); - -impl From> for PyCursorController { - fn from(value: Arc) -> Self { - PyCursorController(value) - } -} - #[pymethods] -impl PyCursorController { - fn send<'a>(&'a self, path: String, start: (i32, i32), end: (i32, i32)) -> PyResult<()> { - let pos = CodempCursorPosition { - buffer: BufferNode { path }, +impl CodempCursorController { + fn pysend<'a>(&'a self, path: String, start: (i32, i32), end: (i32, i32)) -> PyResult<()> { + let pos = CodempCursor { start: start.into(), end: end.into(), + buffer: path, + user: None, }; - Ok(self.0.send(pos)?) + Ok(self.send(pos)?) } - fn try_recv(&self, py: Python<'_>) -> PyResult { - match self.0.try_recv()? { + fn pytry_recv(&self, py: Python<'_>) -> PyResult { + match self.try_recv()? { Some(cur_event) => { - let evt = PyCursorEvent::from(cur_event); + let evt = CodempCursor::from(cur_event); Ok(evt.into_py(py)) } None => Ok(py.None()), } } - fn recv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { - let rc = self.0.clone(); + fn pyrecv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { + let rc = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { - let cur_event: PyCursorEvent = rc.recv().await?.into(); + let cur_event: CodempCursor = rc.recv().await?.into(); Python::with_gil(|py| Ok(Py::new(py, cur_event)?)) }) } - fn poll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { - let rc = self.0.clone(); + fn pypoll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { + let rc = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { Ok(rc.poll().await?) }) } } -#[pyclass] -struct PyBufferController(Arc); - -impl From> for PyBufferController { - fn from(value: Arc) -> Self { - PyBufferController(value) - } -} - #[pymethods] -impl PyBufferController { - fn content<'a>(&self, py: Python<'a>) -> &'a PyString { - PyString::new(py, self.0.content().as_str()) +impl CodempBufferController { + fn pycontent<'a>(&self, py: Python<'a>) -> &'a PyString { + PyString::new(py, self.content().as_str()) } - fn send(&self, start: usize, end: usize, txt: String) -> PyResult<()> { + fn pysend(&self, start: usize, end: usize, txt: String) -> PyResult<()> { let op = CodempTextChange { span: start..end, content: txt.into(), }; - Ok(self.0.send(op)?) + Ok(self.send(op)?) } - fn try_recv(&self, py: Python<'_>) -> PyResult { - match self.0.try_recv()? { + fn pytry_recv(&self, py: Python<'_>) -> PyResult { + match self.try_recv()? { Some(txt_change) => { - let evt = PyTextChange::from(txt_change); + let evt = CodempTextChange::from(txt_change); Ok(evt.into_py(py)) } None => Ok(py.None()), } } - fn recv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { - let rc = self.0.clone(); + fn pyrecv<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { + let rc = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { - let txt_change: PyTextChange = rc.recv().await?.into(); + let txt_change: CodempTextChange = rc.recv().await?.into(); Python::with_gil(|py| Ok(Py::new(py, txt_change)?)) }) } - fn poll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { - let rc = self.0.clone(); + fn pypoll<'a>(&'a self, py: Python<'a>) -> PyResult<&'a PyAny> { + let rc = self.clone(); pyo3_asyncio::tokio::future_into_py(py, async move { Ok(rc.poll().await?) }) } } -/* ---------- Type Wrappers ----------*/ -// All these objects are not meant to be handled rust side. -// Just to be sent to the python heap. - -#[pyclass] -struct PyId { - #[pyo3(get, set)] - id: String, -} - -impl From for PyId { - fn from(value: Identity) -> Self { - PyId { id: value.id } - } -} - -#[pyclass] -struct PyCursorEvent { - #[pyo3(get, set)] - user: String, - - #[pyo3(get, set)] - buffer: String, - - #[pyo3(get, set)] - start: (i32, i32), - - #[pyo3(get, set)] - end: (i32, i32), -} - -impl From for PyCursorEvent { - fn from(value: CodempCursorEvent) -> Self { - // todo, handle this optional better? - let pos = value.position; - PyCursorEvent { - user: value.user.id, - buffer: pos.buffer.path, - start: pos.start.into(), - end: pos.end.into(), - } - } -} - -#[pyclass] -struct PyTextChange(CodempTextChange); - -impl From for PyTextChange { - fn from(value: CodempTextChange) -> Self { - PyTextChange(value) - } -} - #[pymethods] -impl PyTextChange { +impl CodempTextChange { #[getter] - fn start_incl(&self) -> PyResult { - Ok(self.0.span.start) + fn pystart_incl(&self) -> PyResult { + Ok(self.span.start) } #[getter] - fn end_excl(&self) -> PyResult { - Ok(self.0.span.end) + fn pyend_excl(&self) -> PyResult { + Ok(self.span.end) } #[getter] - fn content(&self) -> PyResult { - Ok(self.0.content.clone()) + fn pycontent(&self) -> PyResult { + Ok(self.content.clone()) } - fn is_deletion(&self) -> bool { - self.0.is_deletion() + fn pyis_deletion(&self) -> bool { + self.is_deletion() } - fn is_addition(&self) -> bool { - self.0.is_addition() + fn pyis_addition(&self) -> bool { + self.is_addition() } - fn is_empty(&self) -> bool { - self.0.is_empty() + fn pyis_empty(&self) -> bool { + self.is_empty() } - fn apply(&self, txt: &str) -> String { - self.0.apply(txt) + fn pyapply(&self, txt: &str) -> String { + self.apply(txt) } #[classmethod] - fn from_diff(_cls: &PyType, before: &str, after: &str) -> PyTextChange { - PyTextChange(CodempTextChange::from_diff(before, after)) + fn pyfrom_diff(_cls: &PyType, before: &str, after: &str) -> CodempTextChange { + CodempTextChange::from_diff(before, after) } #[classmethod] - fn index_to_rowcol(_cls: &PyType, txt: &str, index: usize) -> (i32, i32) { + fn pyindex_to_rowcol(_cls: &PyType, txt: &str, index: usize) -> (i32, i32) { CodempTextChange::index_to_rowcol(txt, index).into() } } @@ -515,14 +433,14 @@ fn codemp(_py: Python, m: &PyModule) -> PyResult<()> { m.add_function(wrap_pyfunction!(codemp_init, m)?)?; m.add_function(wrap_pyfunction!(init_logger, m)?)?; m.add_class::()?; - m.add_class::()?; - m.add_class::()?; - m.add_class::()?; m.add_class::()?; + m.add_class::()?; + m.add_class::()?; + m.add_class::()?; - m.add_class::()?; - m.add_class::()?; - m.add_class::()?; + // m.add_class::()?; + m.add_class::()?; + m.add_class::()?; Ok(()) } diff --git a/src/workspace.rs b/src/workspace.rs index 3a8892c..e0bd5d4 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -1,20 +1,34 @@ +use crate::{ + api::controller::ControllerWorker, + buffer::{self, worker::BufferWorker}, + client::Services, + cursor, +}; +use codemp_proto::{ + auth::Token, + common::{Empty, Identity}, + files::BufferNode, + workspace::{ + workspace_event::{ + Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave, + }, + WorkspaceEvent, + }, +}; +use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; use tokio::sync::mpsc; -use dashmap::{DashMap, DashSet}; use tonic::Streaming; use uuid::Uuid; -use crate::{ - api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor, -}; -use codemp_proto::{auth::Token, common::{Identity, Empty}, files::BufferNode, workspace::{WorkspaceEvent, workspace_event::{Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave}}}; //TODO may contain more info in the future #[derive(Debug, Clone)] pub struct UserInfo { - pub uuid: Uuid + pub uuid: Uuid, } #[derive(Debug, Clone)] +#[cfg_attr(feature = "python", pyo3::pyclass)] pub struct Workspace(Arc); #[derive(Debug)] @@ -26,7 +40,7 @@ struct WorkspaceInner { buffers: Arc>, pub(crate) filetree: Arc>, pub(crate) users: Arc>, - services: Arc + services: Arc, } impl Workspace { @@ -36,7 +50,7 @@ impl Workspace { user_id: Uuid, token: Arc>, cursor: cursor::Controller, - services: Arc + services: Arc, ) -> Self { Self(Arc::new(WorkspaceInner { id, @@ -46,7 +60,7 @@ impl Workspace { buffers: Arc::new(DashMap::default()), filetree: Arc::new(DashSet::default()), users: Arc::new(DashMap::default()), - services + services, })) } @@ -59,13 +73,26 @@ impl Workspace { match stream.message().await { Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), Ok(None) => break tracing::info!("leaving workspace {}", name), - Ok(Some(WorkspaceEvent { event: None })) => tracing::warn!("workspace {} received empty event", name), + Ok(Some(WorkspaceEvent { event: None })) => { + tracing::warn!("workspace {} received empty event", name) + } Ok(Some(WorkspaceEvent { event: Some(ev) })) => match ev { - WorkspaceEventInner::Join(UserJoin { user }) => { users.insert(user.clone().into(), UserInfo { uuid: user.into() }); }, - WorkspaceEventInner::Leave(UserLeave { user }) => { users.remove(&user.into()); }, - WorkspaceEventInner::Create(FileCreate { path }) => { filetree.insert(path); }, - WorkspaceEventInner::Rename(FileRename { before, after }) => { filetree.remove(&before); filetree.insert(after); }, - WorkspaceEventInner::Delete(FileDelete { path }) => { filetree.remove(&path); }, + WorkspaceEventInner::Join(UserJoin { user }) => { + users.insert(user.clone().into(), UserInfo { uuid: user.into() }); + } + WorkspaceEventInner::Leave(UserLeave { user }) => { + users.remove(&user.into()); + } + WorkspaceEventInner::Create(FileCreate { path }) => { + filetree.insert(path); + } + WorkspaceEventInner::Rename(FileRename { before, after }) => { + filetree.remove(&before); + filetree.insert(after); + } + WorkspaceEventInner::Delete(FileDelete { path }) => { + filetree.remove(&path); + } }, } } @@ -75,9 +102,11 @@ impl Workspace { /// create a new buffer in current workspace pub async fn create(&self, path: &str) -> crate::Result<()> { let mut workspace_client = self.0.services.workspace.clone(); - workspace_client.create_buffer( - tonic::Request::new(BufferNode { path: path.to_string() }) - ).await?; + workspace_client + .create_buffer(tonic::Request::new(BufferNode { + path: path.to_string(), + })) + .await?; // add to filetree self.0.filetree.insert(path.to_string()); @@ -94,14 +123,27 @@ impl Workspace { /// [crate::api::Controller::recv] to exchange [crate::api::TextChange] pub async fn attach(&self, path: &str) -> crate::Result { let mut worskspace_client = self.0.services.workspace.clone(); - let request = tonic::Request::new(BufferNode { path: path.to_string() }); + let request = tonic::Request::new(BufferNode { + path: path.to_string(), + }); let credentials = worskspace_client.access_buffer(request).await?.into_inner(); self.0.token.send(credentials.token)?; let (tx, rx) = mpsc::channel(256); let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx)); - req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(credentials.id.id).expect("could not represent path as byte sequence")); - let stream = self.0.services.buffer.clone().attach(req).await?.into_inner(); + req.metadata_mut().insert( + "path", + tonic::metadata::MetadataValue::try_from(credentials.id.id) + .expect("could not represent path as byte sequence"), + ); + let stream = self + .0 + .services + .buffer + .clone() + .attach(req) + .await? + .into_inner(); let worker = BufferWorker::new(self.0.user_id, path); let controller = worker.subscribe(); @@ -110,7 +152,7 @@ impl Workspace { worker.work(tx, stream).await; tracing::debug!("controller worker stopped"); }); - + self.0.buffers.insert(path.to_string(), controller.clone()); Ok(controller) @@ -119,9 +161,11 @@ impl Workspace { /// fetch a list of all buffers in a workspace pub async fn fetch_buffers(&self) -> crate::Result<()> { let mut workspace_client = self.0.services.workspace.clone(); - let buffers = workspace_client.list_buffers( - tonic::Request::new(Empty {}) - ).await?.into_inner().buffers; + let buffers = workspace_client + .list_buffers(tonic::Request::new(Empty {})) + .await? + .into_inner() + .buffers; self.0.filetree.clear(); for b in buffers { @@ -134,47 +178,63 @@ impl Workspace { /// fetch a list of all users in a workspace pub async fn fetch_users(&self) -> crate::Result<()> { let mut workspace_client = self.0.services.workspace.clone(); - let users = BTreeSet::from_iter(workspace_client.list_users( - tonic::Request::new(Empty {}) - ).await?.into_inner().users.into_iter().map(Uuid::from)); + let users = BTreeSet::from_iter( + workspace_client + .list_users(tonic::Request::new(Empty {})) + .await? + .into_inner() + .users + .into_iter() + .map(Uuid::from), + ); self.0.users.clear(); for u in users { self.0.users.insert(u, UserInfo { uuid: u }); } - + Ok(()) } /// get a list of the users attached to a specific buffer - /// + /// /// TODO: discuss implementation details pub async fn list_buffer_users(&self, path: &str) -> crate::Result> { let mut workspace_client = self.0.services.workspace.clone(); - let buffer_users = workspace_client.list_buffer_users( - tonic::Request::new(BufferNode { path: path.to_string() }) - ).await?.into_inner().users; + let buffer_users = workspace_client + .list_buffer_users(tonic::Request::new(BufferNode { + path: path.to_string(), + })) + .await? + .into_inner() + .users; Ok(buffer_users) } - + /// delete a buffer pub async fn delete(&self, path: &str) -> crate::Result<()> { let mut workspace_client = self.0.services.workspace.clone(); - workspace_client.delete_buffer( - tonic::Request::new(BufferNode { path: path.to_string() }) - ).await?; - + workspace_client + .delete_buffer(tonic::Request::new(BufferNode { + path: path.to_string(), + })) + .await?; + self.0.filetree.remove(path); - + Ok(()) } /// get the id of the workspace - pub fn id(&self) -> String { self.0.id.clone() } + pub fn id(&self) -> String { + self.0.id.clone() + } /// return a reference to current cursor controller, if currently in a workspace - pub fn cursor(&self) -> cursor::Controller { self.0.cursor.clone() } + pub fn cursor(&self) -> cursor::Controller { + self.0.cursor.clone() + } /// get a new reference to a buffer controller, if any is active to given path pub fn buffer_by_name(&self, path: &str) -> Option {