feat(java): implemented acking and send/recv separation

This commit is contained in:
zaaarf 2024-10-10 12:04:20 +02:00 committed by alemi.dev
parent 51cff040ed
commit ae66f282d4
12 changed files with 193 additions and 137 deletions

View file

@ -1,5 +1,6 @@
package mp.code; package mp.code;
import mp.code.data.BufferUpdate;
import mp.code.data.TextChange; import mp.code.data.TextChange;
import mp.code.exceptions.ControllerException; import mp.code.exceptions.ControllerException;
@ -42,26 +43,26 @@ public final class BufferController {
return get_content(this.ptr); return get_content(this.ptr);
} }
private static native TextChange try_recv(long self) throws ControllerException; private static native BufferUpdate try_recv(long self) throws ControllerException;
/** /**
* Tries to get a {@link TextChange} from the queue if any were present, and returns * Tries to get a {@link BufferUpdate} from the queue if any were present, and returns
* an empty optional otherwise. * an empty optional otherwise.
* @return the first text change in queue, if any are present * @return the first text change in queue, if any are present
* @throws ControllerException if the controller was stopped * @throws ControllerException if the controller was stopped
*/ */
public Optional<TextChange> tryRecv() throws ControllerException { public Optional<BufferUpdate> tryRecv() throws ControllerException {
return Optional.ofNullable(try_recv(this.ptr)); return Optional.ofNullable(try_recv(this.ptr));
} }
private static native TextChange recv(long self) throws ControllerException; private static native BufferUpdate recv(long self) throws ControllerException;
/** /**
* Blocks until a {@link TextChange} is available and returns it. * Blocks until a {@link BufferUpdate} is available and returns it.
* @return the text change update that occurred * @return the text change update that occurred
* @throws ControllerException if the controller was stopped * @throws ControllerException if the controller was stopped
*/ */
public TextChange recv() throws ControllerException { public BufferUpdate recv() throws ControllerException {
return recv(this.ptr); return recv(this.ptr);
} }
@ -78,7 +79,7 @@ public final class BufferController {
private static native void callback(long self, Consumer<BufferController> cb); private static native void callback(long self, Consumer<BufferController> cb);
/** /**
* Registers a callback to be invoked whenever a {@link TextChange} occurs. * Registers a callback to be invoked whenever a {@link BufferUpdate} occurs.
* This will not work unless a Java thread has been dedicated to the event loop. * This will not work unless a Java thread has been dedicated to the event loop.
* @see Extensions#drive(boolean) * @see Extensions#drive(boolean)
*/ */
@ -106,6 +107,17 @@ public final class BufferController {
poll(this.ptr); poll(this.ptr);
} }
private static native void ack(long self, long[] version);
/**
* Acknowledges that a certain CRDT version has been correctly applied.
* @param version the version to acknowledge
* @see BufferUpdate#version
*/
public void ack(long[] version) {
ack(this.ptr, version);
}
private static native void free(long self); private static native void free(long self);
static { static {

View file

@ -1,6 +1,7 @@
package mp.code; package mp.code;
import mp.code.data.Cursor; import mp.code.data.Cursor;
import mp.code.data.Selection;
import mp.code.exceptions.ControllerException; import mp.code.exceptions.ControllerException;
import java.util.Optional; import java.util.Optional;
@ -42,13 +43,13 @@ public final class CursorController {
return recv(this.ptr); return recv(this.ptr);
} }
private static native void send(long self, Cursor cursor) throws ControllerException; private static native void send(long self, Selection cursor) throws ControllerException;
/** /**
* Tries to send a {@link Cursor} update. * Tries to send a {@link Selection} update.
* @throws ControllerException if the controller was stopped * @throws ControllerException if the controller was stopped
*/ */
public void send(Cursor cursor) throws ControllerException { public void send(Selection cursor) throws ControllerException {
send(this.ptr, cursor); send(this.ptr, cursor);
} }

View file

@ -0,0 +1,35 @@
package mp.code.data;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import mp.code.Extensions;
import java.util.OptionalLong;
/**
* A data class holding information about a buffer update.
*/
@ToString
@EqualsAndHashCode
@RequiredArgsConstructor
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public class BufferUpdate {
/**
* The hash of the content after applying it (calculated with {@link Extensions#hash(String)}).
* It is generally meaningless to send, but when received it is an invitation to check the hash
* and forcefully re-sync if necessary.
*/
public final OptionalLong hash; // xxh3 hash
/**
* The CRDT version after the associated change has been applied.
* You MUST acknowledge that it was applied with {@link mp.code.BufferController#ack(long[])}.
*/
public final long[] version;
/**
* The {@link TextChange} contained in this buffer update.
*/
public final TextChange change;
}

View file

@ -11,37 +11,13 @@ import lombok.ToString;
@EqualsAndHashCode @EqualsAndHashCode
@RequiredArgsConstructor @RequiredArgsConstructor
public class Cursor { public class Cursor {
/**
* The starting row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startRow;
/**
* The starting column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startCol;
/**
* The ending row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endRow;
/**
* The ending column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endCol;
/**
* The buffer the cursor is located on.
*/
public final String buffer;
/** /**
* The user who controls the cursor. * The user who controls the cursor.
*/ */
public final String user; public final String user;
/**
* The associated selection update.
*/
public final Selection selection;
} }

View file

@ -1,37 +0,0 @@
package mp.code.data;
import lombok.Getter;
import mp.code.data.Config;
import mp.code.data.User;
import mp.code.exceptions.ConnectionException;
import mp.code.exceptions.ConnectionRemoteException;
import java.util.Optional;
@Getter
public final class Delta {
private final long ptr;
Delta(long ptr) {
this.ptr = ptr;
Extensions.CLEANER.register(this, () -> free(ptr));
}
private static native TextChange get_text_change(long self);
public mp.code.data.TextChange getTextChange() {
return get_text_change(this.ptr);
}
private static native void ack_native(long self, boolean success) throws ConnectionException;
public void ack(boolean success) throws ConnectionException {
return ack_native(this.ptr, success);
}
private static native void free(long self);
static {
NativeUtils.loadLibraryIfNeeded();
}
}

View file

@ -0,0 +1,42 @@
package mp.code.data;
import lombok.EqualsAndHashCode;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
/**
* A data class holding information about a cursor selection.
*/
@ToString
@EqualsAndHashCode
@RequiredArgsConstructor
public class Selection {
/**
* The starting row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startRow;
/**
* The starting column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int startCol;
/**
* The ending row of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endRow;
/**
* The ending column of the cursor position.
* If negative, it is clamped to 0.
*/
public final int endCol;
/**
* The buffer the cursor is located on.
*/
public final String buffer;
}

View file

@ -22,7 +22,7 @@ public class TextChange {
public final long start; public final long start;
/** /**
* The endomg position of the change. * The ending position of the change.
* If negative, it is clamped to 0. * If negative, it is clamped to 0.
*/ */
public final long end; public final long end;
@ -33,13 +33,6 @@ public class TextChange {
*/ */
public final String content; public final String content;
/**
* The hash of the content after applying it (calculated with {@link Extensions#hash(String)}).
* It is generally meaningless to send, but when received it is an invitation to check the hash
* and forcefully re-sync if necessary.
*/
public final OptionalLong hash; // xxh3 hash
/** /**
* Checks if the change represents a deletion. * Checks if the change represents a deletion.
* It does if the starting index is lower than the ending index. * It does if the starting index is lower than the ending index.

View file

@ -19,9 +19,11 @@ pub mod event;
/// data structure for remote users /// data structure for remote users
pub mod user; pub mod user;
pub use change::BufferUpdate;
pub use change::TextChange; pub use change::TextChange;
pub use config::Config; pub use config::Config;
pub use controller::Controller; pub use controller::Controller;
pub use cursor::Cursor; pub use cursor::Cursor;
pub use cursor::Selection;
pub use event::Event; pub use event::Event;
pub use user::User; pub use user::User;

View file

@ -3,7 +3,9 @@ use jni_toolbox::jni;
use crate::{ use crate::{
api::{ api::{
change::BufferUpdate, controller::{AsyncReceiver, AsyncSender}, TextChange controller::{AsyncReceiver, AsyncSender},
BufferUpdate,
TextChange
}, },
errors::ControllerError, errors::ControllerError,
}; };
@ -24,9 +26,7 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result<String, Con
/// Try to fetch a [TextChange], or return null if there's nothing. /// Try to fetch a [TextChange], or return null if there's nothing.
#[jni(package = "mp.code", class = "BufferController")] #[jni(package = "mp.code", class = "BufferController")]
fn try_recv( fn try_recv(controller: &mut crate::buffer::Controller) -> Result<Option<BufferUpdate>, ControllerError> {
controller: &mut crate::buffer::Controller,
) -> Result<Option<BufferUpdate>, ControllerError> {
super::tokio().block_on(controller.try_recv()) super::tokio().block_on(controller.try_recv())
} }
@ -98,6 +98,12 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro
super::tokio().block_on(controller.poll()) super::tokio().block_on(controller.poll())
} }
/// Acknowledge that a change has been correctly applied.
#[jni(package = "mp.code", class = "BufferController")]
fn ack(controller: &mut crate::buffer::Controller, version: Vec<i64>) {
controller.ack(version)
}
/// Called by the Java GC to drop a [crate::buffer::Controller]. /// Called by the Java GC to drop a [crate::buffer::Controller].
#[jni(package = "mp.code", class = "BufferController")] #[jni(package = "mp.code", class = "BufferController")]
fn free(input: jni::sys::jlong) { fn free(input: jni::sys::jlong) {

View file

@ -2,6 +2,7 @@ use crate::{
api::{ api::{
controller::{AsyncReceiver, AsyncSender}, controller::{AsyncReceiver, AsyncSender},
Cursor, Cursor,
Selection
}, },
errors::ControllerError, errors::ControllerError,
}; };
@ -24,8 +25,8 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result<Cursor, Controller
/// Receive from Java, converts and sends a [Cursor]. /// Receive from Java, converts and sends a [Cursor].
#[jni(package = "mp.code", class = "CursorController")] #[jni(package = "mp.code", class = "CursorController")]
fn send(controller: &mut crate::cursor::Controller, cursor: Cursor) -> Result<(), ControllerError> { fn send(controller: &mut crate::cursor::Controller, sel: Selection) -> Result<(), ControllerError> {
controller.send(cursor) controller.send(sel)
} }
/// Register a callback for cursor changes. /// Register a callback for cursor changes.

View file

@ -79,6 +79,7 @@ macro_rules! null_check {
} }
}; };
} }
pub(crate) use null_check; pub(crate) use null_check;
impl jni_toolbox::JniToolboxError for crate::errors::ConnectionError { impl jni_toolbox::JniToolboxError for crate::errors::ConnectionError {
@ -193,6 +194,42 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event {
} }
} }
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::BufferUpdate {
const CLASS: &'static str = "mp/code/data/BufferUpdate";
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let class = env.find_class(Self::CLASS)?;
let hash_class = env.find_class("java/util/OptionalLong")?;
let hash = if let Some(h) = self.hash {
env.call_static_method(
hash_class,
"of",
"(J)Ljava/util/OptionalLong;",
&[jni::objects::JValueGen::Long(h)],
)
} else {
env.call_static_method(hash_class, "empty", "()Ljava/util/OptionalLong;", &[])
}?
.l()?;
let version = self.version.into_java_object(env)?;
let change = self.change.into_java_object(env)?;
env.new_object(
class,
"(Ljava/util/OptionalLong;[JLmp/code/data/TextChange;)V",
&[
jni::objects::JValueGen::Object(&hash),
jni::objects::JValueGen::Object(&version),
jni::objects::JValueGen::Object(&change),
],
)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
const CLASS: &'static str = "mp/code/data/TextChange"; const CLASS: &'static str = "mp/code/data/TextChange";
fn into_java_object( fn into_java_object(
@ -200,11 +237,10 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
env: &mut jni::JNIEnv<'j>, env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> { ) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let content = env.new_string(self.content)?; let content = env.new_string(self.content)?;
let class = env.find_class(Self::CLASS)?; let class = env.find_class(Self::CLASS)?;
env.new_object( env.new_object(
class, class,
"(JJLjava/lang/String;Ljava/util/OptionalLong;)V", "(JJLjava/lang/String;)V",
&[ &[
jni::objects::JValueGen::Long(self.start.into()), jni::objects::JValueGen::Long(self.start.into()),
jni::objects::JValueGen::Long(self.end.into()), jni::objects::JValueGen::Long(self.end.into()),
@ -220,24 +256,39 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Cursor {
self, self,
env: &mut jni::JNIEnv<'j>, env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> { ) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let class = env.find_class("mp/code/data/Cursor")?; let class = env.find_class(Self::CLASS)?;
let buffer = env.new_string(&self.buffer)?; let user = env.new_string(&self.user)?;
let user = if let Some(user) = self.user { let sel = self.sel.into_java_object(env)?;
env.new_string(user)?.into()
} else {
jni::objects::JObject::null()
};
env.new_object( env.new_object(
class, class,
"(IIIILjava/lang/String;Ljava/lang/String;)V", "(Ljava/lang/String;Lmp/code/data/Selection;)V",
&[ &[
jni::objects::JValueGen::Int(self.start.0),
jni::objects::JValueGen::Int(self.start.1),
jni::objects::JValueGen::Int(self.end.0),
jni::objects::JValueGen::Int(self.end.1),
jni::objects::JValueGen::Object(&buffer),
jni::objects::JValueGen::Object(&user), jni::objects::JValueGen::Object(&user),
jni::objects::JValueGen::Object(&sel),
],
)
}
}
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Selection {
const CLASS: &'static str = "mp/code/data/Selection";
fn into_java_object(
self,
env: &mut jni::JNIEnv<'j>,
) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
let class = env.find_class(Self::CLASS)?;
let buffer = env.new_string(&self.buffer)?;
env.new_object(
class,
"(IIIILjava/lang/String;)V",
&[
jni::objects::JValueGen::Int(self.start_row),
jni::objects::JValueGen::Int(self.start_col),
jni::objects::JValueGen::Int(self.end_row),
jni::objects::JValueGen::Int(self.end_col),
jni::objects::JValueGen::Object(&buffer),
], ],
) )
} }
@ -261,7 +312,6 @@ from_java_ptr!(crate::Client);
from_java_ptr!(crate::Workspace); from_java_ptr!(crate::Workspace);
from_java_ptr!(crate::cursor::Controller); from_java_ptr!(crate::cursor::Controller);
from_java_ptr!(crate::buffer::Controller); from_java_ptr!(crate::buffer::Controller);
from_java_ptr!(crate::buffer::controller::Delta);
impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config { impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
type From = jni::objects::JObject<'j>; type From = jni::objects::JObject<'j>;
@ -350,7 +400,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config {
} }
} }
impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor { impl<'j> jni_toolbox::FromJava<'j> for crate::api::Selection {
type From = jni::objects::JObject<'j>; type From = jni::objects::JObject<'j>;
fn from_java( fn from_java(
env: &mut jni::JNIEnv<'j>, env: &mut jni::JNIEnv<'j>,
@ -371,21 +421,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor {
unsafe { env.get_string_unchecked(&jfield.into()) }?.into() unsafe { env.get_string_unchecked(&jfield.into()) }?.into()
}; };
let user = { Ok(Self { start_row, start_col, end_row, end_col, buffer })
let jfield = env.get_field(&cursor, "user", "Ljava/lang/String;")?.l()?;
if jfield.is_null() {
None
} else {
Some(unsafe { env.get_string_unchecked(&jfield.into()) }?.into())
}
};
Ok(Self {
start: (start_row, start_col),
end: (end_row, end_col),
buffer,
user,
})
} }
} }
@ -414,21 +450,10 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange {
unsafe { env.get_string_unchecked(&jfield.into()) }?.into() unsafe { env.get_string_unchecked(&jfield.into()) }?.into()
}; };
let hash = {
let jfield = env
.get_field(&change, "hash", "Ljava/util/OptionalLong;")?
.l()?;
if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? {
Some(env.call_method(&jfield, "getAsLong", "()J", &[])?.j()?)
} else {
None
}
};
Ok(Self { Ok(Self {
start, start,
end, end,
content, content
hash,
}) })
} }
} }

View file

@ -91,7 +91,7 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr
super::tokio().block_on(workspace.delete(&path)) super::tokio().block_on(workspace.delete(&path))
} }
/// Block and receive a workspace event /// Block and receive a workspace event.
#[jni(package = "mp.code", class = "Workspace")] #[jni(package = "mp.code", class = "Workspace")]
fn recv(workspace: &mut Workspace) -> Result<crate::api::Event, ControllerError> { fn recv(workspace: &mut Workspace) -> Result<crate::api::Event, ControllerError> {
super::tokio().block_on(workspace.recv()) super::tokio().block_on(workspace.recv())
@ -103,13 +103,13 @@ fn try_recv(workspace: &mut Workspace) -> Result<Option<crate::api::Event>, Cont
super::tokio().block_on(workspace.try_recv()) super::tokio().block_on(workspace.try_recv())
} }
/// Block until a workspace event is available /// Block until a workspace event is available.
#[jni(package = "mp.code", class = "Workspace")] #[jni(package = "mp.code", class = "Workspace")]
fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> { fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> {
super::tokio().block_on(workspace.poll()) super::tokio().block_on(workspace.poll())
} }
/// Clear previously registered callback /// Clear previously registered callback.
#[jni(package = "mp.code", class = "Workspace")] #[jni(package = "mp.code", class = "Workspace")]
fn clear_callback(workspace: &mut Workspace) { fn clear_callback(workspace: &mut Workspace) {
workspace.clear_callback(); workspace.clear_callback();