Merge pull request #39 from hexedtech/feat/buffer-ack

Feat: Buffer ack-ing
This commit is contained in:
əlemi 2024-10-10 14:13:31 +02:00 committed by GitHub
commit fb35ddb3cd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 574 additions and 425 deletions

2
Cargo.lock generated
View file

@ -231,7 +231,7 @@ dependencies = [
[[package]]
name = "codemp"
version = "0.7.2"
version = "0.7.3"
dependencies = [
"async-trait",
"codemp-proto",

View file

@ -41,7 +41,7 @@ tracing-subscriber = { version = "0.3", optional = true }
# glue (java)
lazy_static = { version = "1.5", optional = true }
jni = { version = "0.21", features = ["invocation"], optional = true }
jni-toolbox = { version = "0.2.0", optional = true, features = ["uuid"] }
jni-toolbox = { version = "0.2", optional = true, features = ["uuid"] }
# glue (lua)
mlua-codemp-patch = { version = "0.10.0-beta.2", features = ["module", "send", "serialize"], optional = true }

View file

@ -1,5 +1,6 @@
package mp.code;
import mp.code.data.BufferUpdate;
import mp.code.data.TextChange;
import mp.code.exceptions.ControllerException;
@ -42,26 +43,26 @@ public final class BufferController {
return get_content(this.ptr);
}
private static native TextChange try_recv(long self) throws ControllerException;
private static native BufferUpdate try_recv(long self) throws ControllerException;
/**
* Tries to get a {@link TextChange} from the queue if any were present, and returns
* Tries to get a {@link BufferUpdate} from the queue if any were present, and returns
* an empty optional otherwise.
* @return the first text change in queue, if any are present
* @throws ControllerException if the controller was stopped
*/
public Optional<TextChange> tryRecv() throws ControllerException {
public Optional<BufferUpdate> tryRecv() throws ControllerException {
return Optional.ofNullable(try_recv(this.ptr));
}
private static native TextChange recv(long self) throws ControllerException;
private static native BufferUpdate recv(long self) throws ControllerException;
/**
* Blocks until a {@link TextChange} is available and returns it.
* Blocks until a {@link BufferUpdate} is available and returns it.
* @return the text change update that occurred
* @throws ControllerException if the controller was stopped
*/
public TextChange recv() throws ControllerException {
public BufferUpdate recv() throws ControllerException {
return recv(this.ptr);
}
@ -78,7 +79,7 @@ public final class BufferController {
private static native void callback(long self, Consumer<BufferController> cb);
/**
* Registers a callback to be invoked whenever a {@link TextChange} occurs.
* Registers a callback to be invoked whenever a {@link BufferUpdate} occurs.
* This will not work unless a Java thread has been dedicated to the event loop.
* @see Extensions#drive(boolean)
*/
@ -106,6 +107,17 @@ public final class BufferController {
poll(this.ptr);
}
private static native void ack(long self, long[] version);
/**
* Acknowledges that a certain CRDT version has been correctly applied.
* @param version the version to acknowledge
* @see BufferUpdate#version
*/
public void ack(long[] version) {
ack(this.ptr, version);
}
private static native void free(long self);
static {

View file

@ -1,6 +1,7 @@
package mp.code;
import mp.code.data.Cursor;
import mp.code.data.Selection;
import mp.code.exceptions.ControllerException;
import java.util.Optional;
@ -42,13 +43,13 @@ public final class CursorController {
return recv(this.ptr);
}
private static native void send(long self, Cursor cursor) throws ControllerException;
private static native void send(long self, Selection cursor) throws ControllerException;
/**
* Tries to send a {@link Cursor} update.
* Tries to send a {@link Selection} update.
* @throws ControllerException if the controller was stopped
*/
public void send(Cursor cursor) throws ControllerException {
public void send(Selection cursor) throws ControllerException {
send(this.ptr, cursor);
}

View file

@ -0,0 +1,35 @@
package mp.code.data;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import mp.code.Extensions;
import java.util.OptionalLong;
/**
* A data class holding information about a buffer update.
*/
@ToString
@EqualsAndHashCode
@RequiredArgsConstructor
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class BufferUpdate {
/**
* The hash of the content after applying it (calculated with {@link Extensions#hash(String)}).
* It is generally meaningless to send, but when received it is an invitation to check the hash
* and forcefully re-sync if necessary.
*/
public final OptionalLong hash; // xxh3 hash
/**
* The CRDT version after the associated change has been applied.
* You MUST acknowledge that it was applied with {@link mp.code.BufferController#ack(long[])}.
*/
public final long[] version;
/**
* The {@link TextChange} contained in this buffer update.
*/
public final TextChange change;
}

View file

@ -11,37 +11,13 @@ import lombok.ToString;
@EqualsAndHashCode
@RequiredArgsConstructor
public class Cursor {
/**
* The starting row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startRow;
/**
* The starting column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startCol;
/**
* The ending row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endRow;
/**
* The ending column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endCol;
/**
* The buffer the cursor is located on.
*/
public final String buffer;
/**
* The user who controls the cursor.
*/
public final String user;
/**
* The associated selection update.
*/
public final Selection selection;
}

View file

@ -0,0 +1,42 @@
package mp.code.data;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
/**
* A data class holding information about a cursor selection.
*/
@ToString
@EqualsAndHashCode
@RequiredArgsConstructor
public class Selection {
/**
* The starting row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startRow;
/**
* The starting column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startCol;
/**
* The ending row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endRow;
/**
* The ending column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endCol;
/**
* The buffer the cursor is located on.
*/
public final String buffer;
}

View file

@ -22,7 +22,7 @@ public class TextChange {
public final long start;
/**
* The endomg position of the change.
* The ending position of the change.
* If negative, it is clamped to 0.
*/
public final long end;
@ -33,13 +33,6 @@ public class TextChange {
*/
public final String content;
/**
* The hash of the content after applying it (calculated with {@link Extensions#hash(String)}).
* It is generally meaningless to send, but when received it is an invitation to check the hash
* and forcefully re-sync if necessary.
*/
public final OptionalLong hash; // xxh3 hash
/**
* Checks if the change represents a deletion.
* It does if the starting index is lower than the ending index.

View file

@ -135,28 +135,28 @@ function MaybeCursorPromise:cancel() end
function MaybeCursorPromise:and_then(cb) end
---@class (exact) TextChangePromise : Promise
local TextChangePromise = {}
---@class (exact) BufferUpdatePromise : Promise
local BufferUpdatePromise = {}
--- block until promise is ready and return value
--- @return TextChange
function TextChangePromise:await() end
--- @return BufferUpdate
function BufferUpdatePromise:await() end
--- cancel promise execution
function TextChangePromise:cancel() end
---@param cb fun(x: TextChange) callback to invoke
function BufferUpdatePromise:cancel() end
---@param cb fun(x: BufferUpdate) callback to invoke
---invoke callback asynchronously as soon as promise is ready
function TextChangePromise:and_then(cb) end
function BufferUpdatePromise:and_then(cb) end
---@class (exact) MaybeTextChangePromise : Promise
local MaybeTextChangePromise = {}
---@class (exact) MaybeBufferUpdatePromise : Promise
local MaybeBufferUpdatePromise = {}
--- block until promise is ready and return value
--- @return TextChange | nil
function MaybeTextChangePromise:await() end
--- @return BufferUpdate | nil
function MaybeBufferUpdatePromise:await() end
--- cancel promise execution
function MaybeTextChangePromise:cancel() end
---@param cb fun(x: TextChange | nil) callback to invoke
function MaybeBufferUpdatePromise:cancel() end
---@param cb fun(x: BufferUpdate | nil) callback to invoke
---invoke callback asynchronously as soon as promise is ready
function MaybeTextChangePromise:and_then(cb) end
function MaybeBufferUpdatePromise:and_then(cb) end
-- [[ END ASYNC STUFF ]]
@ -322,9 +322,14 @@ local BufferController = {}
---@field content string text content of change
---@field start integer start index of change
---@field finish integer end index of change
---@field hash integer? optional hash of text buffer after this change, for sync checks
local TextChange = {}
---@class (exact) BufferUpdate
---@field change TextChange text change for this delta
---@field version table<integer> CRDT version after this change
---@field hash integer? optional hash of text buffer after this change, for sync checks
local BufferUpdate = {}
---@param other string text to apply change to
---apply this text change to a string, returning the result
function TextChange:apply(other) end
@ -336,13 +341,13 @@ function TextChange:apply(other) end
---update buffer with a text change; note that to delete content should be empty but not span, while to insert span should be empty but not content (can insert and delete at the same time)
function BufferController:send(change) end
---@return MaybeTextChangePromise
---@return MaybeBufferUpdatePromise
---@async
---@nodiscard
---try to receive text changes, returning nil if none is available
function BufferController:try_recv() end
---@return TextChangePromise
---@return BufferUpdatePromise
---@async
---@nodiscard
---block until next text change and return it
@ -367,6 +372,10 @@ function BufferController:callback(cb) end
---get current content of buffer controller, marking all pending changes as seen
function BufferController:content() end
---@param version [integer] version to ack
---notify controller that this version's change has been correctly applied
function BufferController:ack(version) end
@ -374,14 +383,19 @@ function BufferController:content() end
---handle to a workspace's cursor channel, allowing send/recv operations
local CursorController = {}
---@class Cursor
---@field user string? id of user owning this cursor
---@class Selection
---@field buffer string relative path ("name") of buffer on which this cursor is
---@field start [integer, integer] cursor start position
---@field finish [integer, integer] cursor end position
---a cursor position
---@field start_row integer
---@field start_col integer
---@field end_row integer
---@field end_col integer
---a cursor selected region, as row-col indices
---@param cursor Cursor cursor event to broadcast
---@class Cursor
---@field user string id of user owning this cursor
---@field sel Selection selected region for this user
---@param cursor Selection cursor position to broadcast
---@return NilPromise
---@async
---@nodiscard

View file

@ -92,6 +92,14 @@ class TextChange:
def is_empty(self) -> bool: ...
def apply(self, txt: str) -> str: ...
class BufferUpdate:
"""
A single editor delta event, wrapping a TextChange and the new version
"""
change: TextChange
hash: Optional[int]
version: list[int]
class BufferController:
"""
@ -100,6 +108,7 @@ class BufferController:
"""
def path(self) -> str: ...
def content(self) -> Promise[str]: ...
def ack(self, v: list[int]) -> None: ...
def send(self,
start: int,
end: int,
@ -113,14 +122,20 @@ class BufferController:
class Cursor:
class Selection:
"""
An Editor agnostic cursor position representation
"""
start: Tuple[int, int]
end: Tuple[int, int]
buffer: str
user: Optional[str] # can be an empty string
class Cursor:
"""
A remote cursor event
"""
user: str
sel: Selection
class CursorController:

View file

@ -1,6 +1,25 @@
//! # TextChange
//! A high-level representation of a change within a given buffer.
/// A [`TextChange`] event happening on a buffer.
///
/// Contains the change itself, the new version after this change and an optional `hash` field.
/// This is used for error correction: if provided, it should match the hash of the buffer
/// content **after** applying this change. Note that the `hash` field will not necessarily
/// be provided every time.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(get_all))]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
pub struct BufferUpdate {
/// Optional content hash after applying this change.
pub hash: Option<i64>,
/// CRDT version after this change has been applied.
pub version: Vec<i64>,
/// The change that has occurred.
pub change: TextChange,
}
/// An editor-friendly representation of a text change in a given buffer.
///
/// It's expressed with a range of characters and a string of content that should replace them,
@ -9,21 +28,22 @@
/// Bulky and large operations will result in a single [`TextChange`] effectively sending the whole
/// new buffer, but smaller changes are efficient and easy to create or apply.
///
/// [`TextChange`] contains an optional `hash` field. This is used for error correction: if
/// provided, it should match the hash of the buffer content **after** applying this change.
/// Note that the `hash` field will not necessarily be provided every time.
///
/// ### Examples
/// To insert 'a' after 4th character we should send a.
/// `TextChange { start: 4, end: 4, content: "a".into(), hash: None }`
/// To insert 'a' after 4th character we should send:
/// ```
/// codemp::api::TextChange { start: 4, end: 4, content: "a".into() };
/// ```
///
/// To delete a the fourth character we should send a.
/// `TextChange { start: 3, end: 4, content: "".into(), hash: None }`
/// To delete the fourth character we should send:
/// ```
/// codemp::api::TextChange { start: 3, end: 4, content: "".into() };
/// ```
///
/// ```no_run
/// ```
/// let change = codemp::api::TextChange {
/// start: 6, end: 11,
/// content: "mom".to_string(), hash: None
/// start: 6,
/// end: 11,
/// content: "mom".to_string()
/// };
/// let before = "hello world!";
/// let after = change.apply(before);
@ -41,8 +61,6 @@ pub struct TextChange {
pub end: u32,
/// New content of text inside span.
pub content: String,
/// Optional content hash after applying this change.
pub hash: Option<i64>,
}
impl TextChange {
@ -90,7 +108,6 @@ mod tests {
start: 5,
end: 5,
content: " cruel".to_string(),
hash: None,
};
let result = change.apply("hello world!");
assert_eq!(result, "hello cruel world!");
@ -102,7 +119,6 @@ mod tests {
start: 5,
end: 11,
content: "".to_string(),
hash: None,
};
let result = change.apply("hello cruel world!");
assert_eq!(result, "hello world!");
@ -114,7 +130,6 @@ mod tests {
start: 5,
end: 11,
content: " not very pleasant".to_string(),
hash: None,
};
let result = change.apply("hello cruel world!");
assert_eq!(result, "hello not very pleasant world!");
@ -126,7 +141,6 @@ mod tests {
start: 100,
end: 110,
content: "a very long string \n which totally matters".to_string(),
hash: None,
};
let result = change.apply("a short text");
assert_eq!(
@ -141,7 +155,6 @@ mod tests {
start: 42,
end: 42,
content: "".to_string(),
hash: None,
};
let result = change.apply("some important text");
assert_eq!(result, "some important text");

View file

@ -1,11 +1,11 @@
//! # Config
//! Data structure defining clients configuration
/// Configuration struct for `codemp` client
/// Configuration struct for the `codemp` client.
///
/// username and password are required fields, while everything else is optional
/// `username` and `password` are required fields, everything else is optional.
///
/// host, port and tls affect all connections to all grpc services
/// `host`, `port` and `tls` affect all connections to all gRPC services; the
/// resulting endpoint is composed like this:
/// http{tls?'s':''}://{host}:{port}
#[derive(Clone, Debug)]
@ -16,20 +16,20 @@
)]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
pub struct Config {
/// user identifier used to register, possibly your email
/// User identifier used to register, possibly your email.
pub username: String,
/// user password chosen upon registration
/// User password chosen upon registration.
pub password: String,
/// address of server to connect to, default api.code.mp
/// Address of server to connect to, default api.code.mp.
pub host: Option<String>,
/// port to connect to, default 50053
/// Port to connect to, default 50053.
pub port: Option<u16>,
/// enable or disable tls, default true
/// Enable or disable tls, default true.
pub tls: Option<bool>,
}
impl Config {
/// construct a new Config object, with given username and password
/// Construct a new Config object, with given username and password.
pub fn new(username: impl ToString, password: impl ToString) -> Self {
Self {
username: username.to_string(),

View file

@ -37,11 +37,9 @@ where
/// See [`Controller`]'s documentation for details.
///
/// Details about the receiving end are left to the implementor.
#[allow(async_fn_in_trait)]
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
pub trait AsyncSender<T: Sized + Send + Sync>: Sized + Send + Sync {
/// Enqueue a new value to be sent to all other users.
async fn send(&self, x: T) -> ControllerResult<()>;
/// Enqueue a new value to be sent to all other users without blocking
fn send(&self, x: T) -> ControllerResult<()>;
}
/// Asynchronous and thread-safe handle to receive data from a stream.

View file

@ -4,19 +4,34 @@
#[cfg(any(feature = "py", feature = "py-noabi"))]
use pyo3::prelude::*;
/// User cursor position in a buffer
/// An event that occurred about a user's cursor.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
// #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))]
pub struct Cursor {
/// Cursor start position in buffer, as 0-indexed row-column tuple.
pub start: (i32, i32),
/// Cursor end position in buffer, as 0-indexed row-column tuple.
#[cfg_attr(feature = "serialize", serde(alias = "finish"))] // Lua uses `end` as keyword
pub end: (i32, i32),
/// User who sent the cursor.
pub user: String,
/// The updated cursor selection.
pub sel: Selection,
}
/// A cursor selection span.
#[derive(Clone, Debug, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyclass)]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
// #[cfg_attr(feature = "py", pyo3(crate = "reexported::pyo3"))]
pub struct Selection {
/// Cursor position starting row in buffer.
pub start_row: i32,
/// Cursor position starting column in buffer.
pub start_col: i32,
/// Cursor position final row in buffer.
pub end_row: i32,
/// Cursor position final column in buffer.
pub end_col: i32,
/// Path of buffer this cursor is on.
pub buffer: String,
/// User display name, if provided.
pub user: Option<String>,
}

View file

@ -19,9 +19,11 @@ pub mod event;
/// data structure for remote users
pub mod user;
pub use change::BufferUpdate;
pub use change::TextChange;
pub use config::Config;
pub use controller::Controller;
pub use cursor::Cursor;
pub use cursor::Selection;
pub use event::Event;
pub use user::User;

View file

@ -7,11 +7,10 @@ use diamond_types::LocalVersion;
use tokio::sync::{mpsc, oneshot, watch};
use crate::api::controller::{AsyncReceiver, AsyncSender, Controller, ControllerCallback};
use crate::api::BufferUpdate;
use crate::api::TextChange;
use crate::errors::ControllerResult;
use crate::ext::InternallyMutable;
use super::worker::DeltaRequest;
use crate::ext::IgnorableError;
/// A [Controller] to asynchronously interact with remote buffers.
///
@ -23,53 +22,59 @@ use super::worker::DeltaRequest;
pub struct BufferController(pub(crate) Arc<BufferControllerInner>);
impl BufferController {
/// Get the buffer path
/// Get the buffer path.
pub fn path(&self) -> &str {
&self.0.name
}
/// Return buffer whole content, updating internal acknowledgement tracker
/// Return buffer whole content, updating internal acknowledgement tracker.
pub async fn content(&self) -> ControllerResult<String> {
let (tx, rx) = oneshot::channel();
self.0.content_request.send(tx).await?;
let content = rx.await?;
self.0
.last_update
.set(self.0.latest_version.borrow().clone());
Ok(content)
}
/// Notify CRDT that changes up to the given version have been merged succesfully.
pub fn ack(&self, version: Vec<i64>) {
let version = version
.into_iter()
.map(|x| usize::from_ne_bytes(x.to_ne_bytes()))
.collect();
self.0
.ack_tx
.send(version)
.unwrap_or_warn("no worker to receive sent ack");
}
}
#[derive(Debug)]
pub(crate) struct BufferControllerInner {
pub(crate) name: String,
pub(crate) latest_version: watch::Receiver<diamond_types::LocalVersion>,
pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>,
pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>,
pub(crate) local_version: watch::Receiver<diamond_types::LocalVersion>,
pub(crate) ops_in: mpsc::UnboundedSender<TextChange>,
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
pub(crate) delta_request: mpsc::Sender<(LocalVersion, oneshot::Sender<Option<BufferUpdate>>)>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
pub(crate) ack_tx: mpsc::UnboundedSender<LocalVersion>,
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<TextChange> for BufferController {}
impl Controller<TextChange, BufferUpdate> for BufferController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<TextChange> for BufferController {
async fn send(&self, op: TextChange) -> ControllerResult<()> {
// we let the worker do the updating to the last version and send it back.
let (tx, rx) = oneshot::channel();
self.0.ops_in.send((op, tx))?;
self.0.last_update.set(rx.await?);
fn send(&self, op: TextChange) -> ControllerResult<()> {
self.0.ops_in.send(op)?;
Ok(())
}
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncReceiver<TextChange> for BufferController {
impl AsyncReceiver<BufferUpdate> for BufferController {
async fn poll(&self) -> ControllerResult<()> {
if self.0.last_update.get() != *self.0.latest_version.borrow() {
if *self.0.local_version.borrow() != *self.0.latest_version.borrow() {
return Ok(());
}
@ -79,8 +84,8 @@ impl AsyncReceiver<TextChange> for BufferController {
Ok(())
}
async fn try_recv(&self) -> ControllerResult<Option<TextChange>> {
let last_update = self.0.last_update.get();
async fn try_recv(&self) -> ControllerResult<Option<BufferUpdate>> {
let last_update = self.0.local_version.borrow().clone();
let latest_version = self.0.latest_version.borrow().clone();
if last_update == latest_version {
@ -89,16 +94,11 @@ impl AsyncReceiver<TextChange> for BufferController {
let (tx, rx) = oneshot::channel();
self.0.delta_request.send((last_update, tx)).await?;
let (v, change) = rx.await?;
self.0.last_update.set(v);
Ok(change)
Ok(rx.await?)
}
fn callback(&self, cb: impl Into<ControllerCallback<BufferController>>) {
if self.0.callback.send(Some(cb.into())).is_err() {
// TODO should we panic? we failed what we were supposed to do
tracing::error!("no active buffer worker to run registered callback!");
}
self.0.callback.send_replace(Some(cb.into()));
}
fn clear_callback(&self) {

View file

@ -7,25 +7,25 @@ use tonic::Streaming;
use uuid::Uuid;
use crate::api::controller::ControllerCallback;
use crate::api::BufferUpdate;
use crate::api::TextChange;
use crate::ext::{IgnorableError, InternallyMutable};
use crate::ext::IgnorableError;
use codemp_proto::buffer::{BufferEvent, Operation};
use super::controller::{BufferController, BufferControllerInner};
pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>);
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
struct BufferWorker {
user_id: Uuid,
path: String,
latest_version: watch::Sender<diamond_types::LocalVersion>,
ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>,
local_version: watch::Sender<diamond_types::LocalVersion>,
ack_rx: mpsc::UnboundedReceiver<LocalVersion>,
ops_in: mpsc::UnboundedReceiver<TextChange>,
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
pollers: Vec<oneshot::Sender<()>>,
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
delta_req: mpsc::Receiver<DeltaRequest>,
delta_req: mpsc::Receiver<(LocalVersion, oneshot::Sender<Option<BufferUpdate>>)>,
controller: std::sync::Weak<BufferControllerInner>,
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
oplog: OpLog,
@ -43,7 +43,9 @@ impl BufferController {
let init = diamond_types::LocalVersion::default();
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
let (my_version_tx, my_version_rx) = watch::channel(init.clone());
let (opin_tx, opin_rx) = mpsc::unbounded_channel();
let (ack_tx, ack_rx) = mpsc::unbounded_channel();
let (req_tx, req_rx) = mpsc::channel(1);
let (recv_tx, recv_rx) = mpsc::channel(1);
@ -54,12 +56,13 @@ impl BufferController {
let controller = Arc::new(BufferControllerInner {
name: path.to_string(),
latest_version: latest_version_rx,
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
local_version: my_version_rx,
ops_in: opin_tx,
poller: poller_tx,
content_request: req_tx,
delta_request: recv_tx,
callback: cb_tx,
ack_tx,
});
let weak = Arc::downgrade(&controller);
@ -68,6 +71,8 @@ impl BufferController {
user_id,
path: path.to_string(),
latest_version: latest_version_tx,
local_version: my_version_tx,
ack_rx,
ops_in: opin_rx,
poller: poller_rx,
pollers: Vec::new(),
@ -106,10 +111,18 @@ impl BufferController {
Some(tx) => worker.pollers.push(tx),
},
// received new change ack, merge editor branch up to that version
res = worker.ack_rx.recv() => match res {
None => break tracing::error!("ack channel closed"),
Some(v) => {
worker.branch.merge(&worker.oplog, &v)
},
},
// received a text change from editor
res = worker.ops_in.recv() => match res {
None => break tracing::debug!("stopping: editor closed channel"),
Some((change, ack)) => worker.handle_editor_change(change, ack, &tx).await,
Some(change) => worker.handle_editor_change(change, &tx).await,
},
// received a message from server: add to oplog and update latest version (+unlock pollers)
@ -142,12 +155,7 @@ impl BufferController {
}
impl BufferWorker {
async fn handle_editor_change(
&mut self,
change: TextChange,
ack: oneshot::Sender<LocalVersion>,
tx: &mpsc::Sender<Operation>,
) {
async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender<Operation>) {
let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string());
let last_ver = self.oplog.local_version();
// clip to buffer extents
@ -174,9 +182,10 @@ impl BufferWorker {
self.latest_version
.send(self.oplog.local_version())
.unwrap_or_warn("failed to update latest version!");
self.local_version
.send(self.branch.local_version())
.unwrap_or_warn("failed to update local version!");
}
ack.send(self.branch.local_version())
.unwrap_or_warn("controller didn't wait for ack");
}
async fn handle_server_change(&mut self, change: BufferEvent) -> bool {
@ -203,7 +212,11 @@ impl BufferWorker {
}
}
async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender<DeltaOp>) {
async fn handle_delta_request(
&mut self,
last_ver: LocalVersion,
tx: oneshot::Sender<Option<BufferUpdate>>,
) {
if let Some((lv, Some(dtop))) = self
.oplog
.iter_xf_operations_from(&last_ver, self.oplog.local_version_ref())
@ -228,25 +241,40 @@ impl BufferWorker {
{
tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)");
}
crate::api::change::TextChange {
start: dtop.start() as u32,
end: dtop.start() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
crate::api::BufferUpdate {
hash,
version: step_ver
.into_iter()
.map(|x| i64::from_ne_bytes(x.to_ne_bytes()))
.collect(), // TODO this is wasteful
change: crate::api::TextChange {
start: dtop.start() as u32,
end: dtop.start() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
},
}
}
diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange {
start: dtop.start() as u32,
end: dtop.end() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
diamond_types::list::operation::OpKind::Del => crate::api::BufferUpdate {
hash,
version: step_ver
.into_iter()
.map(|x| i64::from_ne_bytes(x.to_ne_bytes()))
.collect(), // TODO this is wasteful
change: crate::api::TextChange {
start: dtop.start() as u32,
end: dtop.end() as u32,
content: dtop.content_as_str().unwrap_or_default().to_string(),
},
},
};
tx.send((new_local_v, Some(tc)))
self.local_version
.send(new_local_v)
.unwrap_or_warn("could not update local version");
tx.send(Some(tc))
.unwrap_or_warn("could not update ops channel -- is controller dead?");
} else {
tx.send((last_ver, None))
tx.send(None)
.unwrap_or_warn("could not update ops channel -- is controller dead?");
}
}

View file

@ -8,7 +8,7 @@ use tokio::sync::{mpsc, oneshot, watch};
use crate::{
api::{
controller::{AsyncReceiver, AsyncSender, ControllerCallback},
Controller, Cursor,
Controller, Cursor, Selection,
},
errors::ControllerResult,
};
@ -27,38 +27,38 @@ pub struct CursorController(pub(crate) Arc<CursorControllerInner>);
#[derive(Debug)]
pub(crate) struct CursorControllerInner {
pub(crate) op: mpsc::Sender<CursorPosition>,
pub(crate) op: mpsc::UnboundedSender<CursorPosition>,
pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl Controller<Cursor> for CursorController {}
impl Controller<Selection, Cursor> for CursorController {}
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
impl AsyncSender<Cursor> for CursorController {
async fn send(&self, mut cursor: Cursor) -> ControllerResult<()> {
if cursor.start > cursor.end {
std::mem::swap(&mut cursor.start, &mut cursor.end);
impl AsyncSender<Selection> for CursorController {
fn send(&self, mut cursor: Selection) -> ControllerResult<()> {
if cursor.start_row > cursor.end_row
|| (cursor.start_row == cursor.end_row && cursor.start_col > cursor.end_col)
{
std::mem::swap(&mut cursor.start_row, &mut cursor.end_row);
std::mem::swap(&mut cursor.start_col, &mut cursor.end_col);
}
Ok(self
.0
.op
.send(CursorPosition {
buffer: BufferNode {
path: cursor.buffer,
},
start: RowCol {
row: cursor.start.0,
col: cursor.start.1,
},
end: RowCol {
row: cursor.end.0,
col: cursor.end.1,
},
})
.await?)
Ok(self.0.op.send(CursorPosition {
buffer: BufferNode {
path: cursor.buffer,
},
start: RowCol {
row: cursor.start_row,
col: cursor.start_col,
},
end: RowCol {
row: cursor.end_row,
col: cursor.end_col,
},
})?)
}
}

View file

@ -5,7 +5,7 @@ use tonic::Streaming;
use uuid::Uuid;
use crate::{
api::{controller::ControllerCallback, Cursor, User},
api::{controller::ControllerCallback, Cursor, Selection, User},
ext::IgnorableError,
};
use codemp_proto::cursor::{CursorEvent, CursorPosition};
@ -13,7 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition};
use super::controller::{CursorController, CursorControllerInner};
struct CursorWorker {
op: mpsc::Receiver<CursorPosition>,
op: mpsc::UnboundedReceiver<CursorPosition>,
map: Arc<dashmap::DashMap<Uuid, User>>,
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
@ -30,7 +30,7 @@ impl CursorController {
rx: Streaming<CursorEvent>,
) -> Self {
// TODO we should tweak the channel buffer size to better propagate backpressure
let (op_tx, op_rx) = mpsc::channel(64);
let (op_tx, op_rx) = mpsc::unbounded_channel();
let (stream_tx, stream_rx) = mpsc::channel(1);
let (cb_tx, cb_rx) = watch::channel(None);
let (poll_tx, poll_rx) = mpsc::unbounded_channel();
@ -86,16 +86,18 @@ impl CursorController {
None => break, // clean exit, just weird that we got it here
Some(controller) => {
tracing::debug!("received cursor from server");
let mut cursor = Cursor {
buffer: cur.position.buffer.path,
start: (cur.position.start.row, cur.position.start.col),
end: (cur.position.end.row, cur.position.end.col),
user: None,
};
let user_id = Uuid::from(cur.user);
if let Some(user) = worker.map.get(&user_id) {
cursor.user = Some(user.name.clone());
}
let cursor = Cursor {
user: worker.map.get(&user_id).map(|u| u.name.clone()).unwrap_or_default(),
sel: Selection {
buffer: cur.position.buffer.path,
start_row: cur.position.start.row,
start_col: cur.position.start.col,
end_row: cur.position.end.row,
end_col: cur.position.end.col
}
};
worker.store.push_back(cursor);
for tx in worker.pollers.drain(..) {
tx.send(()).unwrap_or_warn("poller dropped before unblocking");

View file

@ -4,7 +4,7 @@ use jni_toolbox::jni;
use crate::{
api::{
controller::{AsyncReceiver, AsyncSender},
TextChange,
BufferUpdate, TextChange,
},
errors::ControllerError,
};
@ -27,13 +27,13 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result<String, Con
#[jni(package = "mp.code", class = "BufferController")]
fn try_recv(
controller: &mut crate::buffer::Controller,
) -> Result<Option<TextChange>, ControllerError> {
) -> Result<Option<BufferUpdate>, ControllerError> {
super::tokio().block_on(controller.try_recv())
}
/// Block until it receives a [TextChange].
#[jni(package = "mp.code", class = "BufferController")]
fn recv(controller: &mut crate::buffer::Controller) -> Result<TextChange, ControllerError> {
fn recv(controller: &mut crate::buffer::Controller) -> Result<BufferUpdate, ControllerError> {
super::tokio().block_on(controller.recv())
}
@ -43,7 +43,7 @@ fn send(
controller: &mut crate::buffer::Controller,
change: TextChange,
) -> Result<(), ControllerError> {
super::tokio().block_on(controller.send(change))
controller.send(change)
}
/// Register a callback for buffer changes.
@ -99,6 +99,12 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro
super::tokio().block_on(controller.poll())
}
/// Acknowledge that a change has been correctly applied.
#[jni(package = "mp.code", class = "BufferController")]
fn ack(controller: &mut crate::buffer::Controller, version: Vec<i64>) {
controller.ack(version)
}
/// Called by the Java GC to drop a [crate::buffer::Controller].
#[jni(package = "mp.code", class = "BufferController")]
fn free(input: jni::sys::jlong) {

View file

@ -1,7 +1,7 @@
use crate::{
api::{
controller::{AsyncReceiver, AsyncSender},
Cursor,
Cursor, Selection,
},
errors::ControllerError,
};
@ -24,8 +24,8 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result<Cursor, Controller
/// Receive from Java, converts and sends a [Cursor].
#[jni(package = "mp.code", class = "CursorController")]
fn send(controller: &mut crate::cursor::Controller, cursor: Cursor) -> Result<(), ControllerError> {
super::tokio().block_on(controller.send(cursor))
fn send(controller: &mut crate::cursor::Controller, sel: Selection) -> Result<(), ControllerError> {
controller.send(sel)
}
/// Register a callback for cursor changes.

View file

@ -79,6 +79,7 @@ macro_rules! null_check {
}
};
}
pub(crate) use null_check;
impl jni_toolbox::JniToolboxError for crate::errors::ConnectionError {
@ -193,13 +194,13 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event {
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
const CLASS: &'static str = "mp/code/data/TextChange";
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::BufferUpdate {
const CLASS: &'static str = "mp/code/data/BufferUpdate";
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let content = env.new_string(self.content)?;
let class = env.find_class(Self::CLASS)?;
let hash_class = env.find_class("java/util/OptionalLong")?;
let hash = if let Some(h) = self.hash {
@ -214,15 +215,36 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
}?
.l()?;
let version = self.version.into_java_object(env)?;
let change = self.change.into_java_object(env)?;
env.new_object(
class,
"(Ljava/util/OptionalLong;[JLmp/code/data/TextChange;)V",
&[
jni::objects::JValueGen::Object(&hash),
jni::objects::JValueGen::Object(&version),
jni::objects::JValueGen::Object(&change),
],
)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
const CLASS: &'static str = "mp/code/data/TextChange";
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let content = env.new_string(self.content)?;
let class = env.find_class(Self::CLASS)?;
env.new_object(
class,
"(JJLjava/lang/String;Ljava/util/OptionalLong;)V",
"(JJLjava/lang/String;)V",
&[
jni::objects::JValueGen::Long(self.start.into()),
jni::objects::JValueGen::Long(self.end.into()),
jni::objects::JValueGen::Object(&content),
jni::objects::JValueGen::Object(&hash),
],
)
}
@ -234,24 +256,39 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Cursor {
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let class = env.find_class("mp/code/data/Cursor")?;
let buffer = env.new_string(&self.buffer)?;
let user = if let Some(user) = self.user {
env.new_string(user)?.into()
} else {
jni::objects::JObject::null()
};
let class = env.find_class(Self::CLASS)?;
let user = env.new_string(&self.user)?;
let sel = self.sel.into_java_object(env)?;
env.new_object(
class,
"(IIIILjava/lang/String;Ljava/lang/String;)V",
"(Ljava/lang/String;Lmp/code/data/Selection;)V",
&[
jni::objects::JValueGen::Int(self.start.0),
jni::objects::JValueGen::Int(self.start.1),
jni::objects::JValueGen::Int(self.end.0),
jni::objects::JValueGen::Int(self.end.1),
jni::objects::JValueGen::Object(&buffer),
jni::objects::JValueGen::Object(&user),
jni::objects::JValueGen::Object(&sel),
],
)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Selection {
const CLASS: &'static str = "mp/code/data/Selection";
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let class = env.find_class(Self::CLASS)?;
let buffer = env.new_string(&self.buffer)?;
env.new_object(
class,
"(IIIILjava/lang/String;)V",
&[
jni::objects::JValueGen::Int(self.start_row),
jni::objects::JValueGen::Int(self.start_col),
jni::objects::JValueGen::Int(self.end_row),
jni::objects::JValueGen::Int(self.end_col),
jni::objects::JValueGen::Object(&buffer),
],
)
}
@ -363,7 +400,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
}
}
impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor {
impl<'j> jni_toolbox::FromJava<'j> for crate::api::Selection {
type From = jni::objects::JObject<'j>;
fn from_java(
env: &mut jni::JNIEnv<'j>,
@ -384,20 +421,12 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor {
unsafe { env.get_string_unchecked(&jfield.into()) }?.into()
};
let user = {
let jfield = env.get_field(&cursor, "user", "Ljava/lang/String;")?.l()?;
if jfield.is_null() {
None
} else {
Some(unsafe { env.get_string_unchecked(&jfield.into()) }?.into())
}
};
Ok(Self {
start: (start_row, start_col),
end: (end_row, end_col),
start_row,
start_col,
end_row,
end_col,
buffer,
user,
})
}
}
@ -427,21 +456,10 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange {
unsafe { env.get_string_unchecked(&jfield.into()) }?.into()
};
let hash = {
let jfield = env
.get_field(&change, "hash", "Ljava/util/OptionalLong;")?
.l()?;
if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? {
Some(env.call_method(&jfield, "getAsLong", "()J", &[])?.j()?)
} else {
None
}
};
Ok(Self {
start,
end,
content,
hash,
})
}
}

View file

@ -91,7 +91,7 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr
super::tokio().block_on(workspace.delete(&path))
}
/// Block and receive a workspace event
/// Block and receive a workspace event.
#[jni(package = "mp.code", class = "Workspace")]
fn recv(workspace: &mut Workspace) -> Result<crate::api::Event, ControllerError> {
super::tokio().block_on(workspace.recv())
@ -103,13 +103,13 @@ fn try_recv(workspace: &mut Workspace) -> Result<Option<crate::api::Event>, Cont
super::tokio().block_on(workspace.try_recv())
}
/// Block until a workspace event is available
/// Block until a workspace event is available.
#[jni(package = "mp.code", class = "Workspace")]
fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> {
super::tokio().block_on(workspace.poll())
}
/// Clear previously registered callback
/// Clear previously registered callback.
#[jni(package = "mp.code", class = "Workspace")]
fn clear_callback(workspace: &mut Workspace) {
workspace.clear_callback();

View file

@ -1,5 +1,5 @@
use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::api::TextChange;
use crate::api::{BufferUpdate, TextChange};
use crate::buffer::controller::BufferController;
use napi::threadsafe_function::{
ErrorStrategy::Fatal, ThreadSafeCallContext, ThreadsafeFunction, ThreadsafeFunctionCallMode,
@ -51,20 +51,20 @@ impl BufferController {
/// Return next buffer event if present
#[napi(js_name = "try_recv")]
pub async fn js_try_recv(&self) -> napi::Result<Option<TextChange>> {
pub async fn js_try_recv(&self) -> napi::Result<Option<BufferUpdate>> {
Ok(self.try_recv().await?)
}
/// Wait for next buffer event and return it
#[napi(js_name = "recv")]
pub async fn js_recv(&self) -> napi::Result<TextChange> {
pub async fn js_recv(&self) -> napi::Result<BufferUpdate> {
Ok(self.recv().await?)
}
/// Send a buffer update to workspace
#[napi(js_name = "send")]
pub async fn js_send(&self, op: TextChange) -> napi::Result<()> {
Ok(self.send(op).await?)
pub fn js_send(&self, op: TextChange) -> napi::Result<()> {
Ok(self.send(op)?)
}
/// Return buffer whole content

View file

@ -6,41 +6,6 @@ use napi::threadsafe_function::{
};
use napi_derive::napi;
#[napi(object, js_name = "Cursor")]
pub struct JsCursor {
/// range of text change, as char indexes in buffer previous state
pub start_row: i32,
pub start_col: i32,
pub end_row: i32,
pub end_col: i32,
pub buffer: String,
pub user: Option<String>,
}
impl From<JsCursor> for crate::api::Cursor {
fn from(value: JsCursor) -> Self {
crate::api::Cursor {
start: (value.start_row, value.start_col),
end: (value.end_row, value.end_col),
buffer: value.buffer,
user: value.user,
}
}
}
impl From<crate::api::Cursor> for JsCursor {
fn from(value: crate::api::Cursor) -> Self {
JsCursor {
start_row: value.start.0,
start_col: value.start.1,
end_row: value.end.0,
end_col: value.end.1,
buffer: value.buffer,
user: value.user.map(|x| x.to_string()),
}
}
}
#[napi]
impl CursorController {
/// Register a callback to be called on receive.
@ -74,19 +39,19 @@ impl CursorController {
/// Send a new cursor event to remote
#[napi(js_name = "send")]
pub async fn js_send(&self, pos: JsCursor) -> napi::Result<()> {
Ok(self.send(crate::api::Cursor::from(pos)).await?)
pub fn js_send(&self, sel: crate::api::Selection) -> napi::Result<()> {
Ok(self.send(sel)?)
}
/// Get next cursor event if available without blocking
#[napi(js_name = "try_recv")]
pub async fn js_try_recv(&self) -> napi::Result<Option<JsCursor>> {
Ok(self.try_recv().await?.map(JsCursor::from))
pub async fn js_try_recv(&self) -> napi::Result<Option<crate::api::Cursor>> {
Ok(self.try_recv().await?.map(crate::api::Cursor::from))
}
/// Block until next
#[napi(js_name = "recv")]
pub async fn js_recv(&self) -> napi::Result<JsCursor> {
pub async fn js_recv(&self) -> napi::Result<crate::api::Cursor> {
Ok(self.recv().await?.into())
}
}

View file

@ -8,8 +8,6 @@ use napi::threadsafe_function::{
};
use napi_derive::napi;
use super::client::JsUser;
#[napi(object, js_name = "Event")]
pub struct JsEvent {
pub r#type: String,
@ -148,12 +146,15 @@ impl Workspace {
/// List users attached to a specific buffer
#[napi(js_name = "list_buffer_users")]
pub async fn js_list_buffer_users(&self, path: String) -> napi::Result<Vec<JsUser>> {
pub async fn js_list_buffer_users(
&self,
path: String,
) -> napi::Result<Vec<crate::ffi::js::client::JsUser>> {
Ok(self
.list_buffer_users(&path)
.await?
.into_iter()
.map(JsUser::from)
.map(super::client::JsUser::from)
.collect())
}
}

View file

@ -11,10 +11,9 @@ impl LuaUserData for CodempBufferController {
Ok(format!("{:?}", this))
});
methods.add_method(
"send",
|_, this, (change,): (CodempTextChange,)| a_sync! { this => this.send(change).await? },
);
methods.add_method("send", |_, this, (change,): (CodempTextChange,)| {
Ok(this.send(change)?)
});
methods.add_method(
"try_recv",
@ -22,21 +21,20 @@ impl LuaUserData for CodempBufferController {
);
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method_mut("ack", |_, this, (version,): (Vec<i64>,)| {
Ok(this.ack(version))
});
methods.add_method(
"content",
|_, this, ()| a_sync! { this => this.content().await? },
);
methods.add_method("clear_callback", |_, this, ()| {
this.clear_callback();
Ok(())
});
methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback()));
methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| {
this.callback(move |controller: CodempBufferController| {
Ok(this.callback(move |controller: CodempBufferController| {
super::ext::callback().invoke(cb.clone(), controller)
});
Ok(())
}))
});
}
}
@ -47,7 +45,6 @@ impl LuaUserData for CodempTextChange {
fields.add_field_method_get("content", |_, this| Ok(this.content.clone()));
fields.add_field_method_get("start", |_, this| Ok(this.start));
fields.add_field_method_get("end", |_, this| Ok(this.end));
fields.add_field_method_get("hash", |_, this| Ok(this.hash));
// add a 'finish' accessor too because in Lua 'end' is reserved
fields.add_field_method_get("finish", |_, this| Ok(this.end));
}
@ -59,3 +56,18 @@ impl LuaUserData for CodempTextChange {
methods.add_method("apply", |_, this, (txt,): (String,)| Ok(this.apply(&txt)));
}
}
from_lua_serde! { CodempBufferUpdate }
impl LuaUserData for CodempBufferUpdate {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("hash", |_, this| Ok(this.hash));
fields.add_field_method_get("version", |_, this| Ok(this.version.clone()));
fields.add_field_method_get("change", |_, this| Ok(this.change.clone()));
}
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
}
}

View file

@ -4,7 +4,6 @@ use mlua_codemp_patch as mlua;
use super::ext::a_sync::a_sync;
use super::ext::from_lua_serde;
use super::ext::lua_tuple;
impl LuaUserData for CodempCursorController {
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
@ -12,10 +11,9 @@ impl LuaUserData for CodempCursorController {
Ok(format!("{:?}", this))
});
methods.add_method(
"send",
|_, this, (cursor,): (CodempCursor,)| a_sync! { this => this.send(cursor).await? },
);
methods.add_method("send", |_, this, (cursor,): (CodempSelection,)| {
Ok(this.send(cursor)?)
});
methods.add_method(
"try_recv",
|_, this, ()| a_sync! { this => this.try_recv().await? },
@ -23,33 +21,42 @@ impl LuaUserData for CodempCursorController {
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method("clear_callback", |_, this, ()| {
this.clear_callback();
Ok(())
});
methods.add_method("clear_callback", |_, this, ()| Ok(this.clear_callback()));
methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| {
this.callback(move |controller: CodempCursorController| {
Ok(this.callback(move |controller: CodempCursorController| {
super::ext::callback().invoke(cb.clone(), controller)
});
Ok(())
}))
});
}
}
from_lua_serde! { CodempCursor }
impl LuaUserData for CodempCursor {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("user", |_, this| Ok(this.user.clone()));
fields.add_field_method_get("sel", |_, this| Ok(this.sel.clone()));
}
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
}
}
from_lua_serde! { CodempSelection }
impl LuaUserData for CodempSelection {
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {
fields.add_field_method_get("user", |_, this| Ok(this.user.clone()));
fields.add_field_method_get("buffer", |_, this| Ok(this.buffer.clone()));
fields.add_field_method_get("start", |lua, this| lua_tuple(lua, this.start));
fields.add_field_method_get("end", |lua, this| lua_tuple(lua, this.end));
// add a 'finish' accessor too because in Lua 'end' is reserved
fields.add_field_method_get("finish", |lua, this| lua_tuple(lua, this.end));
fields.add_field_method_get("start_row", |_, this| Ok(this.start_row));
fields.add_field_method_get("start_col", |_, this| Ok(this.start_col));
fields.add_field_method_get("end_row", |_, this| Ok(this.end_row));
fields.add_field_method_get("end_col", |_, this| Ok(this.end_col));
}
fn add_methods<M: LuaUserDataMethods<Self>>(methods: &mut M) {
methods.add_meta_method(LuaMetaMethod::ToString, |_, this, ()| {
Ok(format!("{:?}", this))
});
}
}

View file

@ -51,10 +51,7 @@ impl LuaUserData for Promise {
});
methods.add_method_mut("cancel", |_, this, ()| match this.0.take() {
None => Err(LuaError::runtime("Promise already awaited")),
Some(x) => {
x.abort();
Ok(())
}
Some(x) => Ok(x.abort()),
});
methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| {
match this.0.take() {

View file

@ -62,55 +62,57 @@ pub(crate) enum LuaCallback {
Invoke(LuaFunction, CallbackArg),
}
pub(crate) enum CallbackArg {
Nil,
Str(String),
VecStr(Vec<String>),
Client(CodempClient),
CursorController(CodempCursorController),
BufferController(CodempBufferController),
Workspace(CodempWorkspace),
Event(CodempEvent),
MaybeEvent(Option<CodempEvent>),
Cursor(CodempCursor),
MaybeCursor(Option<CodempCursor>),
TextChange(CodempTextChange),
MaybeTextChange(Option<CodempTextChange>),
}
impl IntoLua for CallbackArg {
// TODO this basically calls .into_lua() on all enum variants
// i wish i could do this with a Box<dyn IntoLua> or an impl IntoLua
// but IntoLua requires Sized so it can't be made into an object
fn into_lua(self, lua: &Lua) -> LuaResult<LuaValue> {
match self {
CallbackArg::Nil => Ok(LuaValue::Nil),
CallbackArg::Str(x) => x.into_lua(lua),
CallbackArg::Client(x) => x.into_lua(lua),
CallbackArg::CursorController(x) => x.into_lua(lua),
CallbackArg::BufferController(x) => x.into_lua(lua),
CallbackArg::Workspace(x) => x.into_lua(lua),
CallbackArg::VecStr(x) => x.into_lua(lua),
CallbackArg::Event(x) => x.into_lua(lua),
CallbackArg::MaybeEvent(x) => x.into_lua(lua),
CallbackArg::Cursor(x) => x.into_lua(lua),
CallbackArg::MaybeCursor(x) => x.into_lua(lua),
CallbackArg::TextChange(x) => x.into_lua(lua),
CallbackArg::MaybeTextChange(x) => x.into_lua(lua),
macro_rules! callback_args {
($($name:ident : $t:ty ,)*) => {
pub(crate) enum CallbackArg {
Nil,
$(
$name($t),
)*
}
}
impl IntoLua for CallbackArg {
fn into_lua(self, lua: &Lua) -> LuaResult<LuaValue> {
match self {
Self::Nil => Ok(LuaValue::Nil),
$(
Self::$name(x) => x.into_lua(lua),
)*
}
}
}
impl From<()> for CallbackArg {
fn from(_value: ()) -> Self {
Self::Nil
}
}
$(
impl From<$t> for CallbackArg {
fn from(value: $t) -> Self {
Self::$name(value)
}
}
)*
};
}
impl From<()> for CallbackArg { fn from(_: ()) -> Self { CallbackArg::Nil } }
impl From<String> for CallbackArg { fn from(value: String) -> Self { CallbackArg::Str(value) } }
impl From<CodempClient> for CallbackArg { fn from(value: CodempClient) -> Self { CallbackArg::Client(value) } }
impl From<CodempCursorController> for CallbackArg { fn from(value: CodempCursorController) -> Self { CallbackArg::CursorController(value) } }
impl From<CodempBufferController> for CallbackArg { fn from(value: CodempBufferController) -> Self { CallbackArg::BufferController(value) } }
impl From<CodempWorkspace> for CallbackArg { fn from(value: CodempWorkspace) -> Self { CallbackArg::Workspace(value) } }
impl From<Vec<String>> for CallbackArg { fn from(value: Vec<String>) -> Self { CallbackArg::VecStr(value) } }
impl From<CodempEvent> for CallbackArg { fn from(value: CodempEvent) -> Self { CallbackArg::Event(value) } }
impl From<CodempCursor> for CallbackArg { fn from(value: CodempCursor) -> Self { CallbackArg::Cursor(value) } }
impl From<Option<CodempCursor>> for CallbackArg { fn from(value: Option<CodempCursor>) -> Self { CallbackArg::MaybeCursor(value) } }
impl From<CodempTextChange> for CallbackArg { fn from(value: CodempTextChange) -> Self { CallbackArg::TextChange(value) } }
impl From<Option<CodempTextChange>> for CallbackArg { fn from(value: Option<CodempTextChange>) -> Self { CallbackArg::MaybeTextChange(value) } }
impl From<Option<CodempEvent>> for CallbackArg { fn from(value: Option<CodempEvent>) -> Self { CallbackArg::MaybeEvent(value) } }
callback_args! {
Str: String,
VecStr: Vec<String>,
Client: CodempClient,
CursorController: CodempCursorController,
BufferController: CodempBufferController,
Workspace: CodempWorkspace,
Event: CodempEvent,
MaybeEvent: Option<CodempEvent>,
Cursor: CodempCursor,
MaybeCursor: Option<CodempCursor>,
Selection: CodempSelection,
MaybeSelection: Option<CodempSelection>,
TextChange: CodempTextChange,
MaybeTextChange: Option<CodempTextChange>,
BufferUpdate: CodempBufferUpdate,
MaybeBufferUpdate: Option<CodempBufferUpdate>,
}

View file

@ -2,19 +2,9 @@ pub mod a_sync;
pub mod callback;
pub mod log;
use mlua::prelude::*;
use mlua_codemp_patch as mlua;
pub(crate) use a_sync::tokio;
pub(crate) use callback::callback;
pub(crate) fn lua_tuple<T: IntoLua>(lua: &Lua, (a, b): (T, T)) -> LuaResult<LuaTable> {
let table = lua.create_table()?;
table.set(1, a)?;
table.set(2, b)?;
Ok(table)
}
macro_rules! from_lua_serde {
($($t:ty)*) => {
$(

View file

@ -61,16 +61,12 @@ impl LuaUserData for CodempWorkspace {
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| {
this.callback(move |controller: CodempWorkspace| {
Ok(this.callback(move |controller: CodempWorkspace| {
super::ext::callback().invoke(cb.clone(), controller)
});
Ok(())
}))
});
methods.add_method("clear_callbacl", |_, this, ()| {
this.clear_callback();
Ok(())
});
methods.add_method("clear_callbacl", |_, this, ()| Ok(this.clear_callback()));
}
fn add_fields<F: LuaUserDataFields<Self>>(fields: &mut F) {

View file

@ -43,6 +43,8 @@
//! `JNIException`s are however unchecked: there is nothing you can do to recover from them, as they usually represent a severe error in the glue code. If they arise, it's probably a bug.
//!
#![allow(clippy::unit_arg)]
/// java bindings, built with [jni]
#[cfg(feature = "java")]
pub mod java;

View file

@ -1,6 +1,6 @@
use crate::api::controller::{AsyncReceiver, AsyncSender};
use crate::api::Cursor;
use crate::api::TextChange;
use crate::api::{Cursor, Selection};
use crate::buffer::Controller as BufferController;
use crate::cursor::Controller as CursorController;
use pyo3::exceptions::PyValueError;
@ -15,19 +15,21 @@ impl CursorController {
#[pyo3(name = "send")]
fn pysend(
&self,
py: Python,
_py: Python,
path: String,
start: (i32, i32),
end: (i32, i32),
) -> PyResult<Promise> {
let pos = Cursor {
start,
end,
) -> PyResult<()> {
let pos = Selection {
start_row: start.0,
start_col: start.1,
end_row: end.0,
end_col: end.1,
buffer: path,
user: None,
};
let this = self.clone();
a_sync_allow_threads!(py, this.send(pos).await)
this.send(pos)?;
Ok(())
}
#[pyo3(name = "try_recv")]
@ -84,15 +86,15 @@ impl BufferController {
}
#[pyo3(name = "send")]
fn pysend(&self, py: Python, start: u32, end: u32, txt: String) -> PyResult<Promise> {
fn pysend(&self, _py: Python, start: u32, end: u32, txt: String) -> PyResult<()> {
let op = TextChange {
start,
end,
content: txt,
hash: None,
};
let this = self.clone();
a_sync_allow_threads!(py, this.send(op).await)
this.send(op)?;
Ok(())
}
#[pyo3(name = "try_recv")]
@ -141,21 +143,21 @@ impl BufferController {
impl Cursor {
#[getter(start)]
fn pystart(&self) -> (i32, i32) {
self.start
(self.sel.start_row, self.sel.start_col)
}
#[getter(end)]
fn pyend(&self) -> (i32, i32) {
self.end
(self.sel.end_row, self.sel.end_col)
}
#[getter(buffer)]
fn pybuffer(&self) -> String {
self.buffer.clone()
self.sel.buffer.clone()
}
#[getter(user)]
fn pyuser(&self) -> Option<String> {
self.user.clone()
Some(self.user.clone())
}
}

View file

@ -53,7 +53,7 @@
//! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods
//! let cursor = workspace.cursor();
//! let event = cursor.recv().await.expect("disconnected while waiting for event!");
//! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer);
//! println!("user {} moved on buffer {}", event.user, event.sel.buffer);
//! # };
//! ```
//!
@ -69,8 +69,12 @@
//! # use codemp::api::controller::{AsyncSender, AsyncReceiver};
//! let buffer = workspace.attach("/some/file.txt").await.expect("failed to attach");
//! buffer.content(); // force-sync
//! if let Some(change) = buffer.try_recv().await.unwrap() {
//! println!("content: {}, span: {}-{}", change.content, change.start, change.end);
//! if let Some(mut update) = buffer.try_recv().await.unwrap() {
//! println!(
//! "content: {}, span: {}-{}",
//! update.change.content, update.change.start, update.change.end
//! );
//! buffer.ack(update.version);
//! } // if None, no changes are currently available
//! # };
//! ```

View file

@ -3,8 +3,9 @@
pub use crate::api::{
controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender,
Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor,
Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser,
BufferUpdate as CodempBufferUpdate, Config as CodempConfig, Controller as CodempController,
Cursor as CodempCursor, Event as CodempEvent, Selection as CodempSelection,
TextChange as CodempTextChange, User as CodempUser,
};
pub use crate::{