diff --git a/Cargo.lock b/Cargo.lock index 8d50945..bb7e87a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -231,7 +231,7 @@ dependencies = [ [[package]] name = "codemp" -version = "0.7.2" +version = "0.7.3" dependencies = [ "async-trait", "codemp-proto", diff --git a/src/api/controller.rs b/src/api/controller.rs index 0f19d70..853f1e5 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -37,7 +37,7 @@ where /// See [`Controller`]'s documentation for details. /// /// Details about the receiving end are left to the implementor. -pub trait AsyncSender : Sized + Send + Sync { +pub trait AsyncSender: Sized + Send + Sync { /// Enqueue a new value to be sent to all other users without blocking fn send(&self, x: T) -> ControllerResult<()>; } diff --git a/src/buffer/controller.rs b/src/buffer/controller.rs index d52b472..efb5a7d 100644 --- a/src/buffer/controller.rs +++ b/src/buffer/controller.rs @@ -22,7 +22,8 @@ pub(crate) struct BufferAck { impl Acknowledgeable for BufferAck { fn send(&mut self) { - self.tx.send(self.version.clone()) + self.tx + .send(self.version.clone()) .unwrap_or_warn("no worker to receive sent ack"); } } diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index ba1a318..01826b8 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -75,7 +75,8 @@ impl BufferController { path: path.to_string(), latest_version: latest_version_tx, local_version: my_version_tx, - ack_tx, ack_rx, + ack_tx, + ack_rx, ops_in: opin_rx, poller: poller_rx, pollers: Vec::new(), @@ -185,7 +186,8 @@ impl BufferWorker { self.latest_version .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); - self.local_version.send(self.branch.local_version()) + self.local_version + .send(self.branch.local_version()) .unwrap_or_warn("failed to update local version!"); } } @@ -261,10 +263,14 @@ impl BufferWorker { version: step_ver, }, }; - self.local_version.send(new_local_v).unwrap_or_warn("could not update local version"); - tx.send(Some(delta)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + self.local_version + .send(new_local_v) + .unwrap_or_warn("could not update local version"); + tx.send(Some(delta)) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { - tx.send(None).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send(None) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } } } diff --git a/src/cursor/controller.rs b/src/cursor/controller.rs index bac20cf..63a5483 100644 --- a/src/cursor/controller.rs +++ b/src/cursor/controller.rs @@ -42,22 +42,19 @@ impl AsyncSender for CursorController { if cursor.start > cursor.end { std::mem::swap(&mut cursor.start, &mut cursor.end); } - Ok(self - .0 - .op - .send(CursorPosition { - buffer: BufferNode { - path: cursor.buffer, - }, - start: RowCol { - row: cursor.start.0, - col: cursor.start.1, - }, - end: RowCol { - row: cursor.end.0, - col: cursor.end.1, - }, - })?) + Ok(self.0.op.send(CursorPosition { + buffer: BufferNode { + path: cursor.buffer, + }, + start: RowCol { + row: cursor.start.0, + col: cursor.start.1, + }, + end: RowCol { + row: cursor.end.0, + col: cursor.end.1, + }, + })?) } } diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index 639c187..1a5e2d8 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -3,9 +3,11 @@ use jni_toolbox::jni; use crate::{ api::{ + change::Delta, controller::{AsyncReceiver, AsyncSender}, TextChange, }, + buffer::controller::BufferAck, errors::ControllerError, }; @@ -27,13 +29,13 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result, ControllerError> { +) -> Result>, ControllerError> { super::tokio().block_on(controller.try_recv()) } /// Block until it receives a [TextChange]. #[jni(package = "mp.code", class = "BufferController")] -fn recv(controller: &mut crate::buffer::Controller) -> Result { +fn recv(controller: &mut crate::buffer::Controller) -> Result, ControllerError> { super::tokio().block_on(controller.recv()) } @@ -43,7 +45,7 @@ fn send( controller: &mut crate::buffer::Controller, change: TextChange, ) -> Result<(), ControllerError> { - super::tokio().block_on(controller.send(change)) + controller.send(change) } /// Register a callback for buffer changes. diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 65cb0c4..60f2810 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -5,6 +5,10 @@ use crate::{ }, errors::ControllerError, }; +use crate::{ + api::{Controller, Cursor}, + errors::ControllerError, +}; use jni::{objects::JObject, JNIEnv}; use jni_toolbox::jni; @@ -25,7 +29,7 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result Result<(), ControllerError> { - super::tokio().block_on(controller.send(cursor)) + controller.send(cursor) } /// Register a callback for cursor changes. diff --git a/src/ffi/js/buffer.rs b/src/ffi/js/buffer.rs index df89f06..945b25c 100644 --- a/src/ffi/js/buffer.rs +++ b/src/ffi/js/buffer.rs @@ -1,9 +1,14 @@ +use crate::api::change::Delta; use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::TextChange; use crate::buffer::controller::BufferController; use napi::threadsafe_function::{ ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; +use napi::threadsafe_function::{ + ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, +}; +use napi_derive::napi; use napi_derive::napi; #[napi] @@ -51,20 +56,20 @@ impl BufferController { /// Return next buffer event if present #[napi(js_name = "try_recv")] - pub async fn js_try_recv(&self) -> napi::Result> { + pub async fn js_try_recv(&self) -> napi::Result>> { Ok(self.try_recv().await?) } /// Wait for next buffer event and return it #[napi(js_name = "recv")] - pub async fn js_recv(&self) -> napi::Result { + pub async fn js_recv(&self) -> napi::Result> { Ok(self.recv().await?) } /// Send a buffer update to workspace #[napi(js_name = "send")] - pub async fn js_send(&self, op: TextChange) -> napi::Result<()> { - Ok(self.send(op).await?) + pub fn js_send(&self, op: TextChange) -> napi::Result<()> { + Ok(self.send(op)?) } /// Return buffer whole content diff --git a/src/ffi/js/cursor.rs b/src/ffi/js/cursor.rs index 2bb09e5..f7fbf4f 100644 --- a/src/ffi/js/cursor.rs +++ b/src/ffi/js/cursor.rs @@ -1,9 +1,14 @@ use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::cursor::controller::CursorController; use napi::threadsafe_function::ErrorStrategy::Fatal; +use napi::threadsafe_function::ErrorStrategy::Fatal; use napi::threadsafe_function::{ ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, }; +use napi::threadsafe_function::{ + ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, +}; +use napi_derive::napi; use napi_derive::napi; #[napi(object, js_name = "Cursor")] @@ -74,8 +79,8 @@ impl CursorController { /// Send a new cursor event to remote #[napi(js_name = "send")] - pub async fn js_send(&self, pos: JsCursor) -> napi::Result<()> { - Ok(self.send(crate::api::Cursor::from(pos)).await?) + pub fn js_send(&self, pos: JsCursor) -> napi::Result<()> { + Ok(self.send(crate::api::Cursor::from(pos))?) } /// Get next cursor event if available without blocking diff --git a/src/ffi/js/workspace.rs b/src/ffi/js/workspace.rs index e44fe81..294dd98 100644 --- a/src/ffi/js/workspace.rs +++ b/src/ffi/js/workspace.rs @@ -8,8 +8,6 @@ use napi::threadsafe_function::{ }; use napi_derive::napi; -use super::client::JsUser; - #[napi(object, js_name = "Event")] pub struct JsEvent { pub r#type: String, diff --git a/src/ffi/lua/buffer.rs b/src/ffi/lua/buffer.rs index 76c89cd..2a46dc6 100644 --- a/src/ffi/lua/buffer.rs +++ b/src/ffi/lua/buffer.rs @@ -11,10 +11,10 @@ impl LuaUserData for CodempBufferController { Ok(format!("{:?}", this)) }); - methods.add_method( - "send", - |_, this, (change,): (CodempTextChange,)| a_sync! { this => this.send(change).await? }, - ); + methods.add_method("send", |_, this, (change,): (CodempTextChange,)| { + this.send(change)?; + Ok(()) + }); methods.add_method( "try_recv", diff --git a/src/ffi/lua/cursor.rs b/src/ffi/lua/cursor.rs index 3deea7c..2e93fc8 100644 --- a/src/ffi/lua/cursor.rs +++ b/src/ffi/lua/cursor.rs @@ -12,10 +12,10 @@ impl LuaUserData for CodempCursorController { Ok(format!("{:?}", this)) }); - methods.add_method( - "send", - |_, this, (cursor,): (CodempCursor,)| a_sync! { this => this.send(cursor).await? }, - ); + methods.add_method("send", |_, this, (cursor,): (CodempCursor,)| { + this.send(cursor)?; + Ok(()) + }); methods.add_method( "try_recv", |_, this, ()| a_sync! { this => this.try_recv().await? }, diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 6ee1e27..5b0381c 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -15,11 +15,11 @@ impl CursorController { #[pyo3(name = "send")] fn pysend( &self, - py: Python, + _py: Python, path: String, start: (i32, i32), end: (i32, i32), - ) -> PyResult { + ) -> PyResult<()> { let pos = Cursor { start, end, @@ -27,7 +27,8 @@ impl CursorController { user: None, }; let this = self.clone(); - a_sync_allow_threads!(py, this.send(pos).await) + this.send(pos)?; + Ok(()) } #[pyo3(name = "try_recv")] @@ -84,7 +85,7 @@ impl BufferController { } #[pyo3(name = "send")] - fn pysend(&self, py: Python, start: u32, end: u32, txt: String) -> PyResult { + fn pysend(&self, _py: Python, start: u32, end: u32, txt: String) -> PyResult<()> { let op = TextChange { start, end, @@ -92,7 +93,8 @@ impl BufferController { hash: None, }; let this = self.clone(); - a_sync_allow_threads!(py, this.send(op).await) + this.send(op)?; + Ok(()) } #[pyo3(name = "try_recv")]