From ae66f282d453c4bff1aab7a232e303e62f4cfc7a Mon Sep 17 00:00:00 2001 From: zaaarf Date: Thu, 10 Oct 2024 12:04:20 +0200 Subject: [PATCH] feat(java): implemented acking and send/recv separation --- dist/java/src/mp/code/BufferController.java | 26 +++-- dist/java/src/mp/code/CursorController.java | 7 +- dist/java/src/mp/code/data/BufferUpdate.java | 35 ++++++ dist/java/src/mp/code/data/Cursor.java | 34 +----- dist/java/src/mp/code/data/Delta.java | 37 ------ dist/java/src/mp/code/data/Selection.java | 42 +++++++ dist/java/src/mp/code/data/TextChange.java | 9 +- src/api/mod.rs | 2 + src/ffi/java/buffer.rs | 14 ++- src/ffi/java/cursor.rs | 5 +- src/ffi/java/mod.rs | 113 +++++++++++-------- src/ffi/java/workspace.rs | 6 +- 12 files changed, 193 insertions(+), 137 deletions(-) create mode 100644 dist/java/src/mp/code/data/BufferUpdate.java delete mode 100644 dist/java/src/mp/code/data/Delta.java create mode 100644 dist/java/src/mp/code/data/Selection.java diff --git a/dist/java/src/mp/code/BufferController.java b/dist/java/src/mp/code/BufferController.java index 66f4495..99570fa 100644 --- a/dist/java/src/mp/code/BufferController.java +++ b/dist/java/src/mp/code/BufferController.java @@ -1,5 +1,6 @@ package mp.code; +import mp.code.data.BufferUpdate; import mp.code.data.TextChange; import mp.code.exceptions.ControllerException; @@ -42,26 +43,26 @@ public final class BufferController { return get_content(this.ptr); } - private static native TextChange try_recv(long self) throws ControllerException; + private static native BufferUpdate try_recv(long self) throws ControllerException; /** - * Tries to get a {@link TextChange} from the queue if any were present, and returns + * Tries to get a {@link BufferUpdate} from the queue if any were present, and returns * an empty optional otherwise. * @return the first text change in queue, if any are present * @throws ControllerException if the controller was stopped */ - public Optional tryRecv() throws ControllerException { + public Optional tryRecv() throws ControllerException { return Optional.ofNullable(try_recv(this.ptr)); } - private static native TextChange recv(long self) throws ControllerException; + private static native BufferUpdate recv(long self) throws ControllerException; /** - * Blocks until a {@link TextChange} is available and returns it. + * Blocks until a {@link BufferUpdate} is available and returns it. * @return the text change update that occurred * @throws ControllerException if the controller was stopped */ - public TextChange recv() throws ControllerException { + public BufferUpdate recv() throws ControllerException { return recv(this.ptr); } @@ -78,7 +79,7 @@ public final class BufferController { private static native void callback(long self, Consumer cb); /** - * Registers a callback to be invoked whenever a {@link TextChange} occurs. + * Registers a callback to be invoked whenever a {@link BufferUpdate} occurs. * This will not work unless a Java thread has been dedicated to the event loop. * @see Extensions#drive(boolean) */ @@ -106,6 +107,17 @@ public final class BufferController { poll(this.ptr); } + private static native void ack(long self, long[] version); + + /** + * Acknowledges that a certain CRDT version has been correctly applied. + * @param version the version to acknowledge + * @see BufferUpdate#version + */ + public void ack(long[] version) { + ack(this.ptr, version); + } + private static native void free(long self); static { diff --git a/dist/java/src/mp/code/CursorController.java b/dist/java/src/mp/code/CursorController.java index 159e548..6c2cae1 100644 --- a/dist/java/src/mp/code/CursorController.java +++ b/dist/java/src/mp/code/CursorController.java @@ -1,6 +1,7 @@ package mp.code; import mp.code.data.Cursor; +import mp.code.data.Selection; import mp.code.exceptions.ControllerException; import java.util.Optional; @@ -42,13 +43,13 @@ public final class CursorController { return recv(this.ptr); } - private static native void send(long self, Cursor cursor) throws ControllerException; + private static native void send(long self, Selection cursor) throws ControllerException; /** - * Tries to send a {@link Cursor} update. + * Tries to send a {@link Selection} update. * @throws ControllerException if the controller was stopped */ - public void send(Cursor cursor) throws ControllerException { + public void send(Selection cursor) throws ControllerException { send(this.ptr, cursor); } diff --git a/dist/java/src/mp/code/data/BufferUpdate.java b/dist/java/src/mp/code/data/BufferUpdate.java new file mode 100644 index 0000000..9979411 --- /dev/null +++ b/dist/java/src/mp/code/data/BufferUpdate.java @@ -0,0 +1,35 @@ +package mp.code.data; + +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import mp.code.Extensions; + +import java.util.OptionalLong; + +/** + * A data class holding information about a buffer update. + */ +@ToString +@EqualsAndHashCode +@RequiredArgsConstructor +@SuppressWarnings("OptionalUsedAsFieldOrParameterType") +public class BufferUpdate { + /** + * The hash of the content after applying it (calculated with {@link Extensions#hash(String)}). + * It is generally meaningless to send, but when received it is an invitation to check the hash + * and forcefully re-sync if necessary. + */ + public final OptionalLong hash; // xxh3 hash + + /** + * The CRDT version after the associated change has been applied. + * You MUST acknowledge that it was applied with {@link mp.code.BufferController#ack(long[])}. + */ + public final long[] version; + + /** + * The {@link TextChange} contained in this buffer update. + */ + public final TextChange change; +} diff --git a/dist/java/src/mp/code/data/Cursor.java b/dist/java/src/mp/code/data/Cursor.java index e132fc9..d52ad58 100644 --- a/dist/java/src/mp/code/data/Cursor.java +++ b/dist/java/src/mp/code/data/Cursor.java @@ -11,37 +11,13 @@ import lombok.ToString; @EqualsAndHashCode @RequiredArgsConstructor public class Cursor { - /** - * The starting row of the cursor position. - * If negative, it is clamped to 0. - */ - public final int startRow; - - /** - * The starting column of the cursor position. - * If negative, it is clamped to 0. - */ - public final int startCol; - - /** - * The ending row of the cursor position. - * If negative, it is clamped to 0. - */ - public final int endRow; - - /** - * The ending column of the cursor position. - * If negative, it is clamped to 0. - */ - public final int endCol; - - /** - * The buffer the cursor is located on. - */ - public final String buffer; - /** * The user who controls the cursor. */ public final String user; + + /** + * The associated selection update. + */ + public final Selection selection; } diff --git a/dist/java/src/mp/code/data/Delta.java b/dist/java/src/mp/code/data/Delta.java deleted file mode 100644 index e04af1c..0000000 --- a/dist/java/src/mp/code/data/Delta.java +++ /dev/null @@ -1,37 +0,0 @@ -package mp.code.data; - -import lombok.Getter; -import mp.code.data.Config; -import mp.code.data.User; -import mp.code.exceptions.ConnectionException; -import mp.code.exceptions.ConnectionRemoteException; - -import java.util.Optional; - -@Getter -public final class Delta { - private final long ptr; - - Delta(long ptr) { - this.ptr = ptr; - Extensions.CLEANER.register(this, () -> free(ptr)); - } - - private static native TextChange get_text_change(long self); - - public mp.code.data.TextChange getTextChange() { - return get_text_change(this.ptr); - } - - private static native void ack_native(long self, boolean success) throws ConnectionException; - - public void ack(boolean success) throws ConnectionException { - return ack_native(this.ptr, success); - } - - private static native void free(long self); - - static { - NativeUtils.loadLibraryIfNeeded(); - } -} diff --git a/dist/java/src/mp/code/data/Selection.java b/dist/java/src/mp/code/data/Selection.java new file mode 100644 index 0000000..cc31cd4 --- /dev/null +++ b/dist/java/src/mp/code/data/Selection.java @@ -0,0 +1,42 @@ +package mp.code.data; + +import lombok.EqualsAndHashCode; +import lombok.RequiredArgsConstructor; +import lombok.ToString; + +/** + * A data class holding information about a cursor selection. + */ +@ToString +@EqualsAndHashCode +@RequiredArgsConstructor +public class Selection { + /** + * The starting row of the cursor position. + * If negative, it is clamped to 0. + */ + public final int startRow; + + /** + * The starting column of the cursor position. + * If negative, it is clamped to 0. + */ + public final int startCol; + + /** + * The ending row of the cursor position. + * If negative, it is clamped to 0. + */ + public final int endRow; + + /** + * The ending column of the cursor position. + * If negative, it is clamped to 0. + */ + public final int endCol; + + /** + * The buffer the cursor is located on. + */ + public final String buffer; +} diff --git a/dist/java/src/mp/code/data/TextChange.java b/dist/java/src/mp/code/data/TextChange.java index 33e7561..3c0a0f9 100644 --- a/dist/java/src/mp/code/data/TextChange.java +++ b/dist/java/src/mp/code/data/TextChange.java @@ -22,7 +22,7 @@ public class TextChange { public final long start; /** - * The endomg position of the change. + * The ending position of the change. * If negative, it is clamped to 0. */ public final long end; @@ -33,13 +33,6 @@ public class TextChange { */ public final String content; - /** - * The hash of the content after applying it (calculated with {@link Extensions#hash(String)}). - * It is generally meaningless to send, but when received it is an invitation to check the hash - * and forcefully re-sync if necessary. - */ - public final OptionalLong hash; // xxh3 hash - /** * Checks if the change represents a deletion. * It does if the starting index is lower than the ending index. diff --git a/src/api/mod.rs b/src/api/mod.rs index 54be1da..158a4d0 100644 --- a/src/api/mod.rs +++ b/src/api/mod.rs @@ -19,9 +19,11 @@ pub mod event; /// data structure for remote users pub mod user; +pub use change::BufferUpdate; pub use change::TextChange; pub use config::Config; pub use controller::Controller; pub use cursor::Cursor; +pub use cursor::Selection; pub use event::Event; pub use user::User; diff --git a/src/ffi/java/buffer.rs b/src/ffi/java/buffer.rs index e65f2b0..ef70838 100644 --- a/src/ffi/java/buffer.rs +++ b/src/ffi/java/buffer.rs @@ -3,7 +3,9 @@ use jni_toolbox::jni; use crate::{ api::{ - change::BufferUpdate, controller::{AsyncReceiver, AsyncSender}, TextChange + controller::{AsyncReceiver, AsyncSender}, + BufferUpdate, + TextChange }, errors::ControllerError, }; @@ -24,9 +26,7 @@ fn get_content(controller: &mut crate::buffer::Controller) -> Result Result, ControllerError> { +fn try_recv(controller: &mut crate::buffer::Controller) -> Result, ControllerError> { super::tokio().block_on(controller.try_recv()) } @@ -98,6 +98,12 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro super::tokio().block_on(controller.poll()) } +/// Acknowledge that a change has been correctly applied. +#[jni(package = "mp.code", class = "BufferController")] +fn ack(controller: &mut crate::buffer::Controller, version: Vec) { + controller.ack(version) +} + /// Called by the Java GC to drop a [crate::buffer::Controller]. #[jni(package = "mp.code", class = "BufferController")] fn free(input: jni::sys::jlong) { diff --git a/src/ffi/java/cursor.rs b/src/ffi/java/cursor.rs index 57fa1d5..2878391 100644 --- a/src/ffi/java/cursor.rs +++ b/src/ffi/java/cursor.rs @@ -2,6 +2,7 @@ use crate::{ api::{ controller::{AsyncReceiver, AsyncSender}, Cursor, + Selection }, errors::ControllerError, }; @@ -24,8 +25,8 @@ fn recv(controller: &mut crate::cursor::Controller) -> Result Result<(), ControllerError> { - controller.send(cursor) +fn send(controller: &mut crate::cursor::Controller, sel: Selection) -> Result<(), ControllerError> { + controller.send(sel) } /// Register a callback for cursor changes. diff --git a/src/ffi/java/mod.rs b/src/ffi/java/mod.rs index 7d4021e..2472cea 100644 --- a/src/ffi/java/mod.rs +++ b/src/ffi/java/mod.rs @@ -79,6 +79,7 @@ macro_rules! null_check { } }; } + pub(crate) use null_check; impl jni_toolbox::JniToolboxError for crate::errors::ConnectionError { @@ -193,6 +194,42 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event { } } +impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::BufferUpdate { + const CLASS: &'static str = "mp/code/data/BufferUpdate"; + fn into_java_object( + self, + env: &mut jni::JNIEnv<'j>, + ) -> Result, jni::errors::Error> { + let class = env.find_class(Self::CLASS)?; + + let hash_class = env.find_class("java/util/OptionalLong")?; + let hash = if let Some(h) = self.hash { + env.call_static_method( + hash_class, + "of", + "(J)Ljava/util/OptionalLong;", + &[jni::objects::JValueGen::Long(h)], + ) + } else { + env.call_static_method(hash_class, "empty", "()Ljava/util/OptionalLong;", &[]) + }? + .l()?; + + let version = self.version.into_java_object(env)?; + let change = self.change.into_java_object(env)?; + + env.new_object( + class, + "(Ljava/util/OptionalLong;[JLmp/code/data/TextChange;)V", + &[ + jni::objects::JValueGen::Object(&hash), + jni::objects::JValueGen::Object(&version), + jni::objects::JValueGen::Object(&change), + ], + ) + } +} + impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { const CLASS: &'static str = "mp/code/data/TextChange"; fn into_java_object( @@ -200,11 +237,10 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange { env: &mut jni::JNIEnv<'j>, ) -> Result, jni::errors::Error> { let content = env.new_string(self.content)?; - let class = env.find_class(Self::CLASS)?; env.new_object( class, - "(JJLjava/lang/String;Ljava/util/OptionalLong;)V", + "(JJLjava/lang/String;)V", &[ jni::objects::JValueGen::Long(self.start.into()), jni::objects::JValueGen::Long(self.end.into()), @@ -220,24 +256,39 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Cursor { self, env: &mut jni::JNIEnv<'j>, ) -> Result, jni::errors::Error> { - let class = env.find_class("mp/code/data/Cursor")?; - let buffer = env.new_string(&self.buffer)?; - let user = if let Some(user) = self.user { - env.new_string(user)?.into() - } else { - jni::objects::JObject::null() - }; + let class = env.find_class(Self::CLASS)?; + let user = env.new_string(&self.user)?; + let sel = self.sel.into_java_object(env)?; env.new_object( class, - "(IIIILjava/lang/String;Ljava/lang/String;)V", + "(Ljava/lang/String;Lmp/code/data/Selection;)V", &[ - jni::objects::JValueGen::Int(self.start.0), - jni::objects::JValueGen::Int(self.start.1), - jni::objects::JValueGen::Int(self.end.0), - jni::objects::JValueGen::Int(self.end.1), - jni::objects::JValueGen::Object(&buffer), jni::objects::JValueGen::Object(&user), + jni::objects::JValueGen::Object(&sel), + ], + ) + } +} + +impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Selection { + const CLASS: &'static str = "mp/code/data/Selection"; + fn into_java_object( + self, + env: &mut jni::JNIEnv<'j>, + ) -> Result, jni::errors::Error> { + let class = env.find_class(Self::CLASS)?; + let buffer = env.new_string(&self.buffer)?; + + env.new_object( + class, + "(IIIILjava/lang/String;)V", + &[ + jni::objects::JValueGen::Int(self.start_row), + jni::objects::JValueGen::Int(self.start_col), + jni::objects::JValueGen::Int(self.end_row), + jni::objects::JValueGen::Int(self.end_col), + jni::objects::JValueGen::Object(&buffer), ], ) } @@ -261,7 +312,6 @@ from_java_ptr!(crate::Client); from_java_ptr!(crate::Workspace); from_java_ptr!(crate::cursor::Controller); from_java_ptr!(crate::buffer::Controller); -from_java_ptr!(crate::buffer::controller::Delta); impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config { type From = jni::objects::JObject<'j>; @@ -350,7 +400,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Config { } } -impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor { +impl<'j> jni_toolbox::FromJava<'j> for crate::api::Selection { type From = jni::objects::JObject<'j>; fn from_java( env: &mut jni::JNIEnv<'j>, @@ -371,21 +421,7 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::Cursor { unsafe { env.get_string_unchecked(&jfield.into()) }?.into() }; - let user = { - let jfield = env.get_field(&cursor, "user", "Ljava/lang/String;")?.l()?; - if jfield.is_null() { - None - } else { - Some(unsafe { env.get_string_unchecked(&jfield.into()) }?.into()) - } - }; - - Ok(Self { - start: (start_row, start_col), - end: (end_row, end_col), - buffer, - user, - }) + Ok(Self { start_row, start_col, end_row, end_col, buffer }) } } @@ -414,21 +450,10 @@ impl<'j> jni_toolbox::FromJava<'j> for crate::api::TextChange { unsafe { env.get_string_unchecked(&jfield.into()) }?.into() }; - let hash = { - let jfield = env - .get_field(&change, "hash", "Ljava/util/OptionalLong;")? - .l()?; - if env.call_method(&jfield, "isPresent", "()Z", &[])?.z()? { - Some(env.call_method(&jfield, "getAsLong", "()J", &[])?.j()?) - } else { - None - } - }; Ok(Self { start, end, - content, - hash, + content }) } } diff --git a/src/ffi/java/workspace.rs b/src/ffi/java/workspace.rs index 4bd3da8..acdc36f 100644 --- a/src/ffi/java/workspace.rs +++ b/src/ffi/java/workspace.rs @@ -91,7 +91,7 @@ fn delete_buffer(workspace: &mut Workspace, path: String) -> Result<(), RemoteEr super::tokio().block_on(workspace.delete(&path)) } -/// Block and receive a workspace event +/// Block and receive a workspace event. #[jni(package = "mp.code", class = "Workspace")] fn recv(workspace: &mut Workspace) -> Result { super::tokio().block_on(workspace.recv()) @@ -103,13 +103,13 @@ fn try_recv(workspace: &mut Workspace) -> Result, Cont super::tokio().block_on(workspace.try_recv()) } -/// Block until a workspace event is available +/// Block until a workspace event is available. #[jni(package = "mp.code", class = "Workspace")] fn poll(workspace: &mut Workspace) -> Result<(), ControllerError> { super::tokio().block_on(workspace.poll()) } -/// Clear previously registered callback +/// Clear previously registered callback. #[jni(package = "mp.code", class = "Workspace")] fn clear_callback(workspace: &mut Workspace) { workspace.clear_callback();