Merge pull request #33 from hexedtech/feat/workspace-receiver

feat: workspace receiver
This commit is contained in:
zaaarf 2024-10-08 23:04:49 +02:00 committed by GitHub
commit 1e3b3dc705
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 504 additions and 233 deletions

View file

@ -2,8 +2,8 @@ package mp.code;
import java.util.Optional; import java.util.Optional;
import java.util.UUID; import java.util.UUID;
import java.util.function.Consumer;
import mp.code.data.DetachResult;
import mp.code.exceptions.ConnectionException; import mp.code.exceptions.ConnectionException;
import mp.code.exceptions.ConnectionRemoteException; import mp.code.exceptions.ConnectionRemoteException;
import mp.code.exceptions.ControllerException; import mp.code.exceptions.ControllerException;
@ -168,16 +168,58 @@ public final class Workspace {
delete_buffer(this.ptr, path); delete_buffer(this.ptr, path);
} }
private static native Event event(long self) throws ControllerException; private static native Event try_recv(long self) throws ControllerException;
/** /**
* Blocks until a workspace event occurs. * Tries to get a {@link Event} from the queue if any were present, and returns
* You shouldn't call this, unless it's on a dedicated thread. * an empty optional otherwise.
* @return the {@link Event} that has occurred * @return the first workspace event in queue, if any are present
* @throws ControllerException if the event arrived while the underlying controller was already closed * @throws ControllerException if the controller was stopped
*/ */
public Event event() throws ControllerException { public Optional<Event> tryRecv() throws ControllerException {
return event(this.ptr); return Optional.ofNullable(try_recv(this.ptr));
}
private static native Event recv(long self) throws ControllerException;
/**
* Blocks until a {@link Event} is available and returns it.
* @return the workspace event that occurred
* @throws ControllerException if the controller was stopped
*/
public Event recv() throws ControllerException {
return recv(this.ptr);
}
private static native void callback(long self, Consumer<Workspace> cb);
/**
* Registers a callback to be invoked whenever a new {@link Event} is ready to be received.
* This will not work unless a Java thread has been dedicated to the event loop.
* @see Extensions#drive(boolean)
*/
public void callback(Consumer<Workspace> cb) {
callback(this.ptr, cb);
}
private static native void clear_callback(long self);
/**
* Clears the registered callback.
* @see #callback(Consumer)
*/
public void clearCallback() {
clear_callback(this.ptr);
}
private static native void poll(long self) throws ControllerException;
/**
* Blocks until a {@link Event} is available.
* @throws ControllerException if the controller was stopped
*/
public void poll() throws ControllerException {
poll(this.ptr);
} }
private static native void free(long self); private static native void free(long self);

View file

@ -1,15 +0,0 @@
package mp.code.data;
import mp.code.Workspace;
/**
* The result of a {@link Workspace#detachFromBuffer(String)} operation.
*/
public enum DetachResult {
/** The user was not attached to this buffer. */
NOT_ATTACHED,
/** The user detached from the buffer and stopped it. */
DETACHING,
/** The user was attached, but the buffer was already stopped. */
ALREADY_DETACHED
}

View file

@ -89,6 +89,16 @@ function WorkspaceEventPromise:cancel() end
function WorkspaceEventPromise:and_then(cb) end function WorkspaceEventPromise:and_then(cb) end
---@class (exact) MaybeWorkspaceEventPromise : Promise
local MaybeWorkspaceEventPromise = {}
--- block until promise is ready and return value
--- @return WorkspaceEvent | nil
function MaybeWorkspaceEventPromise:await() end
---@param cb fun(x: WorkspaceEvent | nil) callback to invoke
---invoke callback asynchronously as soon as promise is ready
function MaybeWorkspaceEventPromise:and_then(cb) end
---@class (exact) BufferControllerPromise : Promise ---@class (exact) BufferControllerPromise : Promise
local BufferControllerPromise = {} local BufferControllerPromise = {}
--- block until promise is ready and return value --- block until promise is ready and return value
@ -276,11 +286,30 @@ function Workspace:fetch_users(path) end
---@field type string ---@field type string
---@field value string ---@field value string
---@return MaybeWorkspaceEventPromise
---@async
---@nodiscard
---try to receive workspace events, returning nil if none is available
function Workspace:try_recv() end
---@return WorkspaceEventPromise ---@return WorkspaceEventPromise
---@async ---@async
---@nodiscard ---@nodiscard
---get next workspace event ---block until next workspace event and return it
function Workspace:event() end function Workspace:recv() end
---@return NilPromise
---@async
---@nodiscard
---block until next workspace event without returning it
function Workspace:poll() end
---clears any previously registered workspace callback
function Workspace:clear_callback() end
---@param cb fun(w: Workspace) callback to invoke on each workspace event received
---register a new callback to be called on workspace events (replaces any previously registered one)
function Workspace:callback(cb) end

View file

@ -52,6 +52,9 @@ class Client:
def user_name(self) -> str: ... def user_name(self) -> str: ...
def refresh(self) -> Promise[None]: ... def refresh(self) -> Promise[None]: ...
class Event:
pass
class Workspace: class Workspace:
""" """
Handle to a workspace inside codemp. It manages buffers. Handle to a workspace inside codemp. It manages buffers.
@ -69,6 +72,11 @@ class Workspace:
def buffer_by_name(self, path: str) -> Optional[BufferController]: ... def buffer_by_name(self, path: str) -> Optional[BufferController]: ...
def buffer_list(self) -> list[str]: ... def buffer_list(self) -> list[str]: ...
def filetree(self, filter: Optional[str], strict: bool) -> list[str]: ... def filetree(self, filter: Optional[str], strict: bool) -> list[str]: ...
def recv(self) -> Promise[Event]: ...
def try_recv(self) -> Promise[Optional[Event]]: ...
def poll(self) -> Promise[None]: ...
def clear_callback(self) -> None: ...
def callback(self, cb: Callable[[Workspace], None]) -> None: ...
class TextChange: class TextChange:
""" """

View file

@ -8,16 +8,17 @@ use crate::errors::ControllerResult;
// note that we don't use thiserror's #[from] because we don't want the error structs to contain // note that we don't use thiserror's #[from] because we don't want the error structs to contain
// these foreign types, and also we want these to be easily constructable // these foreign types, and also we want these to be easily constructable
/// Asynchronous and thread-safe handle to a generic bidirectional stream. /// Asynchronous and thread-safe handle to a generic bidirectional stream. Exists as a combination
/// of [`AsyncSender`] and [`AsyncReceiver`].
/// ///
/// This generic trait is implemented by actors managing stream procedures, and will generally /// This generic trait is implemented by actors managing stream procedures, and will generally
/// imply a background worker. /// imply a background worker.
/// ///
/// Events can be enqueued for dispatching without blocking with [`Controller::send`]. /// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`].
/// ///
/// For receiving events from the server, an asynchronous API with [`Controller::recv`] is /// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is
/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively, /// provided; if that is not feasible, consider using [`AsyncReceiver::callback`] or, alternatively,
/// [`Controller::poll`] combined with [`Controller::try_recv`]. /// [`AsyncReceiver::poll`] combined with [`AsyncReceiver::try_recv`].
/// ///
/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have /// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have
/// been dropped. /// been dropped.
@ -25,10 +26,31 @@ use crate::errors::ControllerResult;
/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
#[allow(async_fn_in_trait)] #[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait Controller<T: Sized + Send + Sync>: Sized + Send + Sync { pub trait Controller<Tx, Rx = Tx>: AsyncSender<Tx> + AsyncReceiver<Rx>
where
Tx: Sized + Sync + Send,
Rx: Sized + Sync + Send,
{
}
/// Asynchronous and thread-safe handle to send data over a stream.
/// See [`Controller`]'s documentation for details.
///
/// Details about the receiving end are left to the implementor.
#[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait AsyncSender<T: Sized + Send + Sync>: Sized + Send + Sync {
/// Enqueue a new value to be sent to all other users. /// Enqueue a new value to be sent to all other users.
async fn send(&self, x: T) -> ControllerResult<()>; async fn send(&self, x: T) -> ControllerResult<()>;
}
/// Asynchronous and thread-safe handle to receive data from a stream.
/// See [`Controller`]'s documentation for details.
///
/// Details about the sender are left to the implementor.
#[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait AsyncReceiver<T: Sized + Send + Sync>: Sized + Send + Sync {
/// Block until a value is available and returns it. /// Block until a value is available and returns it.
async fn recv(&self) -> ControllerResult<T> { async fn recv(&self) -> ControllerResult<T> {
loop { loop {

View file

@ -6,7 +6,7 @@ use std::sync::Arc;
use diamond_types::LocalVersion; use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use crate::api::controller::{Controller, ControllerCallback}; use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback};
use crate::api::TextChange; use crate::api::TextChange;
use crate::errors::ControllerResult; use crate::errors::ControllerResult;
use crate::ext::InternallyMutable; use crate::ext::InternallyMutable;
@ -53,7 +53,21 @@ pub(crate) struct BufferControllerInner {
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<TextChange> for BufferController { impl Controller<TextChange> for BufferController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<TextChange> for BufferController {
async fn send(&self, op: TextChange) -> ControllerResult<()> {
// we let the worker do the updating to the last version and send it back.
let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?;
self.0.last_update.set(rx.await?);
Ok(())
}
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncReceiver<TextChange> for BufferController {
async fn poll(&self) -> ControllerResult<()> { async fn poll(&self) -> ControllerResult<()> {
if self.0.last_update.get() != *self.0.latest_version.borrow() { if self.0.last_update.get() != *self.0.latest_version.borrow() {
return Ok(()); return Ok(());
@ -80,14 +94,6 @@ impl Controller<TextChange> for BufferController {
Ok(change) Ok(change)
} }
async fn send(&self, op: TextChange) -> ControllerResult<()> {
// we let the worker do the updating to the last version and send it back.
let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?;
self.0.last_update.set(rx.await?);
Ok(())
}
fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) { fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) {
if self.0.callback.send(Some(cb.into())).is_err() { if self.0.callback.send(Some(cb.into())).is_err() {
// TODO should we panic? we failed what we were supposed to do // TODO should we panic? we failed what we were supposed to do

View file

@ -1,5 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use diamond_types::list::{Branch, OpLog};
use diamond_types::LocalVersion; use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tonic::Streaming; use tonic::Streaming;
@ -27,6 +28,9 @@ struct BufferWorker {
delta_req: mpsc::Receiver<DeltaRequest>, delta_req: mpsc::Receiver<DeltaRequest>,
controller: std::sync::Weak<BufferControllerInner>, controller: std::sync::Weak<BufferControllerInner>,
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>, callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
oplog: OpLog,
branch: Branch,
timer: Timer,
} }
impl BufferController { impl BufferController {
@ -71,6 +75,9 @@ impl BufferController {
content_checkout: req_rx, content_checkout: req_rx,
delta_req: recv_rx, delta_req: recv_rx,
callback: cb_rx, callback: cb_rx,
oplog: OpLog::new(),
branch: Branch::new(),
timer: Timer::new(10), // TODO configurable!
}; };
tokio::spawn(async move { BufferController::work(worker, tx, rx).await }); tokio::spawn(async move { BufferController::work(worker, tx, rx).await });
@ -83,11 +90,7 @@ impl BufferController {
tx: mpsc::Sender<Operation>, tx: mpsc::Sender<Operation>,
mut rx: Streaming<BufferEvent>, mut rx: Streaming<BufferEvent>,
) { ) {
let mut branch = diamond_types::list::Branch::new();
let mut oplog = diamond_types::list::OpLog::new();
let mut timer = Timer::new(10); // TODO configurable!!
tracing::debug!("controller worker started"); tracing::debug!("controller worker started");
loop { loop {
if worker.controller.upgrade().is_none() { if worker.controller.upgrade().is_none() {
break; break;
@ -106,105 +109,28 @@ impl BufferController {
// received a text change from editor // received a text change from editor
res = worker.ops_in.recv() => match res { res = worker.ops_in.recv() => match res {
None => break tracing::debug!("stopping: editor closed channel"), None => break tracing::debug!("stopping: editor closed channel"),
Some((change, ack)) => { Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await,
let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string());
let last_ver = oplog.local_version();
// clip to buffer extents
let clip_end = std::cmp::min(branch.len(), change.end as usize);
let clip_start = std::cmp::max(0, change.start as usize);
// in case we have a "replace" span
if change.is_delete() {
branch.delete_without_content(&mut oplog, agent_id, clip_start..clip_end);
}
if change.is_insert() {
branch.insert(&mut oplog, agent_id, clip_start, &change.content);
}
if change.is_delete() || change.is_insert() {
tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await
.unwrap_or_warn("failed to send change!");
worker.latest_version.send(oplog.local_version())
.unwrap_or_warn("failed to update latest version!");
}
ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack");
},
}, },
// received a message from server: add to oplog and update latest version (+unlock pollers) // received a message from server: add to oplog and update latest version (+unlock pollers)
res = rx.message() => match res { res = rx.message() => match res {
Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path), Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path),
Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path), Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path),
Ok(Some(change)) => match worker.controller.upgrade() { Ok(Some(change)) => if worker.handle_server_change(change).await { break },
None => break, // clean exit actually, just weird we caught it here
Some(controller) => match oplog.decode_and_add(&change.op.data) {
Ok(local_version) => {
worker.latest_version.send(local_version)
.unwrap_or_warn("failed to update latest version!");
for tx in worker.pollers.drain(..) {
tx.send(()).unwrap_or_warn("could not wake up poller");
}
if let Some(cb) = worker.callback.borrow().as_ref() {
cb.call(BufferController(controller)); // TODO should we run this on another task/thread?
}
},
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
}
},
}, },
// controller is ready to apply change and recv(), calculate it and send it back // controller is ready to apply change and recv(), calculate it and send it back
res = worker.delta_req.recv() => match res { res = worker.delta_req.recv() => match res {
None => break tracing::error!("no more active controllers: can't send changes"), None => break tracing::error!("no more active controllers: can't send changes"),
Some((last_ver, tx)) => { Some((last_ver, tx)) => worker.handle_delta_request(last_ver, tx).await,
if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() {
// x.0.start should always be after lastver!
// this step_ver will be the version after we apply the operation
// we give it to the controller so that he knows where it's at.
let step_ver = oplog.version_union(&[lv.end-1], &last_ver);
branch.merge(&oplog, &step_ver);
let new_local_v = branch.local_version();
let hash = if timer.step() {
Some(crate::ext::hash(branch.content().to_string()))
} else { None };
let tc = match dtop.kind {
diamond_types::list::operation::OpKind::Ins => {
if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() {
tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)");
}
crate::api::change::TextChange {
start: dtop.start() as u32,
end: dtop.start() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
hash
}
},
diamond_types::list::operation::OpKind::Del => {
crate::api::change::TextChange {
start: dtop.start() as u32,
end: dtop.end() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
hash
}
}
};
tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?");
}
},
}, },
// received a request for full CRDT content // received a request for full CRDT content
res = worker.content_checkout.recv() => match res { res = worker.content_checkout.recv() => match res {
None => break tracing::error!("no more active controllers: can't update content"), None => break tracing::error!("no more active controllers: can't update content"),
Some(tx) => { Some(tx) => {
branch.merge(&oplog, oplog.local_version_ref()); worker.branch.merge(&worker.oplog, worker.oplog.local_version_ref());
let content = branch.content().to_string(); let content = worker.branch.content().to_string();
tx.send(content).unwrap_or_warn("checkout request dropped"); tx.send(content).unwrap_or_warn("checkout request dropped");
}, },
} }
@ -215,6 +141,117 @@ impl BufferController {
} }
} }
impl BufferWorker {
async fn handle_editor_change(
&mut self,
change: TextChange,
ack: oneshot::Sender<LocalVersion>,
tx: &mpsc::Sender<Operation>,
) {
let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string());
let last_ver = self.oplog.local_version();
// clip to buffer extents
let clip_end = std::cmp::min(self.branch.len(), change.end as usize);
let clip_start = std::cmp::max(0, change.start as usize);
// in case we have a "replace" span
if change.is_delete() {
self.branch
.delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end);
}
if change.is_insert() {
self.branch
.insert(&mut self.oplog, agent_id, clip_start, &change.content);
}
if change.is_delete() || change.is_insert() {
tx.send(Operation {
data: self.oplog.encode_from(Default::default(), &last_ver),
})
.await
.unwrap_or_warn("failed to send change!");
self.latest_version
.send(self.oplog.local_version())
.unwrap_or_warn("failed to update latest version!");
}
ack.send(self.branch.local_version())
.unwrap_or_warn("controller didn't wait for ack");
}
async fn handle_server_change(&mut self, change: BufferEvent) -> bool {
match self.controller.upgrade() {
None => true, // clean exit actually, just weird we caught it here
Some(controller) => match self.oplog.decode_and_add(&change.op.data) {
Ok(local_version) => {
self.latest_version
.send(local_version)
.unwrap_or_warn("failed to update latest version!");
for tx in self.pollers.drain(..) {
tx.send(()).unwrap_or_warn("could not wake up poller");
}
if let Some(cb) = self.callback.borrow().as_ref() {
cb.call(BufferController(controller)); // TODO should we run this on another task/thread?
}
false
}
Err(e) => {
tracing::error!("could not deserialize operation from server: {}", e);
true
}
},
}
}
async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender<DeltaOp>) {
if let Some((lv, Some(dtop))) = self
.oplog
.iter_xf_operations_from(&last_ver, self.oplog.local_version_ref())
.next()
{
// x.0.start should always be after lastver!
// this step_ver will be the version after we apply the operation
// we give it to the controller so that he knows where it's at.
let step_ver = self.oplog.version_union(&[lv.end - 1], &last_ver);
self.branch.merge(&self.oplog, &step_ver);
let new_local_v = self.branch.local_version();
let hash = if self.timer.step() {
Some(crate::ext::hash(self.branch.content().to_string()))
} else {
None
};
let tc = match dtop.kind {
diamond_types::list::operation::OpKind::Ins => {
if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len()
{
tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)");
}
crate::api::change::TextChange {
start: dtop.start() as u32,
end: dtop.start() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
hash,
}
}
diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange {
start: dtop.start() as u32,
end: dtop.end() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
hash,
},
};
tx.send((new_local_v, Some(tc)))
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
tx.send((last_ver, None))
.unwrap_or_warn("could not update ops channel -- is controller dead?");
}
}
}
struct Timer(u32, u32); struct Timer(u32, u32);
impl Timer { impl Timer {
fn new(period: u32) -> Self { fn new(period: u32) -> Self {

View file

@ -6,7 +6,10 @@ use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use crate::{ use crate::{
api::{controller::ControllerCallback, Controller, Cursor}, api::{
controller::{AsyncReceiver, AsyncSender, ControllerCallback},
Controller, Cursor,
},
errors::ControllerResult, errors::ControllerResult,
}; };
use codemp_proto::{ use codemp_proto::{
@ -31,7 +34,10 @@ pub(crate) struct CursorControllerInner {
} }
#[cfg_attr(feature = "async-trait", async_trait::async_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<Cursor> for CursorController { impl Controller<Cursor> for CursorController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<Cursor> for CursorController {
async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> { async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> {
if cursor.start > cursor.end { if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
@ -54,7 +60,10 @@ impl Controller<Cursor> for CursorController {
}) })
.await?) .await?)
} }
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncReceiver<Cursor> for CursorController {
async fn try_recv(&self) -> ControllerResult<Option<Cursor>> { async fn try_recv(&self) -> ControllerResult<Option<Cursor>> {
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
self.0.stream.send(tx).await?; self.0.stream.send(tx).await?;

View file

@ -1,7 +1,7 @@
//! ### Extensions //! ### Extensions
//! Contains a number of utils used internally or that may be of general interest. //! Contains a number of utils used internally or that may be of general interest.
use crate::{api::Controller, errors::ControllerResult}; use crate::{api::controller::AsyncReceiver, errors::ControllerResult};
use tokio::sync::mpsc; use tokio::sync::mpsc;
/// Poll all given buffer controllers and wait, returning the first one ready. /// Poll all given buffer controllers and wait, returning the first one ready.

View file

@ -2,7 +2,10 @@ use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni; use jni_toolbox::jni;
use crate::{ use crate::{
api::{Controller, TextChange}, api::{
controller::{AsyncReceiver, AsyncSender},
TextChange,
},
errors::ControllerError, errors::ControllerError,
}; };

View file

@ -1,5 +1,8 @@
use crate::{ use crate::{
api::{Controller, Cursor}, api::{
controller::{AsyncReceiver, AsyncSender},
Cursor,
},
errors::ControllerError, errors::ControllerError,
}; };
use jni::{objects::JObject, JNIEnv}; use jni::{objects::JObject, JNIEnv};

View file

@ -1,7 +1,10 @@
use crate::{ use crate::{
api::controller::AsyncReceiver,
errors::{ConnectionError, ControllerError, RemoteError}, errors::{ConnectionError, ControllerError, RemoteError},
ffi::java::null_check,
Workspace, Workspace,
}; };
use jni::{objects::JObject, JNIEnv};
use jni_toolbox::jni; use jni_toolbox::jni;
/// Get the workspace id. /// Get the workspace id.
@ -88,10 +91,69 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr
super::tokio().block_on(workspace.delete(&path)) super::tokio().block_on(workspace.delete(&path))
} }
/// Block and receive a workspace event
#[jni(package = "mp.code", class = "Workspace")]
fn recv(workspace: &mut Workspace) -> Result<crate::api::Event, ControllerError> {
super::tokio().block_on(workspace.recv())
}
/// Receive a workspace event if present. /// Receive a workspace event if present.
#[jni(package = "mp.code", class = "Workspace")] #[jni(package = "mp.code", class = "Workspace")]
fn event(workspace: &mut Workspace) -> Result<crate::api::Event, ControllerError> { fn try_recv(workspace: &mut Workspace) -> Result<Option<crate::api::Event>, ControllerError> {
super::tokio().block_on(workspace.event()) super::tokio().block_on(workspace.try_recv())
}
/// Block until a workspace event is available
#[jni(package = "mp.code", class = "Workspace")]
fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> {
super::tokio().block_on(workspace.poll())
}
/// Clear previously registered callback
#[jni(package = "mp.code", class = "Workspace")]
fn clear_callback(workspace: &mut Workspace) {
workspace.clear_callback();
}
/// Register a callback for workspace events.
#[jni(package = "mp.code", class = "Workspace")]
fn callback<'local>(
env: &mut JNIEnv<'local>,
controller: &mut crate::Workspace,
cb: JObject<'local>,
) {
null_check!(env, cb, {});
let Ok(cb_ref) = env.new_global_ref(cb) else {
env.throw_new(
"mp/code/exceptions/JNIException",
"Failed to pin callback reference!",
)
.expect("Failed to throw exception!");
return;
};
controller.callback(move |workspace: crate::Workspace| {
let jvm = super::jvm();
let mut env = jvm
.attach_current_thread_permanently()
.expect("failed attaching to main JVM thread");
if let Err(e) = env.with_local_frame(5, |env| {
use jni_toolbox::IntoJavaObject;
let jworkspace = workspace.into_java_object(env)?;
if let Err(e) = env.call_method(
&cb_ref,
"accept",
"(Ljava/lang/Object;)V",
&[jni::objects::JValueGen::Object(&jworkspace)],
) {
tracing::error!("error invoking callback: {e:?}");
};
Ok::<(), jni::errors::Error>(())
}) {
tracing::error!("error invoking callback: {e}");
let _ = env.exception_describe();
}
});
} }
/// Called by the Java GC to drop a [Workspace]. /// Called by the Java GC to drop a [Workspace].

View file

@ -1,4 +1,4 @@
use crate::api::Controller; use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::api::TextChange; use crate::api::TextChange;
use crate::buffer::controller::BufferController; use crate::buffer::controller::BufferController;
use napi::threadsafe_function::{ use napi::threadsafe_function::{

View file

@ -1,4 +1,4 @@
use crate::api::Controller; use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::cursor::controller::CursorController; use crate::cursor::controller::CursorController;
use napi::threadsafe_function::ErrorStrategy::Fatal; use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi::threadsafe_function::{ use napi::threadsafe_function::{

View file

@ -1,9 +1,15 @@
use crate::api::controller::AsyncReceiver;
use crate::buffer::controller::BufferController; use crate::buffer::controller::BufferController;
use crate::cursor::controller::CursorController; use crate::cursor::controller::CursorController;
use crate::ffi::js::client::JsUser;
use crate::Workspace; use crate::Workspace;
use napi::threadsafe_function::ErrorStrategy::Fatal;
use napi::threadsafe_function::{
ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
};
use napi_derive::napi; use napi_derive::napi;
use super::client::JsUser;
#[napi(object, js_name = "Event")] #[napi(object, js_name = "Event")]
pub struct JsEvent { pub struct JsEvent {
pub r#type: String, pub r#type: String,
@ -85,6 +91,42 @@ impl Workspace {
Ok(self.delete(&path).await?) Ok(self.delete(&path).await?)
} }
#[napi(js_name = "recv")]
pub async fn js_recv(&self) -> napi::Result<JsEvent> {
Ok(JsEvent::from(self.recv().await?))
}
#[napi(js_name = "try_recv")]
pub async fn js_try_recv(&self) -> napi::Result<Option<JsEvent>> {
Ok(self.try_recv().await?.map(JsEvent::from))
}
#[napi(js_name = "poll")]
pub async fn js_poll(&self) -> napi::Result<()> {
self.poll().await?;
Ok(())
}
#[napi(js_name = "clear_callback")]
pub fn js_clear_callback(&self) -> napi::Result<()> {
self.clear_callback();
Ok(())
}
#[napi(js_name = "callback", ts_args_type = "fun: (event: Workspace) => void")]
pub fn js_callback(&self, fun: napi::JsFunction) -> napi::Result<()> {
let tsfn: ThreadsafeFunction<crate::Workspace, Fatal> = fun
.create_threadsafe_function(0, |ctx: ThreadSafeCallContext<crate::Workspace>| {
Ok(vec![ctx.value])
})?;
self.callback(move |controller: Workspace| {
tsfn.call(controller.clone(), ThreadsafeFunctionCallMode::Blocking); //check this with tracing also we could use Ok(event) to get the error
// If it blocks the main thread too many time we have to change this
});
Ok(())
}
/// Detach from an active buffer, stopping its underlying worker /// Detach from an active buffer, stopping its underlying worker
/// this method returns true if no reference or last reference was held, false if there are still /// this method returns true if no reference or last reference was held, false if there are still
/// dangling references to clear /// dangling references to clear
@ -93,12 +135,6 @@ impl Workspace {
self.detach(&path) self.detach(&path)
} }
/// Wait for next workspace event and return it
#[napi(js_name = "event")]
pub async fn js_event(&self) -> napi::Result<JsEvent> {
Ok(JsEvent::from(self.event().await?))
}
/// Re-fetch remote buffer list /// Re-fetch remote buffer list
#[napi(js_name = "fetch_buffers")] #[napi(js_name = "fetch_buffers")]
pub async fn js_fetch_buffers(&self) -> napi::Result<()> { pub async fn js_fetch_buffers(&self) -> napi::Result<()> {

View file

@ -54,7 +54,7 @@ impl LuaUserData for Promise {
Some(x) => { Some(x) => {
x.abort(); x.abort();
Ok(()) Ok(())
}, }
}); });
methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| { methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| {
match this.0.take() { match this.0.take() {

View file

@ -71,6 +71,7 @@ pub(crate) enum CallbackArg {
BufferController(CodempBufferController), BufferController(CodempBufferController),
Workspace(CodempWorkspace), Workspace(CodempWorkspace),
Event(CodempEvent), Event(CodempEvent),
MaybeEvent(Option<CodempEvent>),
Cursor(CodempCursor), Cursor(CodempCursor),
MaybeCursor(Option<CodempCursor>), MaybeCursor(Option<CodempCursor>),
TextChange(CodempTextChange), TextChange(CodempTextChange),
@ -91,6 +92,7 @@ impl IntoLua for CallbackArg {
CallbackArg::Workspace(x) => x.into_lua(lua), CallbackArg::Workspace(x) => x.into_lua(lua),
CallbackArg::VecStr(x) => x.into_lua(lua), CallbackArg::VecStr(x) => x.into_lua(lua),
CallbackArg::Event(x) => x.into_lua(lua), CallbackArg::Event(x) => x.into_lua(lua),
CallbackArg::MaybeEvent(x) => x.into_lua(lua),
CallbackArg::Cursor(x) => x.into_lua(lua), CallbackArg::Cursor(x) => x.into_lua(lua),
CallbackArg::MaybeCursor(x) => x.into_lua(lua), CallbackArg::MaybeCursor(x) => x.into_lua(lua),
CallbackArg::TextChange(x) => x.into_lua(lua), CallbackArg::TextChange(x) => x.into_lua(lua),
@ -99,63 +101,16 @@ impl IntoLua for CallbackArg {
} }
} }
impl From<()> for CallbackArg { impl From<()> for CallbackArg { fn from(_: ()) -> Self { CallbackArg::Nil } }
fn from(_: ()) -> Self { impl From<String> for CallbackArg { fn from(value: String) -> Self { CallbackArg::Str(value) } }
CallbackArg::Nil impl From<CodempClient> for CallbackArg { fn from(value: CodempClient) -> Self { CallbackArg::Client(value) } }
} impl From<CodempCursorController> for CallbackArg { fn from(value: CodempCursorController) -> Self { CallbackArg::CursorController(value) } }
} impl From<CodempBufferController> for CallbackArg { fn from(value: CodempBufferController) -> Self { CallbackArg::BufferController(value) } }
impl From<String> for CallbackArg { impl From<CodempWorkspace> for CallbackArg { fn from(value: CodempWorkspace) -> Self { CallbackArg::Workspace(value) } }
fn from(value: String) -> Self { impl From<Vec<String>> for CallbackArg { fn from(value: Vec<String>) -> Self { CallbackArg::VecStr(value) } }
CallbackArg::Str(value) impl From<CodempEvent> for CallbackArg { fn from(value: CodempEvent) -> Self { CallbackArg::Event(value) } }
} impl From<CodempCursor> for CallbackArg { fn from(value: CodempCursor) -> Self { CallbackArg::Cursor(value) } }
} impl From<Option<CodempCursor>> for CallbackArg { fn from(value: Option<CodempCursor>) -> Self { CallbackArg::MaybeCursor(value) } }
impl From<CodempClient> for CallbackArg { impl From<CodempTextChange> for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } }
fn from(value: CodempClient) -> Self { impl From<Option<CodempTextChange>> for CallbackArg { fn from(value: Option<CodempTextChange>) -> Self { CallbackArg::MaybeTextChange(value) } }
CallbackArg::Client(value) impl From<Option<CodempEvent>> for CallbackArg { fn from(value: Option<CodempEvent>) -> Self { CallbackArg::MaybeEvent(value) } }
}
}
impl From<CodempCursorController> for CallbackArg {
fn from(value: CodempCursorController) -> Self {
CallbackArg::CursorController(value)
}
}
impl From<CodempBufferController> for CallbackArg {
fn from(value: CodempBufferController) -> Self {
CallbackArg::BufferController(value)
}
}
impl From<CodempWorkspace> for CallbackArg {
fn from(value: CodempWorkspace) -> Self {
CallbackArg::Workspace(value)
}
}
impl From<Vec<String>> for CallbackArg {
fn from(value: Vec<String>) -> Self {
CallbackArg::VecStr(value)
}
}
impl From<CodempEvent> for CallbackArg {
fn from(value: CodempEvent) -> Self {
CallbackArg::Event(value)
}
}
impl From<CodempCursor> for CallbackArg {
fn from(value: CodempCursor) -> Self {
CallbackArg::Cursor(value)
}
}
impl From<Option<CodempCursor>> for CallbackArg {
fn from(value: Option<CodempCursor>) -> Self {
CallbackArg::MaybeCursor(value)
}
}
impl From<CodempTextChange> for CallbackArg {
fn from(value: CodempTextChange) -> Self {
CallbackArg::TextChange(value)
}
}
impl From<Option<CodempTextChange>> for CallbackArg {
fn from(value: Option<CodempTextChange>) -> Self {
CallbackArg::MaybeTextChange(value)
}
}

View file

@ -33,11 +33,6 @@ impl LuaUserData for CodempWorkspace {
Ok(this.buffer_by_name(&name)) Ok(this.buffer_by_name(&name))
}); });
methods.add_method(
"event",
|_, this, ()| a_sync! { this => this.event().await? },
);
methods.add_method( methods.add_method(
"fetch_buffers", "fetch_buffers",
|_, this, ()| a_sync! { this => this.fetch_buffers().await? }, |_, this, ()| a_sync! { this => this.fetch_buffers().await? },
@ -55,6 +50,27 @@ impl LuaUserData for CodempWorkspace {
); );
methods.add_method("user_list", |_, this, ()| Ok(this.user_list())); methods.add_method("user_list", |_, this, ()| Ok(this.user_list()));
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
methods.add_method(
"try_recv",
|_, this, ()| a_sync! { this => this.try_recv().await? },
);
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| {
this.callback(move |controller: CodempWorkspace| {
super::ext::callback().invoke(cb.clone(), controller)
});
Ok(())
});
methods.add_method("clear_callbacl", |_, this, ()| {
this.clear_callback();
Ok(())
});
} }
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) { fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {

View file

@ -1,4 +1,4 @@
use crate::api::Controller; use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::api::Cursor; use crate::api::Cursor;
use crate::api::TextChange; use crate::api::TextChange;
use crate::buffer::Controller as BufferController; use crate::buffer::Controller as BufferController;

View file

@ -1,6 +1,8 @@
use crate::api::controller::AsyncReceiver;
use crate::buffer::Controller as BufferController; use crate::buffer::Controller as BufferController;
use crate::cursor::Controller as CursorController; use crate::cursor::Controller as CursorController;
use crate::workspace::Workspace; use crate::workspace::Workspace;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*; use pyo3::prelude::*;
use super::a_sync_allow_threads; use super::a_sync_allow_threads;
@ -26,12 +28,6 @@ impl Workspace {
self.detach(path.as_str()) self.detach(path.as_str())
} }
#[pyo3(name = "event")]
fn pyevent(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.event().await)
}
#[pyo3(name = "fetch_buffers")] #[pyo3(name = "fetch_buffers")]
fn pyfetch_buffers(&self, py: Python) -> PyResult<Promise> { fn pyfetch_buffers(&self, py: Python) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
@ -87,4 +83,42 @@ impl Workspace {
fn pyuser_list(&self) -> Vec<String> { fn pyuser_list(&self) -> Vec<String> {
self.user_list() self.user_list()
} }
#[pyo3(name = "recv")]
fn pyrecv(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.recv().await)
}
#[pyo3(name = "try_recv")]
fn pytry_recv(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.try_recv().await)
}
#[pyo3(name = "poll")]
fn pypoll(&self, py: Python) -> PyResult<Promise> {
let this = self.clone();
a_sync_allow_threads!(py, this.poll().await)
}
#[pyo3(name = "clear_callback")]
fn pyclear_callbacl(&self, _py: Python) {
self.clear_callback();
}
#[pyo3(name = "callback")]
fn pycallback(&self, py: Python, cb: PyObject) -> PyResult<()> {
if !cb.bind_borrowed(py).is_callable() {
return Err(PyValueError::new_err("The object passed must be callable."));
}
self.callback(move |ws| {
Python::with_gil(|py| {
// TODO what to do with this error?
let _ = cb.call1(py, (ws,));
})
});
Ok(())
}
} }

View file

@ -50,7 +50,7 @@
//! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap();
//! # client.create_workspace("").await.unwrap(); //! # client.create_workspace("").await.unwrap();
//! # let workspace = client.join_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap();
//! use codemp::api::Controller; // needed to access trait methods //! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods
//! let cursor = workspace.cursor(); //! let cursor = workspace.cursor();
//! let event = cursor.recv().await.expect("disconnected while waiting for event!"); //! let event = cursor.recv().await.expect("disconnected while waiting for event!");
//! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer); //! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer);
@ -66,7 +66,7 @@
//! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap();
//! # client.create_workspace("").await.unwrap(); //! # client.create_workspace("").await.unwrap();
//! # let workspace = client.join_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap();
//! # use codemp::api::Controller; //! # use codemp::api::controller::{AsyncSender, AsyncReceiver};
//! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach"); //! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach");
//! buffer.content(); // force-sync //! buffer.content(); // force-sync
//! if let Some(change) = buffer.try_recv().await.unwrap() { //! if let Some(change) = buffer.try_recv().await.unwrap() {

View file

@ -2,6 +2,7 @@
//! All-in-one renamed imports with `use codemp::prelude::*`. //! All-in-one renamed imports with `use codemp::prelude::*`.
pub use crate::api::{ pub use crate::api::{
controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender,
Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor,
Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser,
}; };

View file

@ -4,7 +4,10 @@
//! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems.
use crate::{ use crate::{
api::{Event, User}, api::{
controller::{AsyncReceiver, ControllerCallback},
Event, User,
},
buffer, cursor, buffer, cursor,
errors::{ConnectionResult, ControllerResult, RemoteResult}, errors::{ConnectionResult, ControllerResult, RemoteResult},
ext::InternallyMutable, ext::InternallyMutable,
@ -24,7 +27,7 @@ use codemp_proto::{
use dashmap::{DashMap, DashSet}; use dashmap::{DashMap, DashSet};
use std::{collections::BTreeSet, sync::Arc}; use std::{collections::BTreeSet, sync::Arc};
use tokio::sync::mpsc; use tokio::sync::{mpsc, mpsc::error::TryRecvError};
use tonic::Streaming; use tonic::Streaming;
use uuid::Uuid; use uuid::Uuid;
@ -55,6 +58,36 @@ struct WorkspaceInner {
users: Arc<DashMap<Uuid, User>>, users: Arc<DashMap<Uuid, User>>,
// TODO can we drop the mutex? // TODO can we drop the mutex?
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>, events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
callback: std::sync::Mutex<Option<ControllerCallback<Workspace>>>, // TODO lmao another one
}
impl AsyncReceiver<Event> for Workspace {
async fn try_recv(&self) -> ControllerResult<Option<Event>> {
match self.0.events.lock().await.try_recv() {
Ok(x) => Ok(Some(x)),
Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Disconnected) => Err(crate::errors::ControllerError::Stopped),
}
}
async fn poll(&self) -> ControllerResult<()> {
loop {
if !self.0.events.lock().await.is_empty() {
break Ok(());
}
// TODO disgusting, please send help
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
// TODO please send HELP ASAP this is hurting me emotionally
fn clear_callback(&self) {
*self.0.callback.lock().expect("mutex poisoned") = None;
}
fn callback(&self, cb: impl Into<ControllerCallback<Self>>) {
*self.0.callback.lock().expect("mutex poisoned") = Some(cb.into());
}
} }
impl Workspace { impl Workspace {
@ -91,6 +124,7 @@ impl Workspace {
users, users,
events: tokio::sync::Mutex::new(ev_rx), events: tokio::sync::Mutex::new(ev_rx),
services, services,
callback: std::sync::Mutex::new(None),
})); }));
ws.fetch_users().await?; ws.fetch_users().await?;
@ -166,18 +200,6 @@ impl Workspace {
} }
} }
/// Await next workspace [Event] and return it when it arrives.
// TODO this method is weird and ugly, can we make it more standard?
pub async fn event(&self) -> ControllerResult<Event> {
self.0
.events
.lock()
.await
.recv()
.await
.ok_or(crate::errors::ControllerError::Unfulfilled)
}
/// Re-fetch the list of available buffers in the workspace. /// Re-fetch the list of available buffers in the workspace.
pub async fn fetch_buffers(&self) -> RemoteResult<()> { pub async fn fetch_buffers(&self) -> RemoteResult<()> {
let mut workspace_client = self.0.services.ws(); let mut workspace_client = self.0.services.ws();
@ -290,7 +312,8 @@ impl Workspace {
/// A filter may be applied, and it may be strict (equality check) or not (starts_with check). /// A filter may be applied, and it may be strict (equality check) or not (starts_with check).
// #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120
pub fn filetree(&self, filter: Option<&str>, strict: bool) -> Vec<String> { pub fn filetree(&self, filter: Option<&str>, strict: bool) -> Vec<String> {
let mut tree = self.0 let mut tree = self
.0
.filetree .filetree
.iter() .iter()
.filter(|f| { .filter(|f| {