feat: Bumping to pyo3 0.22, removing pyo3-asyncio, moving code around to allow for

smoother pyo3 aggregation.
This commit is contained in:
cschen 2024-08-16 12:58:43 +02:00
parent e732e6a938
commit 610576f8b7
10 changed files with 227 additions and 260 deletions

View file

@ -40,8 +40,7 @@ napi = { version = "2.16", features = ["full"], optional = true }
napi-derive = { version="2.16", optional = true} napi-derive = { version="2.16", optional = true}
# glue (python) # glue (python)
pyo3 = { version = "0.20", features = ["extension-module"], optional = true} pyo3 = { version = "0.22", features = ["extension-module", "experimental-async"], optional = true}
pyo3-asyncio = { version = "0.20", features = ["tokio-runtime"], optional = true }
[build-dependencies] [build-dependencies]
# glue (js) # glue (js)
@ -54,4 +53,4 @@ default = []
lua = ["mlua", "lazy_static", "tracing-subscriber"] lua = ["mlua", "lazy_static", "tracing-subscriber"]
java = ["lazy_static", "jni", "tracing-subscriber"] java = ["lazy_static", "jni", "tracing-subscriber"]
js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"] js = ["napi-build", "tracing-subscriber", "napi", "napi-derive"]
python = ["pyo3", "pyo3-asyncio", "tracing-subscriber", "pyo3-build-config"] python = ["pyo3", "tracing-subscriber", "pyo3-build-config"]

View file

@ -20,8 +20,7 @@
/// ///
#[derive(Clone, Debug, Default)] #[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(feature = "python", pyo3::pyclass)] #[cfg_attr(feature = "python", pyo3::pyclass(get_all))]
#[cfg_attr(feature = "python", pyo3(get_all))]
pub struct TextChange { pub struct TextChange {
/// range start of text change, as char indexes in buffer previous state /// range start of text change, as char indexes in buffer previous state
pub start: u32, pub start: u32,
@ -37,7 +36,10 @@ impl TextChange {
pub fn span(&self) -> std::ops::Range<usize> { pub fn span(&self) -> std::ops::Range<usize> {
self.start as usize..self.end as usize self.start as usize..self.end as usize
} }
}
#[cfg_attr(feature = "python", pyo3::pymethods)]
impl TextChange {
/// returns true if this TextChange deletes existing text /// returns true if this TextChange deletes existing text
pub fn is_delete(&self) -> bool { pub fn is_delete(&self) -> bool {
self.start < self.end self.start < self.end
@ -52,12 +54,12 @@ impl TextChange {
pub fn is_empty(&self) -> bool { pub fn is_empty(&self) -> bool {
!self.is_delete() && !self.is_insert() !self.is_delete() && !self.is_insert()
} }
/// applies this text change to given text, returning a new string /// applies this text change to given text, returning a new string
pub fn apply(&self, txt: &str) -> String { pub fn apply(&self, txt: &str) -> String {
let pre_index = std::cmp::min(self.span().start, txt.len()); let pre_index = std::cmp::min(self.start as usize, txt.len());
let pre = txt.get(..pre_index).unwrap_or("").to_string(); let pre = txt.get(..pre_index).unwrap_or("").to_string();
let post = txt.get(self.span().end..).unwrap_or("").to_string(); let post = txt.get(self.end as usize..).unwrap_or("").to_string();
format!("{}{}{}", pre, self.content, post) format!("{}{}{}", pre, self.content, post)
} }
} }
@ -70,7 +72,7 @@ mod tests {
start: 5, start: 5,
end: 5, end: 5,
content: " cruel".to_string(), content: " cruel".to_string(),
hash: None hash: None,
}; };
let result = change.apply("hello world!"); let result = change.apply("hello world!");
assert_eq!(result, "hello cruel world!"); assert_eq!(result, "hello cruel world!");
@ -82,7 +84,7 @@ mod tests {
start: 5, start: 5,
end: 11, end: 11,
content: "".to_string(), content: "".to_string(),
hash: None hash: None,
}; };
let result = change.apply("hello cruel world!"); let result = change.apply("hello cruel world!");
assert_eq!(result, "hello world!"); assert_eq!(result, "hello world!");
@ -94,7 +96,7 @@ mod tests {
start: 5, start: 5,
end: 11, end: 11,
content: " not very pleasant".to_string(), content: " not very pleasant".to_string(),
hash: None hash: None,
}; };
let result = change.apply("hello cruel world!"); let result = change.apply("hello cruel world!");
assert_eq!(result, "hello not very pleasant world!"); assert_eq!(result, "hello not very pleasant world!");
@ -106,7 +108,7 @@ mod tests {
start: 100, start: 100,
end: 110, end: 110,
content: "a very long string \n which totally matters".to_string(), content: "a very long string \n which totally matters".to_string(),
hash: None hash: None,
}; };
let result = change.apply("a short text"); let result = change.apply("a short text");
assert_eq!( assert_eq!(
@ -121,7 +123,7 @@ mod tests {
start: 42, start: 42,
end: 42, end: 42,
content: "".to_string(), content: "".to_string(),
hash: None hash: None,
}; };
let result = change.apply("some important text"); let result = change.apply("some important text");
assert_eq!(result, "some important text"); assert_eq!(result, "some important text");

View file

@ -1,7 +1,8 @@
use codemp_proto::workspace::workspace_event::Event as WorkspaceEventInner; use codemp_proto::workspace::workspace_event::Event as WorkspaceEventInner;
#[cfg_attr(feature = "python", pyo3::pyclass)]
pub enum Event { pub enum Event {
FileTreeUpdated, FileTreeUpdated(String),
UserJoin(String), UserJoin(String),
UserLeave(String), UserLeave(String),
} }
@ -11,9 +12,9 @@ impl From<&WorkspaceEventInner> for Event {
match event { match event {
WorkspaceEventInner::Join(e) => Self::UserJoin(e.user.id.clone()), WorkspaceEventInner::Join(e) => Self::UserJoin(e.user.id.clone()),
WorkspaceEventInner::Leave(e) => Self::UserLeave(e.user.id.clone()), WorkspaceEventInner::Leave(e) => Self::UserLeave(e.user.id.clone()),
WorkspaceEventInner::Create(_) WorkspaceEventInner::Create(e) => Self::FileTreeUpdated(e.path.clone()),
| WorkspaceEventInner::Rename(_) WorkspaceEventInner::Delete(e) => Self::FileTreeUpdated(e.path.clone()),
| WorkspaceEventInner::Delete(_) => Self::FileTreeUpdated, WorkspaceEventInner::Rename(e) => Self::FileTreeUpdated(e.after.clone()),
} }
} }
} }

View file

@ -5,7 +5,7 @@
use std::sync::Arc; use std::sync::Arc;
use diamond_types::LocalVersion; use diamond_types::LocalVersion;
use tokio::sync::{oneshot, mpsc, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tonic::async_trait; use tonic::async_trait;
use crate::api::Controller; use crate::api::Controller;
@ -37,7 +37,9 @@ impl BufferController {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.content_request.send(tx).await?; self.0.content_request.send(tx).await?;
let content = rx.await?; let content = rx.await?;
self.0.last_update.set(self.0.latest_version.borrow().clone()); self.0
.last_update
.set(self.0.latest_version.borrow().clone());
Ok(content) Ok(content)
} }
} }
@ -51,7 +53,8 @@ pub(crate) struct BufferControllerInner {
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>, pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>, pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>, pub(crate) delta_request:
mpsc::Sender<(LocalVersion, oneshot::Sender<(LocalVersion, TextChange)>)>,
} }
#[async_trait] #[async_trait]

View file

@ -101,7 +101,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
let last_ver = oplog.local_version(); let last_ver = oplog.local_version();
if change.is_delete() { if change.is_delete() {
branch.delete_without_content(&mut oplog, 1, change.span()); branch.delete_without_content(&mut oplog, 1, change.start as usize..change.end as usize);
} }
if change.is_insert() { if change.is_insert() {

View file

@ -10,10 +10,8 @@ use pyo3::prelude::*;
#[pymethods] #[pymethods]
impl Client { impl Client {
#[new] #[new]
fn pyconnect(host: String, username: String, password: String) -> PyResult<Self> { async fn pyconnect(host: String, username: String, password: String) -> PyResult<Self> {
let cli = Ok(Client::new(host, username, password));
pyo3_asyncio::tokio::get_runtime().block_on(Client::new(host, username, password));
Ok(cli?)
} }
#[pyo3(name = "join_workspace")] #[pyo3(name = "join_workspace")]

View file

@ -4,62 +4,80 @@ use crate::api::TextChange;
use crate::buffer::Controller as BufferController; use crate::buffer::Controller as BufferController;
use crate::cursor::Controller as CursorController; use crate::cursor::Controller as CursorController;
use pyo3::prelude::*; use pyo3::prelude::*;
use pyo3_asyncio::tokio::future_into_py;
// use super::CodempController;
// need to do manually since Controller is a trait implementation
#[pymethods] #[pymethods]
impl CursorController { impl CursorController {
#[pyo3(name = "send")] #[pyo3(name = "send")]
pub fn pysend<'p>( async fn pysend(&self, path: String, start: (i32, i32), end: (i32, i32)) -> crate::Result<()> {
&self,
py: Python<'p>,
path: String,
start: (i32, i32),
end: (i32, i32),
) -> PyResult<&'p PyAny> {
let rc = self.clone();
let pos = Cursor { let pos = Cursor {
start, start,
end, end,
buffer: path, buffer: path,
user: None, user: None,
}; };
let rc = self.clone(); self.send(pos).await
future_into_py(py, async move { Ok(rc.send(pos).await?) })
} }
#[pyo3(name = "try_recv")] #[pyo3(name = "try_recv")]
pub fn pytry_recv<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> { async fn pytry_recv(&self) -> crate::Result<Option<Cursor>> {
//PyResult<Option<Py<Cursor>>> self.try_recv().await
let rc = self.clone();
future_into_py(py, async move { Ok(rc.try_recv().await?) })
} }
#[pyo3(name = "recv")] #[pyo3(name = "recv")]
pub fn pyrecv<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { async fn pyrecv(&self) -> crate::Result<Cursor> {
let rc = self.clone(); self.recv().await
future_into_py(py, async move {
let cur_event: Cursor = rc.recv().await?;
Python::with_gil(|py| Py::new(py, cur_event))
})
} }
#[pyo3(name = "poll")] #[pyo3(name = "poll")]
pub fn pypoll<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { async fn pypoll(&self) -> crate::Result<()> {
let rc = self.clone(); self.poll().await
future_into_py(py, async move { Ok(rc.poll().await?) })
} }
#[pyo3(name = "stop")] #[pyo3(name = "stop")]
pub fn pystop(&self) -> bool { fn pystop(&self) -> bool {
self.stop() self.stop()
} }
} }
// need to do manually since Controller is a trait implementation
#[pymethods]
impl BufferController {
#[pyo3(name = "content")]
async fn pycontent(&self) -> crate::Result<String> {
self.content().await
}
#[pyo3(name = "send")]
async fn pysend(&self, start: u32, end: u32, txt: String) -> crate::Result<()> {
let op = TextChange {
start,
end,
content: txt,
hash: None,
};
self.send(op).await
}
#[pyo3(name = "try_recv")]
async fn pytry_recv(&self) -> crate::Result<Option<TextChange>> {
self.try_recv().await
}
#[pyo3(name = "recv")]
async fn pyrecv(&self) -> crate::Result<TextChange> {
self.recv().await
}
#[pyo3(name = "poll")]
async fn pypoll(&self) -> crate::Result<()> {
self.poll().await
}
}
// We have to write this manually since
// cursor.user has type Option which cannot be translated
// automatically
#[pymethods] #[pymethods]
impl Cursor { impl Cursor {
#[getter(start)] #[getter(start)]
@ -85,78 +103,3 @@ impl Cursor {
} }
} }
} }
#[pymethods]
impl BufferController {
#[pyo3(name = "content")]
fn pycontent<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
let rc = self.clone();
future_into_py(py, async move { Ok(rc.content().await?) })
}
#[pyo3(name = "send")]
fn pysend<'p>(&self, py: Python<'p>, start: u32, end: u32, txt: String) -> PyResult<&'p PyAny> {
let op = TextChange {
start,
end,
content: txt,
hash: None,
};
let rc = self.clone();
future_into_py(py, async move { Ok(rc.send(op).await?) })
}
#[pyo3(name = "try_recv")]
fn pytry_recv<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
// match self.try_recv()? {
// Some(txt_change) => {
// let evt = txt_change;
// Ok(evt.into_py(py))
// }
// None => Ok(py.None()),
// }
let rc = self.clone();
future_into_py(py, async move { Ok(rc.try_recv().await?) })
}
#[pyo3(name = "recv")]
fn pyrecv<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let rc = self.clone();
future_into_py(py, async move {
let txt_change: TextChange = rc.recv().await?;
Python::with_gil(|py| Py::new(py, txt_change))
})
}
#[pyo3(name = "poll")]
fn pypoll<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> {
let rc = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { Ok(rc.poll().await?) })
}
}
#[pymethods]
impl TextChange {
#[pyo3(name = "is_deletion")]
fn pyis_deletion(&self) -> bool {
self.is_delete()
}
#[pyo3(name = "is_addition")]
fn pyis_addition(&self) -> bool {
self.is_insert()
}
#[pyo3(name = "is_empty")]
fn pyis_empty(&self) -> bool {
self.is_empty()
}
#[pyo3(name = "apply")]
fn pyapply(&self, txt: &str) -> String {
self.apply(txt)
}
}

View file

@ -82,9 +82,8 @@ impl PyLogger {
} }
} }
fn listen<'p>(&'p self, py: Python<'p>) -> PyResult<&'p PyAny> { async fn listen(&self) -> Option<String> {
let rc = self.0.clone(); self.0.lock().await.recv().await
pyo3_asyncio::tokio::future_into_py(py, async move { Ok(rc.lock().await.recv().await) })
} }
} }

View file

@ -1,127 +1,126 @@
use crate::buffer::Controller as BufferController; // use crate::buffer::Controller as BufferController;
use crate::cursor::Controller as CursorController; // use crate::cursor::Controller as CursorController;
use crate::workspace::Workspace; // use crate::workspace::Workspace;
use pyo3::prelude::*; // use pyo3::prelude::*;
use pyo3::types::PyString; // use pyo3::types::PyString;
use pyo3_asyncio::generic::future_into_py;
#[pymethods] // #[pymethods]
impl Workspace { // impl Workspace {
// join a workspace // // join a workspace
#[pyo3(name = "create")] // #[pyo3(name = "create")]
fn pycreate<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> { // fn pycreate<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&'p PyAny> {
let ws = self.clone(); // let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { // pyo3_asyncio::tokio::future_into_py(py, async move {
ws.create(path.as_str()).await?; // ws.create(path.as_str()).await?;
Ok(()) // Ok(())
}) // })
} // }
#[pyo3(name = "attach")] // #[pyo3(name = "attach")]
fn pyattach<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&PyAny> { // fn pyattach<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&PyAny> {
let ws = self.clone(); // let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { // pyo3_asyncio::tokio::future_into_py(py, async move {
let buffctl: BufferController = ws.attach(path.as_str()).await?; // let buffctl: BufferController = ws.attach(path.as_str()).await?;
Ok(buffctl) // Ok(buffctl)
// Python::with_gil(|py| Py::new(py, buffctl)) // // Python::with_gil(|py| Py::new(py, buffctl))
}) // })
} // }
#[pyo3(name = "detach")] // #[pyo3(name = "detach")]
fn pydetach(&self, path: String) -> bool { // fn pydetach(&self, path: String) -> bool {
match self.detach(path.as_str()) { // match self.detach(path.as_str()) {
crate::workspace::worker::DetachResult::NotAttached => false, // crate::workspace::worker::DetachResult::NotAttached => false,
crate::workspace::worker::DetachResult::Detaching => true, // crate::workspace::worker::DetachResult::Detaching => true,
crate::workspace::worker::DetachResult::AlreadyDetached => true, // crate::workspace::worker::DetachResult::AlreadyDetached => true,
} // }
} // }
// #[pyo3(name = "event")] // // #[pyo3(name = "event")]
// fn pyevent(&self, py: Python<'_>, path: String) -> PyResult<&PyAny> { // // fn pyevent(&self, py: Python<'_>, path: String) -> PyResult<&PyAny> {
// let rc = self.clone(); // // let rc = self.clone();
// future_into_py(py, async move { Ok(rc.event().await?) }) // // future_into_py(py, async move { Ok(rc.event().await?) })
// } // // }
#[pyo3(name = "fetch_buffers")] // #[pyo3(name = "fetch_buffers")]
fn pyfetch_buffers<'p>(&'p self, py: Python<'p>) -> PyResult<&PyAny> { // fn pyfetch_buffers<'p>(&'p self, py: Python<'p>) -> PyResult<&PyAny> {
let ws = self.clone(); // let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { // pyo3_asyncio::tokio::future_into_py(py, async move {
ws.fetch_buffers().await?; // ws.fetch_buffers().await?;
Ok(()) // Ok(())
}) // })
} // }
#[pyo3(name = "fetch_users")] // #[pyo3(name = "fetch_users")]
fn pyfetch_users<'p>(&'p self, py: Python<'p>) -> PyResult<&PyAny> { // fn pyfetch_users<'p>(&'p self, py: Python<'p>) -> PyResult<&PyAny> {
let ws = self.clone(); // let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { // pyo3_asyncio::tokio::future_into_py(py, async move {
ws.fetch_users().await?; // ws.fetch_users().await?;
Ok(()) // Ok(())
}) // })
} // }
#[pyo3(name = "list_buffer_users")] // #[pyo3(name = "list_buffer_users")]
fn pylist_buffer_users<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&PyAny> { // fn pylist_buffer_users<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&PyAny> {
let ws = self.clone(); // let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { // pyo3_asyncio::tokio::future_into_py(py, async move {
let usrlist: Vec<String> = ws // let usrlist: Vec<String> = ws
.list_buffer_users(path.as_str()) // .list_buffer_users(path.as_str())
.await? // .await?
.into_iter() // .into_iter()
.map(|e| e.id) // .map(|e| e.id)
.collect(); // .collect();
Ok(usrlist) // Ok(usrlist)
}) // })
} // }
#[pyo3(name = "delete")] // #[pyo3(name = "delete")]
fn pydelete<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&PyAny> { // fn pydelete<'p>(&'p self, py: Python<'p>, path: String) -> PyResult<&PyAny> {
let ws = self.clone(); // let ws = self.clone();
pyo3_asyncio::tokio::future_into_py(py, async move { // pyo3_asyncio::tokio::future_into_py(py, async move {
ws.delete(path.as_str()).await?; // ws.delete(path.as_str()).await?;
Ok(()) // Ok(())
}) // })
} // }
#[pyo3(name = "id")] // #[pyo3(name = "id")]
fn pyid(&self, py: Python<'_>) -> Py<PyString> { // fn pyid(&self, py: Python<'_>) -> Py<PyString> {
PyString::new(py, self.id().as_str()).into() // PyString::new(py, self.id().as_str()).into()
} // }
#[pyo3(name = "cursor")] // #[pyo3(name = "cursor")]
fn pycursor(&self, py: Python<'_>) -> PyResult<Py<CursorController>> { // fn pycursor(&self, py: Python<'_>) -> PyResult<Py<CursorController>> {
Py::new(py, self.cursor()) // Py::new(py, self.cursor())
} // }
#[pyo3(name = "buffer_by_name")] // #[pyo3(name = "buffer_by_name")]
fn pybuffer_by_name( // fn pybuffer_by_name(
&self, // &self,
py: Python<'_>, // py: Python<'_>,
path: String, // path: String,
) -> PyResult<Option<Py<BufferController>>> { // ) -> PyResult<Option<Py<BufferController>>> {
let Some(bufctl) = self.buffer_by_name(path.as_str()) else { // let Some(bufctl) = self.buffer_by_name(path.as_str()) else {
return Ok(None); // return Ok(None);
}; // };
Ok(Some(Py::new(py, bufctl)?)) // Ok(Some(Py::new(py, bufctl)?))
} // }
#[pyo3(name = "buffer_list")] // #[pyo3(name = "buffer_list")]
fn pybuffer_list(&self) -> Vec<String> { // fn pybuffer_list(&self) -> Vec<String> {
self.buffer_list() // self.buffer_list()
} // }
#[pyo3(name = "filetree")] // #[pyo3(name = "filetree")]
fn pyfiletree(&self) -> Vec<String> { // fn pyfiletree(&self) -> Vec<String> {
self.filetree() // self.filetree()
} // }
} // }
// #[pyclass] // #[pyclass]
// enum PyEvent { // enum PyEvent {

View file

@ -6,9 +6,8 @@ use crate::{
}; };
use codemp_proto::{ use codemp_proto::{
common::Empty,
auth::Token, auth::Token,
common::Identity, common::Empty,
files::BufferNode, files::BufferNode,
workspace::{ workspace::{
workspace_event::{ workspace_event::{
@ -54,14 +53,12 @@ impl Workspace {
token: Token, token: Token,
) -> crate::Result<Self> { ) -> crate::Result<Self> {
let services = Services::try_new(dest, token).await?; let services = Services::try_new(dest, token).await?;
let ws_stream = services.ws() let ws_stream = services.ws().attach(Empty {}).await?.into_inner();
.attach(Empty{})
.await?
.into_inner();
let (tx, rx) = mpsc::channel(256); let (tx, rx) = mpsc::channel(256);
let (ev_tx, ev_rx) = mpsc::unbounded_channel(); let (ev_tx, ev_rx) = mpsc::unbounded_channel();
let cur_stream = services.cur() let cur_stream = services
.cur()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx)) .attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await? .await?
.into_inner(); .into_inner();
@ -92,7 +89,11 @@ impl Workspace {
Ok(ws) Ok(ws)
} }
pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>, tx: mpsc::UnboundedSender<crate::api::Event>) { pub(crate) fn run_actor(
&self,
mut stream: Streaming<WorkspaceEvent>,
tx: mpsc::UnboundedSender<crate::api::Event>,
) {
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..? // TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
let inner = self.0.clone(); let inner = self.0.clone();
let name = self.id(); let name = self.id();
@ -109,7 +110,9 @@ impl Workspace {
match ev { match ev {
// user // user
WorkspaceEventInner::Join(UserJoin { user }) => { WorkspaceEventInner::Join(UserJoin { user }) => {
inner.users.insert(user.clone().into(), User { id: user.into() }); inner
.users
.insert(user.clone().into(), User { id: user.into() });
} }
WorkspaceEventInner::Leave(UserLeave { user }) => { WorkspaceEventInner::Leave(UserLeave { user }) => {
inner.users.remove(&user.into()); inner.users.remove(&user.into());
@ -132,12 +135,15 @@ impl Workspace {
if tx.send(update).is_err() { if tx.send(update).is_err() {
tracing::warn!("no active controller to receive workspace event"); tracing::warn!("no active controller to receive workspace event");
} }
}, }
} }
} }
}); });
} }
}
#[cfg_attr(feature = "python", pyo3::pymethods)] //LMAO it works
impl Workspace {
/// create a new buffer in current workspace /// create a new buffer in current workspace
pub async fn create(&self, path: &str) -> crate::Result<()> { pub async fn create(&self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
@ -175,10 +181,7 @@ impl Workspace {
tonic::metadata::MetadataValue::try_from(credentials.id.id) tonic::metadata::MetadataValue::try_from(credentials.id.id)
.expect("could not represent path as byte sequence"), .expect("could not represent path as byte sequence"),
); );
let stream = self.0.services.buf() let stream = self.0.services.buf().attach(req).await?.into_inner();
.attach(req)
.await?
.into_inner();
let worker = BufferWorker::new(self.0.user_id, path); let worker = BufferWorker::new(self.0.user_id, path);
let controller = worker.controller(); let controller = worker.controller();
@ -206,17 +209,24 @@ impl Workspace {
pub fn detach(&self, path: &str) -> DetachResult { pub fn detach(&self, path: &str) -> DetachResult {
match self.0.buffers.remove(path) { match self.0.buffers.remove(path) {
None => DetachResult::NotAttached, None => DetachResult::NotAttached,
Some((_name, controller)) => if controller.stop() { Some((_name, controller)) => {
DetachResult::Detaching if controller.stop() {
} else { DetachResult::Detaching
DetachResult::AlreadyDetached } else {
DetachResult::AlreadyDetached
}
} }
} }
} }
/// await next workspace [crate::api::Event] and return it /// await next workspace [crate::api::Event] and return it
pub async fn event(&self) -> crate::Result<crate::api::Event> { pub async fn event(&self) -> crate::Result<crate::api::Event> {
self.0.events.lock().await.recv().await self.0
.events
.lock()
.await
.recv()
.await
.ok_or(crate::Error::Channel { send: false }) .ok_or(crate::Error::Channel { send: false })
} }
@ -261,7 +271,7 @@ impl Workspace {
/// get a list of the users attached to a specific buffer /// get a list of the users attached to a specific buffer
/// ///
/// TODO: discuss implementation details /// TODO: discuss implementation details
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> { pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<String>> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
let buffer_users = workspace_client let buffer_users = workspace_client
.list_buffer_users(tonic::Request::new(BufferNode { .list_buffer_users(tonic::Request::new(BufferNode {
@ -269,7 +279,10 @@ impl Workspace {
})) }))
.await? .await?
.into_inner() .into_inner()
.users; .users
.into_iter()
.map(|u| u.id)
.collect();
Ok(buffer_users) Ok(buffer_users)
} }
@ -313,7 +326,11 @@ impl Workspace {
/// get a list of all the currently attached to buffers /// get a list of all the currently attached to buffers
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
pub fn buffer_list(&self) -> Vec<String> { pub fn buffer_list(&self) -> Vec<String> {
self.0.buffers.iter().map(|elem| elem.key().clone()).collect() self.0
.buffers
.iter()
.map(|elem| elem.key().clone())
.collect()
} }
/// get the currently cached "filetree" /// get the currently cached "filetree"
@ -327,7 +344,11 @@ impl Drop for WorkspaceInner {
fn drop(&mut self) { fn drop(&mut self) {
for entry in self.buffers.iter() { for entry in self.buffers.iter() {
if !entry.value().stop() { if !entry.value().stop() {
tracing::warn!("could not stop buffer worker {} for workspace {}", entry.value().name(), self.id); tracing::warn!(
"could not stop buffer worker {} for workspace {}",
entry.value().name(),
self.id
);
} }
} }
if !self.cursor.stop() { if !self.cursor.stop() {
@ -336,6 +357,8 @@ impl Drop for WorkspaceInner {
} }
} }
#[cfg_attr(feature = "python", pyo3::pyclass(eq, eq_int))]
#[cfg_attr(feature = "python", derive(PartialEq))]
pub enum DetachResult { pub enum DetachResult {
NotAttached, NotAttached,
Detaching, Detaching,