chore: updated send methods (+format)

This commit is contained in:
əlemi 2024-10-03 03:17:30 +02:00 committed by alemi.dev
parent 907a0329d3
commit d5518a7b48
13 changed files with 69 additions and 49 deletions

2
Cargo.lock generated
View file

@ -231,7 +231,7 @@ dependencies = [
[[package]] [[package]]
name = "codemp" name = "codemp"
version = "0.7.2" version = "0.7.3"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"codemp-proto", "codemp-proto",

View file

@ -37,7 +37,7 @@ where
/// See [`Controller`]'s documentation for details. /// See [`Controller`]'s documentation for details.
/// ///
/// Details about the receiving end are left to the implementor. /// Details about the receiving end are left to the implementor.
pub trait AsyncSender<T : Sized + Send + Sync> : Sized + Send + Sync { pub trait AsyncSender<T: Sized + Send + Sync>: Sized + Send + Sync {
/// Enqueue a new value to be sent to all other users without blocking /// Enqueue a new value to be sent to all other users without blocking
fn send(&self, x: T) -> ControllerResult<()>; fn send(&self, x: T) -> ControllerResult<()>;
} }

View file

@ -22,7 +22,8 @@ pub(crate) struct BufferAck {
impl Acknowledgeable for BufferAck { impl Acknowledgeable for BufferAck {
fn send(&mut self) { fn send(&mut self) {
self.tx.send(self.version.clone()) self.tx
.send(self.version.clone())
.unwrap_or_warn("no worker to receive sent ack"); .unwrap_or_warn("no worker to receive sent ack");
} }
} }

View file

@ -75,7 +75,8 @@ impl BufferController {
path: path.to_string(), path: path.to_string(),
latest_version: latest_version_tx, latest_version: latest_version_tx,
local_version: my_version_tx, local_version: my_version_tx,
ack_tx, ack_rx, ack_tx,
ack_rx,
ops_in: opin_rx, ops_in: opin_rx,
poller: poller_rx, poller: poller_rx,
pollers: Vec::new(), pollers: Vec::new(),
@ -185,7 +186,8 @@ impl BufferWorker {
self.latest_version self.latest_version
.send(self.oplog.local_version()) .send(self.oplog.local_version())
.unwrap_or_warn("failed to update latest version!"); .unwrap_or_warn("failed to update latest version!");
self.local_version.send(self.branch.local_version()) self.local_version
.send(self.branch.local_version())
.unwrap_or_warn("failed to update local version!"); .unwrap_or_warn("failed to update local version!");
} }
} }
@ -261,10 +263,14 @@ impl BufferWorker {
version: step_ver, version: step_ver,
}, },
}; };
self.local_version.send(new_local_v).unwrap_or_warn("could not update local version"); self.local_version
tx.send(Some(delta)).unwrap_or_warn("could not update ops channel -- is controller dead?"); .send(new_local_v)
.unwrap_or_warn("could not update local version");
tx.send(Some(delta))
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} else { } else {
tx.send(None).unwrap_or_warn("could not update ops channel -- is controller dead?"); tx.send(None)
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} }
} }
} }

View file

@ -42,10 +42,7 @@ impl AsyncSender<Cursor> for CursorController {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
} }
Ok(self Ok(self.0.op.send(CursorPosition {
.0
.op
.send(CursorPosition {
buffer: BufferNode { buffer: BufferNode {
path: cursor.buffer, path: cursor.buffer,
}, },

View file

@ -3,9 +3,11 @@ use jni_toolbox::jni;
use crate::{ use crate::{
api::{ api::{
change::Delta,
controller::{AsyncReceiver, AsyncSender}, controller::{AsyncReceiver, AsyncSender},
TextChange, TextChange,
}, },
buffer::controller::BufferAck,
errors::ControllerError, errors::ControllerError,
}; };
@ -27,13 +29,13 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result<String, Con
#[jni(package = "mp.code", class = "BufferController")] #[jni(package = "mp.code", class = "BufferController")]
fn try_recv( fn try_recv(
controller: &mut crate::buffer::Controller, controller: &mut crate::buffer::Controller,
) -> Result<Option<TextChange>, ControllerError> { ) -> Result<Option<Delta<BufferAck>>, ControllerError> {
super::tokio().block_on(controller.try_recv()) super::tokio().block_on(controller.try_recv())
} }
/// Block until it receives a [TextChange]. /// Block until it receives a [TextChange].
#[jni(package = "mp.code", class = "BufferController")] #[jni(package = "mp.code", class = "BufferController")]
fn recv(controller: &mut crate::buffer::Controller) -> Result<TextChange, ControllerError> { fn recv(controller: &mut crate::buffer::Controller) -> Result<Delta<BufferAck>, ControllerError> {
super::tokio().block_on(controller.recv()) super::tokio().block_on(controller.recv())
} }
@ -43,7 +45,7 @@ fn send(
controller: &mut crate::buffer::Controller, controller: &mut crate::buffer::Controller,
change: TextChange, change: TextChange,
) -> Result<(), ControllerError> { ) -> Result<(), ControllerError> {
super::tokio().block_on(controller.send(change)) controller.send(change)
} }
/// Register a callback for buffer changes. /// Register a callback for buffer changes.

View file

@ -5,6 +5,10 @@ use crate::{
}, },
errors::ControllerError, errors::ControllerError,
}; };
use crate::{
api::{Controller, Cursor},
errors::ControllerError,
};
use jni::{objects::JObject, JNIEnv}; use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni; use jni_toolbox::jni;
@ -25,7 +29,7 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result<Cursor, Controller
/// Receive from Java, converts and sends a [Cursor]. /// Receive from Java, converts and sends a [Cursor].
#[jni(package = "mp.code", class = "CursorController")] #[jni(package = "mp.code", class = "CursorController")]
fn send(controller: &mut crate::cursor::Controller, cursor: Cursor) -> Result<(), ControllerError> { fn send(controller: &mut crate::cursor::Controller, cursor: Cursor) -> Result<(), ControllerError> {
super::tokio().block_on(controller.send(cursor)) controller.send(cursor)
} }
/// Register a callback for cursor changes. /// Register a callback for cursor changes.

View file

@ -1,9 +1,14 @@
use crate::api::change::Delta;
use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::api::TextChange; use crate::api::TextChange;
use crate::buffer::controller::BufferController; use crate::buffer::controller::BufferController;
use napi::threadsafe_function::{ use napi::threadsafe_function::{
ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
}; };
use napi::threadsafe_function::{
ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
};
use napi_derive::napi;
use napi_derive::napi; use napi_derive::napi;
#[napi] #[napi]
@ -51,20 +56,20 @@ impl BufferController {
/// Return next buffer event if present /// Return next buffer event if present
#[napi(js_name = "try_recv")] #[napi(js_name = "try_recv")]
pub async fn js_try_recv(&self) -> napi::Result<Option<TextChange>> { pub async fn js_try_recv(&self) -> napi::Result<Option<Delta<BufferAck>>> {
Ok(self.try_recv().await?) Ok(self.try_recv().await?)
} }
/// Wait for next buffer event and return it /// Wait for next buffer event and return it
#[napi(js_name = "recv")] #[napi(js_name = "recv")]
pub async fn js_recv(&self) -> napi::Result<TextChange> { pub async fn js_recv(&self) -> napi::Result<Delta<BufferAck>> {
Ok(self.recv().await?) Ok(self.recv().await?)
} }
/// Send a buffer update to workspace /// Send a buffer update to workspace
#[napi(js_name = "send")] #[napi(js_name = "send")]
pub async fn js_send(&self, op: TextChange) -> napi::Result<()> { pub fn js_send(&self, op: TextChange) -> napi::Result<()> {
Ok(self.send(op).await?) Ok(self.send(op)?)
} }
/// Return buffer whole content /// Return buffer whole content

View file

@ -1,9 +1,14 @@
use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::cursor::controller::CursorController; use crate::cursor::controller::CursorController;
use napi::threadsafe_function::ErrorStrategy::Fatal; use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi::threadsafe_function::{ use napi::threadsafe_function::{
ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
}; };
use napi::threadsafe_function::{
ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
};
use napi_derive::napi;
use napi_derive::napi; use napi_derive::napi;
#[napi(object, js_name = "Cursor")] #[napi(object, js_name = "Cursor")]
@ -74,8 +79,8 @@ impl CursorController {
/// Send a new cursor event to remote /// Send a new cursor event to remote
#[napi(js_name = "send")] #[napi(js_name = "send")]
pub async fn js_send(&self, pos: JsCursor) -> napi::Result<()> { pub fn js_send(&self, pos: JsCursor) -> napi::Result<()> {
Ok(self.send(crate::api::Cursor::from(pos)).await?) Ok(self.send(crate::api::Cursor::from(pos))?)
} }
/// Get next cursor event if available without blocking /// Get next cursor event if available without blocking

View file

@ -8,8 +8,6 @@ use napi::threadsafe_function::{
}; };
use napi_derive::napi; use napi_derive::napi;
use super::client::JsUser;
#[napi(object, js_name = "Event")] #[napi(object, js_name = "Event")]
pub struct JsEvent { pub struct JsEvent {
pub r#type: String, pub r#type: String,

View file

@ -11,10 +11,10 @@ impl LuaUserData for CodempBufferController {
Ok(format!("{:?}", this)) Ok(format!("{:?}", this))
}); });
methods.add_method( methods.add_method("send", |_, this, (change,): (CodempTextChange,)| {
"send", this.send(change)?;
|_, this, (change,): (CodempTextChange,)| a_sync! { this => this.send(change).await? }, Ok(())
); });
methods.add_method( methods.add_method(
"try_recv", "try_recv",

View file

@ -12,10 +12,10 @@ impl LuaUserData for CodempCursorController {
Ok(format!("{:?}", this)) Ok(format!("{:?}", this))
}); });
methods.add_method( methods.add_method("send", |_, this, (cursor,): (CodempCursor,)| {
"send", this.send(cursor)?;
|_, this, (cursor,): (CodempCursor,)| a_sync! { this => this.send(cursor).await? }, Ok(())
); });
methods.add_method( methods.add_method(
"try_recv", "try_recv",
|_, this, ()| a_sync! { this => this.try_recv().await? }, |_, this, ()| a_sync! { this => this.try_recv().await? },

View file

@ -15,11 +15,11 @@ impl CursorController {
#[pyo3(name = "send")] #[pyo3(name = "send")]
fn pysend( fn pysend(
&self, &self,
py: Python, _py: Python,
path: String, path: String,
start: (i32, i32), start: (i32, i32),
end: (i32, i32), end: (i32, i32),
) -> PyResult<Promise> { ) -> PyResult<()> {
let pos = Cursor { let pos = Cursor {
start, start,
end, end,
@ -27,7 +27,8 @@ impl CursorController {
user: None, user: None,
}; };
let this = self.clone(); let this = self.clone();
a_sync_allow_threads!(py, this.send(pos).await) this.send(pos)?;
Ok(())
} }
#[pyo3(name = "try_recv")] #[pyo3(name = "try_recv")]
@ -84,7 +85,7 @@ impl BufferController {
} }
#[pyo3(name = "send")] #[pyo3(name = "send")]
fn pysend(&self, py: Python, start: u32, end: u32, txt: String) -> PyResult<Promise> { fn pysend(&self, _py: Python, start: u32, end: u32, txt: String) -> PyResult<()> {
let op = TextChange { let op = TextChange {
start, start,
end, end,
@ -92,7 +93,8 @@ impl BufferController {
hash: None, hash: None,
}; };
let this = self.clone(); let this = self.clone();
a_sync_allow_threads!(py, this.send(op).await) this.send(op)?;
Ok(())
} }
#[pyo3(name = "try_recv")] #[pyo3(name = "try_recv")]