mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-21 14:54:49 +01:00
Merge branch 'dev' into release/v0.7.2
This commit is contained in:
commit
7ab2650572
22 changed files with 135 additions and 275 deletions
10
dist/java/src/mp/code/BufferController.java
vendored
10
dist/java/src/mp/code/BufferController.java
vendored
|
@ -106,16 +106,6 @@ public final class BufferController {
|
|||
poll(this.ptr);
|
||||
}
|
||||
|
||||
private static native boolean stop(long self);
|
||||
|
||||
/**
|
||||
* Stops the controller. Any further calls to it will fail.
|
||||
* @return true if it was stopped successfully
|
||||
*/
|
||||
public boolean stop() {
|
||||
return stop(this.ptr);
|
||||
}
|
||||
|
||||
private static native void free(long self);
|
||||
|
||||
static {
|
||||
|
|
10
dist/java/src/mp/code/CursorController.java
vendored
10
dist/java/src/mp/code/CursorController.java
vendored
|
@ -83,16 +83,6 @@ public final class CursorController {
|
|||
poll(this.ptr);
|
||||
}
|
||||
|
||||
private static native boolean stop(long self);
|
||||
|
||||
/**
|
||||
* Stops the controller. Any further calls to it will fail.
|
||||
* @return true if it was stopped successfully
|
||||
*/
|
||||
public boolean stop() {
|
||||
return stop(this.ptr);
|
||||
}
|
||||
|
||||
private static native void free(long self);
|
||||
|
||||
static {
|
||||
|
|
6
dist/java/src/mp/code/Workspace.java
vendored
6
dist/java/src/mp/code/Workspace.java
vendored
|
@ -113,14 +113,14 @@ public final class Workspace {
|
|||
return attach_to_buffer(ptr, path);
|
||||
}
|
||||
|
||||
private static native DetachResult detach_from_buffer(long self, String path);
|
||||
private static native boolean detach_from_buffer(long self, String path);
|
||||
|
||||
/**
|
||||
* Detaches from a given buffer.
|
||||
* @param path the path of the buffer to detach from
|
||||
* @return a {@link DetachResult} representing the outcome of the operation
|
||||
* @return a boolean, true only if there are still dangling references preventing controller from stopping
|
||||
*/
|
||||
public DetachResult detachFromBuffer(String path) {
|
||||
public boolean detachFromBuffer(String path) {
|
||||
return detach_from_buffer(this.ptr, path);
|
||||
}
|
||||
|
||||
|
|
14
dist/lua/annotations.lua
vendored
14
dist/lua/annotations.lua
vendored
|
@ -223,7 +223,7 @@ function Workspace:attach(path) end
|
|||
|
||||
---@param path string relative path ("name") of buffer to detach from
|
||||
---@return boolean success
|
||||
---detach from an active buffer, closing all streams. returns false if buffer was no longer active
|
||||
---detach from an active buffer, closing all streams. returns false if there are still dangling references
|
||||
function Workspace:detach(path) end
|
||||
|
||||
---@param filter? string apply a filter to the return elements
|
||||
|
@ -232,6 +232,10 @@ function Workspace:detach(path) end
|
|||
---return the list of available buffers in this workspace, as relative paths from workspace root
|
||||
function Workspace:filetree(filter, strict) end
|
||||
|
||||
---@return string[]
|
||||
---return all names of users currently in this workspace
|
||||
function Workspace:user_list() end
|
||||
|
||||
---@return NilPromise
|
||||
---@async
|
||||
---@nodiscard
|
||||
|
@ -297,10 +301,6 @@ function BufferController:recv() end
|
|||
---block until next text change without returning it
|
||||
function BufferController:poll() end
|
||||
|
||||
---@return boolean success
|
||||
---stop buffer worker and disconnect, returns false if was already stopped
|
||||
function BufferController:stop() end
|
||||
|
||||
---clears any previously registered buffer callback
|
||||
function BufferController:clear_callback() end
|
||||
|
||||
|
@ -354,10 +354,6 @@ function CursorController:recv() end
|
|||
---block until next cursor event without returning it
|
||||
function CursorController:poll() end
|
||||
|
||||
---@return boolean success
|
||||
---stop cursor worker and disconnect, returns false if was already stopped
|
||||
function CursorController:stop() end
|
||||
|
||||
---clears any previously registered cursor callback
|
||||
function CursorController:clear_callback() end
|
||||
|
||||
|
|
2
dist/py/src/codemp/codemp.pyi
vendored
2
dist/py/src/codemp/codemp.pyi
vendored
|
@ -102,7 +102,6 @@ class BufferController:
|
|||
def callback(self,
|
||||
cb: Callable[[BufferController], None]) -> None: ...
|
||||
def clear_callback(self) -> None: ...
|
||||
def stop(self) -> bool: ...
|
||||
|
||||
|
||||
|
||||
|
@ -131,5 +130,4 @@ class CursorController:
|
|||
def callback(self,
|
||||
cb: Callable[[CursorController], None]) -> None: ...
|
||||
def clear_callback(self) -> None: ...
|
||||
def stop(self) -> bool: ...
|
||||
|
||||
|
|
|
@ -5,21 +5,13 @@
|
|||
|
||||
use crate::errors::ControllerResult;
|
||||
|
||||
pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
|
||||
type Controller : Controller<T>;
|
||||
type Tx;
|
||||
type Rx;
|
||||
|
||||
fn controller(&self) -> Self::Controller;
|
||||
async fn work(self, tx: Self::Tx, rx: Self::Rx);
|
||||
}
|
||||
|
||||
// note that we don't use thiserror's #[from] because we don't want the error structs to contain
|
||||
// these foreign types, and also we want these to be easily constructable
|
||||
|
||||
/// Asynchronous and thread-safe handle to a generic bidirectional stream.
|
||||
///
|
||||
/// This generic trait is implemented by actors managing stream procedures.
|
||||
/// This generic trait is implemented by actors managing stream procedures, and will generally
|
||||
/// imply a background worker.
|
||||
///
|
||||
/// Events can be enqueued for dispatching without blocking with [`Controller::send`].
|
||||
///
|
||||
|
@ -27,6 +19,9 @@ pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
|
|||
/// provided; if that is not feasible, consider using [`Controller::callback`] or, alternatively,
|
||||
/// [`Controller::poll`] combined with [`Controller::try_recv`].
|
||||
///
|
||||
/// Every [`Controller`]'s worker will stop cleanly when all references to its [`Controller`] have
|
||||
/// been dropped.
|
||||
///
|
||||
/// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers.
|
||||
#[allow(async_fn_in_trait)]
|
||||
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
|
||||
|
@ -57,15 +52,6 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
|
|||
|
||||
/// Attempt to receive a value, return None if nothing is currently available.
|
||||
async fn try_recv(&self) -> ControllerResult<Option<T>>;
|
||||
|
||||
/// Stop underlying worker.
|
||||
///
|
||||
/// After this is called, nothing can be received or sent anymore; however, existing
|
||||
/// controllers will still be accessible until all handles are dropped.
|
||||
///
|
||||
/// Returns true if the stop signal was successfully sent, false if channel was
|
||||
/// closed (probably because worker had already been stopped).
|
||||
fn stop(&self) -> bool;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -47,7 +47,6 @@ pub(crate) struct BufferControllerInner {
|
|||
pub(crate) last_update: InternallyMutable<diamond_types::LocalVersion>,
|
||||
pub(crate) ops_in: mpsc::UnboundedSender<(TextChange, oneshot::Sender<LocalVersion>)>,
|
||||
pub(crate) poller: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||
pub(crate) stopper: mpsc::UnboundedSender<()>, // just exist
|
||||
pub(crate) content_request: mpsc::Sender<oneshot::Sender<String>>,
|
||||
pub(crate) delta_request: mpsc::Sender<DeltaRequest>,
|
||||
pub(crate) callback: watch::Sender<Option<ControllerCallback<BufferController>>>,
|
||||
|
@ -101,8 +100,4 @@ impl Controller<TextChange> for BufferController {
|
|||
tracing::warn!("no active buffer worker to clear callback");
|
||||
}
|
||||
}
|
||||
|
||||
fn stop(&self) -> bool {
|
||||
self.0.stopper.send(()).is_ok()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ use tokio::sync::{mpsc, oneshot, watch};
|
|||
use tonic::Streaming;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::api::controller::{ControllerCallback, ControllerWorker};
|
||||
use crate::api::controller::ControllerCallback;
|
||||
use crate::api::TextChange;
|
||||
use crate::ext::{IgnorableError, InternallyMutable};
|
||||
|
||||
|
@ -16,21 +16,21 @@ use super::controller::{BufferController, BufferControllerInner};
|
|||
pub(crate) type DeltaOp = (LocalVersion, Option<TextChange>);
|
||||
pub(crate) type DeltaRequest = (LocalVersion, oneshot::Sender<DeltaOp>);
|
||||
|
||||
pub(crate) struct BufferWorker {
|
||||
struct BufferWorker {
|
||||
user_id: Uuid,
|
||||
path: String,
|
||||
latest_version: watch::Sender<diamond_types::LocalVersion>,
|
||||
ops_in: mpsc::UnboundedReceiver<(TextChange, oneshot::Sender<LocalVersion>)>,
|
||||
poller: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
||||
pollers: Vec<oneshot::Sender<()>>,
|
||||
content_checkout: mpsc::Receiver<oneshot::Sender<String>>,
|
||||
delta_req: mpsc::Receiver<DeltaRequest>,
|
||||
stop: mpsc::UnboundedReceiver<()>,
|
||||
controller: BufferController,
|
||||
controller: std::sync::Weak<BufferControllerInner>,
|
||||
callback: watch::Receiver<Option<ControllerCallback<BufferController>>>,
|
||||
}
|
||||
|
||||
impl BufferWorker {
|
||||
pub fn new(user_id: Uuid, path: &str) -> Self {
|
||||
impl BufferController {
|
||||
pub(crate) fn spawn(user_id: Uuid, path: &str, tx: mpsc::Sender<Operation>, rx: Streaming<BufferEvent>) -> Self {
|
||||
let init = diamond_types::LocalVersion::default();
|
||||
|
||||
let (latest_version_tx, latest_version_rx) = watch::channel(init.clone());
|
||||
|
@ -42,67 +42,63 @@ impl BufferWorker {
|
|||
|
||||
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
||||
|
||||
let controller = BufferControllerInner {
|
||||
let controller = Arc::new(BufferControllerInner {
|
||||
name: path.to_string(),
|
||||
latest_version: latest_version_rx,
|
||||
last_update: InternallyMutable::new(diamond_types::LocalVersion::default()),
|
||||
ops_in: opin_tx,
|
||||
poller: poller_tx,
|
||||
stopper: end_tx,
|
||||
content_request: req_tx,
|
||||
delta_request: recv_tx,
|
||||
callback: cb_tx,
|
||||
};
|
||||
});
|
||||
|
||||
BufferWorker {
|
||||
let weak = Arc::downgrade(&controller);
|
||||
|
||||
let worker = BufferWorker {
|
||||
user_id,
|
||||
path: path.to_string(),
|
||||
latest_version: latest_version_tx,
|
||||
ops_in: opin_rx,
|
||||
poller: poller_rx,
|
||||
pollers: Vec::new(),
|
||||
stop: end_rx,
|
||||
controller: BufferController(Arc::new(controller)),
|
||||
controller: weak,
|
||||
content_checkout: req_rx,
|
||||
delta_req: recv_rx,
|
||||
callback: cb_rx,
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
impl ControllerWorker<TextChange> for BufferWorker {
|
||||
type Controller = BufferController;
|
||||
type Tx = mpsc::Sender<Operation>;
|
||||
type Rx = Streaming<BufferEvent>;
|
||||
tokio::spawn(async move {
|
||||
BufferController::work(worker, tx, rx).await
|
||||
});
|
||||
|
||||
fn controller(&self) -> BufferController {
|
||||
self.controller.clone()
|
||||
BufferController(controller)
|
||||
}
|
||||
|
||||
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
||||
async fn work(mut worker: BufferWorker, tx: mpsc::Sender<Operation>, mut rx: Streaming<BufferEvent>) {
|
||||
let mut branch = diamond_types::list::Branch::new();
|
||||
let mut oplog = diamond_types::list::OpLog::new();
|
||||
let mut timer = Timer::new(10); // TODO configurable!!
|
||||
tracing::debug!("controller worker started");
|
||||
|
||||
loop {
|
||||
if worker.controller.upgrade().is_none() { break };
|
||||
|
||||
// block until one of these is ready
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
// received stop signal
|
||||
_ = self.stop.recv() => break,
|
||||
|
||||
// received a new poller, add it to collection
|
||||
res = self.poller.recv() => match res {
|
||||
res = worker.poller.recv() => match res {
|
||||
None => break tracing::error!("poller channel closed"),
|
||||
Some(tx) => self.pollers.push(tx),
|
||||
Some(tx) => worker.pollers.push(tx),
|
||||
},
|
||||
|
||||
// received a text change from editor
|
||||
res = self.ops_in.recv() => match res {
|
||||
res = worker.ops_in.recv() => match res {
|
||||
None => break tracing::debug!("stopping: editor closed channel"),
|
||||
Some((change, ack)) => {
|
||||
let agent_id = oplog.get_or_create_agent_id(&self.user_id.to_string());
|
||||
let agent_id = oplog.get_or_create_agent_id(&worker.user_id.to_string());
|
||||
let last_ver = oplog.local_version();
|
||||
// clip to buffer extents
|
||||
let clip_end = std::cmp::min(branch.len(), change.end as usize);
|
||||
|
@ -120,7 +116,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
|||
if change.is_delete() || change.is_insert() {
|
||||
tx.send(Operation { data: oplog.encode_from(Default::default(), &last_ver) }).await
|
||||
.unwrap_or_warn("failed to send change!");
|
||||
self.latest_version.send(oplog.local_version())
|
||||
worker.latest_version.send(oplog.local_version())
|
||||
.unwrap_or_warn("failed to update latest version!");
|
||||
}
|
||||
ack.send(branch.local_version()).unwrap_or_warn("controller didn't wait for ack");
|
||||
|
@ -129,18 +125,19 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
|||
|
||||
// received a message from server: add to oplog and update latest version (+unlock pollers)
|
||||
res = rx.message() => match res {
|
||||
Err(_e) => break,
|
||||
Ok(None) => break,
|
||||
Ok(Some(change)) => {
|
||||
match oplog.decode_and_add(&change.op.data) {
|
||||
Err(e) => break tracing::warn!("error receiving from server for buffer {}: {e}", worker.path),
|
||||
Ok(None) => break tracing::info!("disconnected from buffer {}", worker.path),
|
||||
Ok(Some(change)) => match worker.controller.upgrade() {
|
||||
None => break, // clean exit actually, just weird we caught it here
|
||||
Some(controller) => match oplog.decode_and_add(&change.op.data) {
|
||||
Ok(local_version) => {
|
||||
self.latest_version.send(local_version)
|
||||
worker.latest_version.send(local_version)
|
||||
.unwrap_or_warn("failed to update latest version!");
|
||||
for tx in self.pollers.drain(..) {
|
||||
for tx in worker.pollers.drain(..) {
|
||||
tx.send(()).unwrap_or_warn("could not wake up poller");
|
||||
}
|
||||
if let Some(cb) = self.callback.borrow().as_ref() {
|
||||
cb.call(self.controller.clone()); // TODO should we run this on another task/thread?
|
||||
if let Some(cb) = worker.callback.borrow().as_ref() {
|
||||
cb.call(BufferController(controller)); // TODO should we run this on another task/thread?
|
||||
}
|
||||
},
|
||||
Err(e) => tracing::error!("could not deserialize operation from server: {}", e),
|
||||
|
@ -149,7 +146,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
|||
},
|
||||
|
||||
// controller is ready to apply change and recv(), calculate it and send it back
|
||||
res = self.delta_req.recv() => match res {
|
||||
res = worker.delta_req.recv() => match res {
|
||||
None => break tracing::error!("no more active controllers: can't send changes"),
|
||||
Some((last_ver, tx)) => {
|
||||
if let Some((lv, Some(dtop))) = oplog.iter_xf_operations_from(&last_ver, oplog.local_version_ref()).next() {
|
||||
|
@ -194,7 +191,7 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
|||
},
|
||||
|
||||
// received a request for full CRDT content
|
||||
res = self.content_checkout.recv() => match res {
|
||||
res = worker.content_checkout.recv() => match res {
|
||||
None => break tracing::error!("no more active controllers: can't update content"),
|
||||
Some(tx) => {
|
||||
branch.merge(&oplog, oplog.local_version_ref());
|
||||
|
@ -204,6 +201,8 @@ impl ControllerWorker<TextChange> for BufferWorker {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
tracing::debug!("controller worker stopped");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ pub(crate) struct CursorControllerInner {
|
|||
pub(crate) stream: mpsc::Sender<oneshot::Sender<Option<Cursor>>>,
|
||||
pub(crate) poll: mpsc::UnboundedSender<oneshot::Sender<()>>,
|
||||
pub(crate) callback: watch::Sender<Option<ControllerCallback<CursorController>>>,
|
||||
pub(crate) stop: mpsc::UnboundedSender<()>,
|
||||
}
|
||||
|
||||
#[cfg_attr(feature = "async-trait", async_trait::async_trait)]
|
||||
|
@ -81,8 +80,4 @@ impl Controller<Cursor> for CursorController {
|
|||
tracing::warn!("no active cursor worker to clear callback");
|
||||
}
|
||||
}
|
||||
|
||||
fn stop(&self) -> bool {
|
||||
self.0.stop.send(()).is_ok()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,104 +4,98 @@ use tokio::sync::{mpsc, oneshot, watch};
|
|||
use tonic::Streaming;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{api::{controller::{ControllerCallback, ControllerWorker}, Cursor, User}, ext::IgnorableError};
|
||||
use crate::{api::{controller::ControllerCallback, Cursor, User}, ext::IgnorableError};
|
||||
use codemp_proto::cursor::{CursorPosition, CursorEvent};
|
||||
|
||||
use super::controller::{CursorController, CursorControllerInner};
|
||||
|
||||
pub(crate) struct CursorWorker {
|
||||
struct CursorWorker {
|
||||
op: mpsc::Receiver<CursorPosition>,
|
||||
map: Arc<dashmap::DashMap<Uuid, User>>,
|
||||
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
|
||||
poll: mpsc::UnboundedReceiver<oneshot::Sender<()>>,
|
||||
pollers: Vec<oneshot::Sender<()>>,
|
||||
store: std::collections::VecDeque<Cursor>,
|
||||
stop: mpsc::UnboundedReceiver<()>,
|
||||
controller: CursorController,
|
||||
controller: std::sync::Weak<CursorControllerInner>,
|
||||
callback: watch::Receiver<Option<ControllerCallback<CursorController>>>,
|
||||
}
|
||||
|
||||
impl CursorWorker {
|
||||
pub fn new(user_map: Arc<dashmap::DashMap<Uuid, User>>) -> Self {
|
||||
impl CursorController {
|
||||
pub(crate) fn spawn(user_map: Arc<dashmap::DashMap<Uuid, User>>, tx: mpsc::Sender<CursorPosition>, rx: Streaming<CursorEvent>) -> Self {
|
||||
// TODO we should tweak the channel buffer size to better propagate backpressure
|
||||
let (op_tx, op_rx) = mpsc::channel(64);
|
||||
let (stream_tx, stream_rx) = mpsc::channel(1);
|
||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
||||
let (cb_tx, cb_rx) = watch::channel(None);
|
||||
let (poll_tx, poll_rx) = mpsc::unbounded_channel();
|
||||
let controller = CursorControllerInner {
|
||||
let controller = Arc::new(CursorControllerInner {
|
||||
op: op_tx,
|
||||
stream: stream_tx,
|
||||
stop: end_tx,
|
||||
callback: cb_tx,
|
||||
poll: poll_tx,
|
||||
};
|
||||
Self {
|
||||
});
|
||||
|
||||
let weak = Arc::downgrade(&controller);
|
||||
|
||||
let worker = CursorWorker {
|
||||
op: op_rx,
|
||||
map: user_map,
|
||||
stream: stream_rx,
|
||||
store: std::collections::VecDeque::default(),
|
||||
stop: end_rx,
|
||||
controller: CursorController(Arc::new(controller)),
|
||||
controller: weak,
|
||||
callback: cb_rx,
|
||||
poll: poll_rx,
|
||||
pollers: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
impl ControllerWorker<Cursor> for CursorWorker {
|
||||
type Controller = CursorController;
|
||||
type Tx = mpsc::Sender<CursorPosition>;
|
||||
type Rx = Streaming<CursorEvent>;
|
||||
tokio::spawn(async move { CursorController::work(worker, tx, rx).await });
|
||||
|
||||
fn controller(&self) -> CursorController {
|
||||
self.controller.clone()
|
||||
CursorController(controller)
|
||||
}
|
||||
|
||||
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
||||
async fn work(mut worker: CursorWorker, tx: mpsc::Sender<CursorPosition>, mut rx: Streaming<CursorEvent>) {
|
||||
loop {
|
||||
tracing::debug!("cursor worker polling");
|
||||
if worker.controller.upgrade().is_none() { break }; // clean exit: all controllers dropped
|
||||
tokio::select!{
|
||||
biased;
|
||||
|
||||
// received stop signal
|
||||
Some(()) = self.stop.recv() => { break; },
|
||||
|
||||
// new poller
|
||||
Some(poller) = self.poll.recv() => self.pollers.push(poller),
|
||||
Some(poller) = worker.poll.recv() => worker.pollers.push(poller),
|
||||
|
||||
// client moved their cursor
|
||||
Some(op) = self.op.recv() => {
|
||||
Some(op) = worker.op.recv() => {
|
||||
tracing::debug!("received cursor from editor");
|
||||
tx.send(op).await.unwrap_or_warn("could not update cursor");
|
||||
},
|
||||
|
||||
// server sents us a cursor
|
||||
Ok(Some(cur)) = rx.message() => {
|
||||
tracing::debug!("received cursor from server");
|
||||
let mut cursor = Cursor {
|
||||
buffer: cur.position.buffer.path,
|
||||
start: (cur.position.start.row, cur.position.start.col),
|
||||
end: (cur.position.end.row, cur.position.end.col),
|
||||
user: None,
|
||||
};
|
||||
let user_id = Uuid::from(cur.user);
|
||||
if let Some(user) = self.map.get(&user_id) {
|
||||
cursor.user = Some(user.name.clone());
|
||||
}
|
||||
self.store.push_back(cursor);
|
||||
for tx in self.pollers.drain(..) {
|
||||
tx.send(()).unwrap_or_warn("poller dropped before unblocking");
|
||||
}
|
||||
if let Some(cb) = self.callback.borrow().as_ref() {
|
||||
tracing::debug!("running cursor callback");
|
||||
cb.call(self.controller.clone()); // TODO should this run in its own task/thread?
|
||||
}
|
||||
Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() {
|
||||
None => break, // clean exit, just weird that we got it here
|
||||
Some(controller) => {
|
||||
tracing::debug!("received cursor from server");
|
||||
let mut cursor = Cursor {
|
||||
buffer: cur.position.buffer.path,
|
||||
start: (cur.position.start.row, cur.position.start.col),
|
||||
end: (cur.position.end.row, cur.position.end.col),
|
||||
user: None,
|
||||
};
|
||||
let user_id = Uuid::from(cur.user);
|
||||
if let Some(user) = worker.map.get(&user_id) {
|
||||
cursor.user = Some(user.name.clone());
|
||||
}
|
||||
worker.store.push_back(cursor);
|
||||
for tx in worker.pollers.drain(..) {
|
||||
tx.send(()).unwrap_or_warn("poller dropped before unblocking");
|
||||
}
|
||||
if let Some(cb) = worker.callback.borrow().as_ref() {
|
||||
tracing::debug!("running cursor callback");
|
||||
cb.call(CursorController(controller)); // TODO should this run in its own task/thread?
|
||||
}
|
||||
},
|
||||
},
|
||||
|
||||
// client wants to get next cursor event
|
||||
Some(tx) = self.stream.recv() => tx.send(self.store.pop_front())
|
||||
Some(tx) = worker.stream.recv() => tx.send(worker.store.pop_front())
|
||||
.unwrap_or_warn("client gave up receiving"),
|
||||
|
||||
else => break,
|
||||
|
|
|
@ -80,12 +80,6 @@ fn poll(controller: &mut crate::buffer::Controller) -> Result<(), ControllerErro
|
|||
super::tokio().block_on(controller.poll())
|
||||
}
|
||||
|
||||
/// Stop the controller.
|
||||
#[jni(package = "mp.code", class = "BufferController")]
|
||||
fn stop(controller: &mut crate::buffer::Controller) -> bool {
|
||||
controller.stop()
|
||||
}
|
||||
|
||||
/// Called by the Java GC to drop a [crate::buffer::Controller].
|
||||
#[jni(package = "mp.code", class = "BufferController")]
|
||||
fn free(input: jni::sys::jlong) {
|
||||
|
|
|
@ -67,12 +67,6 @@ fn poll(controller: &mut crate::cursor::Controller) -> Result<(), ControllerErro
|
|||
super::tokio().block_on(controller.poll())
|
||||
}
|
||||
|
||||
/// Stop the controller.
|
||||
#[jni(package = "mp.code", class = "CursorController")]
|
||||
fn stop(controller: &mut crate::cursor::Controller) -> bool {
|
||||
controller.stop()
|
||||
}
|
||||
|
||||
/// Called by the Java GC to drop a [crate::cursor::Controller].
|
||||
#[jni(package = "mp.code", class = "CursorController")]
|
||||
fn free(input: jni::sys::jlong) {
|
||||
|
|
|
@ -170,26 +170,6 @@ impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::Event {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::workspace::DetachResult {
|
||||
const CLASS: &'static str = "mp/code/data/DetachResult";
|
||||
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
|
||||
let ordinal = match self {
|
||||
crate::workspace::DetachResult::NotAttached => 0,
|
||||
crate::workspace::DetachResult::Detaching => 1,
|
||||
crate::workspace::DetachResult::AlreadyDetached => 2
|
||||
};
|
||||
|
||||
let class = env.find_class(Self::CLASS)?;
|
||||
let variants: jni::objects::JObjectArray = env.call_method(
|
||||
class,
|
||||
"getEnumConstants",
|
||||
"()[Ljava/lang/Object;",
|
||||
&[]
|
||||
)?.l()?.into();
|
||||
env.get_object_array_element(variants, ordinal)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'j> jni_toolbox::IntoJavaObject<'j> for crate::api::TextChange {
|
||||
const CLASS: &'static str = "mp/code/data/TextChange";
|
||||
fn into_java_object(self, env: &mut jni::JNIEnv<'j>) -> Result<jni::objects::JObject<'j>, jni::errors::Error> {
|
||||
|
|
|
@ -51,7 +51,7 @@ fn attach_to_buffer(workspace: &mut Workspace, path: String) -> Result<crate::bu
|
|||
|
||||
/// Detach from a buffer.
|
||||
#[jni(package = "mp.code", class = "Workspace")]
|
||||
fn detach_from_buffer(workspace: &mut Workspace, path: String) -> crate::workspace::DetachResult {
|
||||
fn detach_from_buffer(workspace: &mut Workspace, path: String) -> bool {
|
||||
workspace.detach(&path)
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ impl BufferController {
|
|||
}
|
||||
|
||||
#[napi(js_name = "clear_callback")]
|
||||
pub fn js_clear_callback(&self) -> () {
|
||||
pub fn js_clear_callback(&self) {
|
||||
self.clear_callback();
|
||||
}
|
||||
|
||||
|
|
|
@ -18,8 +18,6 @@ impl LuaUserData for CodempBufferController {
|
|||
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
|
||||
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
|
||||
|
||||
methods.add_method("stop", |_, this, ()| Ok(this.stop()));
|
||||
|
||||
methods.add_method("content", |_, this, ()| a_sync! { this => this.content().await? });
|
||||
|
||||
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
|
||||
|
|
|
@ -19,8 +19,6 @@ impl LuaUserData for CodempCursorController {
|
|||
methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? });
|
||||
methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? });
|
||||
|
||||
methods.add_method("stop", |_, this, ()| Ok(this.stop()));
|
||||
|
||||
methods.add_method("clear_callback", |_, this, ()| { this.clear_callback(); Ok(()) });
|
||||
methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| {
|
||||
this.callback(move |controller: CodempCursorController| super::ext::callback().invoke(cb.clone(), controller));
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use mlua_codemp_patch as mlua;
|
||||
use mlua::prelude::*;
|
||||
use crate::prelude::*;
|
||||
use crate::workspace::DetachResult;
|
||||
|
||||
use super::ext::a_sync::a_sync;
|
||||
use super::ext::from_lua_serde;
|
||||
|
@ -18,7 +17,7 @@ impl LuaUserData for CodempWorkspace {
|
|||
);
|
||||
|
||||
methods.add_method("detach", |_, this, (name,):(String,)|
|
||||
Ok(matches!(this.detach(&name), DetachResult::Detaching | DetachResult::AlreadyDetached))
|
||||
Ok(this.detach(&name))
|
||||
);
|
||||
|
||||
methods.add_method("delete", |_, this, (name,):(String,)|
|
||||
|
|
|
@ -67,11 +67,6 @@ impl CursorController {
|
|||
fn pyclear_callback(&self) {
|
||||
self.clear_callback();
|
||||
}
|
||||
|
||||
#[pyo3(name = "stop")]
|
||||
fn pystop(&self) -> bool {
|
||||
self.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// need to do manually since Controller is a trait implementation
|
||||
|
@ -137,11 +132,6 @@ impl BufferController {
|
|||
fn pyclear_callback(&self) {
|
||||
self.clear_callback();
|
||||
}
|
||||
|
||||
#[pyo3(name = "stop")]
|
||||
fn pystop(&self) -> bool {
|
||||
self.stop()
|
||||
}
|
||||
}
|
||||
|
||||
// We have to write this manually since
|
||||
|
|
|
@ -172,9 +172,9 @@ impl Config {
|
|||
kwds: Option<Bound<'_, PyDict>>,
|
||||
) -> PyResult<Self> {
|
||||
if let Some(kwgs) = kwds {
|
||||
let host = kwgs.get_item("host")?.map(|e| e.extract().ok()).flatten();
|
||||
let port = kwgs.get_item("port")?.map(|e| e.extract().ok()).flatten();
|
||||
let tls = kwgs.get_item("tls")?.map(|e| e.extract().ok()).flatten();
|
||||
let host = kwgs.get_item("host")?.and_then(|e| e.extract().ok());
|
||||
let port = kwgs.get_item("port")?.and_then(|e| e.extract().ok());
|
||||
let tls = kwgs.get_item("tls")?.and_then(|e| e.extract().ok());
|
||||
|
||||
Ok(Config {
|
||||
username,
|
||||
|
|
|
@ -23,11 +23,7 @@ impl Workspace {
|
|||
|
||||
#[pyo3(name = "detach")]
|
||||
fn pydetach(&self, path: String) -> bool {
|
||||
match self.detach(path.as_str()) {
|
||||
crate::workspace::DetachResult::NotAttached => false,
|
||||
crate::workspace::DetachResult::Detaching => true,
|
||||
crate::workspace::DetachResult::AlreadyDetached => true,
|
||||
}
|
||||
self.detach(path.as_str())
|
||||
}
|
||||
|
||||
#[pyo3(name = "event")]
|
||||
|
|
|
@ -4,9 +4,8 @@
|
|||
//! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems.
|
||||
|
||||
use crate::{
|
||||
api::{controller::ControllerWorker, Controller, Event, User},
|
||||
buffer::{self, worker::BufferWorker},
|
||||
cursor::{self, worker::CursorWorker},
|
||||
api::{Event, User},
|
||||
buffer, cursor,
|
||||
errors::{ConnectionResult, ControllerResult, RemoteResult},
|
||||
ext::InternallyMutable,
|
||||
network::Services,
|
||||
|
@ -49,9 +48,11 @@ struct WorkspaceInner {
|
|||
user: User, // TODO back-reference to global user id... needed for buffer controllers
|
||||
cursor: cursor::Controller,
|
||||
buffers: DashMap<String, buffer::Controller>,
|
||||
services: Services,
|
||||
// TODO these two are Arced so that the inner worker can hold them without holding the
|
||||
// WorkspaceInner itself, otherwise its impossible to drop Workspace
|
||||
filetree: DashSet<String>,
|
||||
users: Arc<DashMap<Uuid, User>>,
|
||||
services: Services,
|
||||
// TODO can we drop the mutex?
|
||||
events: tokio::sync::Mutex<mpsc::UnboundedReceiver<crate::api::Event>>,
|
||||
}
|
||||
|
@ -79,13 +80,7 @@ impl Workspace {
|
|||
|
||||
let users = Arc::new(DashMap::default());
|
||||
|
||||
let worker = CursorWorker::new(users.clone());
|
||||
let controller = worker.controller();
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!("controller worker started");
|
||||
worker.work(tx, cur_stream).await;
|
||||
tracing::debug!("controller worker stopped");
|
||||
});
|
||||
let controller = cursor::Controller::spawn(users.clone(), tx, cur_stream);
|
||||
|
||||
let ws = Self(Arc::new(WorkspaceInner {
|
||||
name,
|
||||
|
@ -141,14 +136,7 @@ impl Workspace {
|
|||
);
|
||||
let stream = self.0.services.buf().attach(req).await?.into_inner();
|
||||
|
||||
let worker = BufferWorker::new(self.0.user.id, path);
|
||||
let controller = worker.controller();
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!("controller worker started");
|
||||
worker.work(tx, stream).await;
|
||||
tracing::debug!("controller worker stopped");
|
||||
});
|
||||
|
||||
let controller = buffer::Controller::spawn(self.0.user.id, path, tx, stream);
|
||||
self.0.buffers.insert(path.to_string(), controller.clone());
|
||||
|
||||
Ok(controller)
|
||||
|
@ -156,18 +144,19 @@ impl Workspace {
|
|||
|
||||
/// Detach from an active buffer.
|
||||
///
|
||||
/// This option will be carried in background. BufferWorker will be stopped and dropped.
|
||||
/// There may still be some events enqueued in buffers to poll, but the [buffer::Controller] itself won't be
|
||||
/// accessible anymore from [`Workspace`].
|
||||
pub fn detach(&self, path: &str) -> DetachResult {
|
||||
/// This will stop and drop its [`buffer::Controller`].
|
||||
///
|
||||
/// Returns `true` if connectly dropped or wasn't present, `false` if dropped but wasn't last ref
|
||||
///
|
||||
/// If this method returns `false` you have a dangling ref, maybe just waiting for garbage
|
||||
/// collection or maybe preventing the controller from being dropped completely
|
||||
#[allow(clippy::redundant_pattern_matching)] // all cases are clearer this way
|
||||
pub fn detach(&self, path: &str) -> bool {
|
||||
match self.0.buffers.remove(path) {
|
||||
None => DetachResult::NotAttached,
|
||||
Some((_name, controller)) => {
|
||||
if controller.stop() {
|
||||
DetachResult::Detaching
|
||||
} else {
|
||||
DetachResult::AlreadyDetached
|
||||
}
|
||||
None => true, // noop: we werent attached in the first place
|
||||
Some((_name, controller)) => match Arc::into_inner(controller.0) {
|
||||
None => false, // dangling ref! we can't drop this
|
||||
Some(_) => true, // dropping it now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -241,6 +230,8 @@ impl Workspace {
|
|||
|
||||
/// Delete a buffer.
|
||||
pub async fn delete(&self, path: &str) -> RemoteResult<()> {
|
||||
self.detach(path); // just in case
|
||||
|
||||
let mut workspace_client = self.0.services.ws();
|
||||
workspace_client
|
||||
.delete_buffer(tonic::Request::new(BufferNode {
|
||||
|
@ -248,9 +239,6 @@ impl Workspace {
|
|||
}))
|
||||
.await?;
|
||||
|
||||
if let Some((_name, controller)) = self.0.buffers.remove(path) {
|
||||
controller.stop();
|
||||
}
|
||||
|
||||
self.0.filetree.remove(path);
|
||||
|
||||
|
@ -320,17 +308,24 @@ impl Workspace {
|
|||
tx: mpsc::UnboundedSender<crate::api::Event>,
|
||||
) {
|
||||
// TODO for buffer and cursor controller we invoke the tokio::spawn outside, but here inside..?
|
||||
let inner = self.0.clone();
|
||||
let weak = Arc::downgrade(&self.0);
|
||||
let name = self.id();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match stream.message().await {
|
||||
// TODO can we stop responsively rather than poll for Arc being dropped?
|
||||
if weak.upgrade().is_none() { break };
|
||||
let Some(res) = tokio::select!(
|
||||
x = stream.message() => Some(x),
|
||||
_ = tokio::time::sleep(std::time::Duration::from_secs(5)) => None,
|
||||
) else { continue };
|
||||
match res {
|
||||
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
|
||||
Ok(None) => break tracing::info!("leaving workspace {}", name),
|
||||
Ok(Some(WorkspaceEvent { event: None })) => {
|
||||
tracing::warn!("workspace {} received empty event", name)
|
||||
}
|
||||
Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
|
||||
let Some(inner) = weak.upgrade() else { break };
|
||||
let update = crate::api::Event::from(&ev);
|
||||
match ev {
|
||||
// user
|
||||
|
@ -350,9 +345,7 @@ impl Workspace {
|
|||
}
|
||||
WorkspaceEventInner::Delete(FileDelete { path }) => {
|
||||
inner.filetree.remove(&path);
|
||||
if let Some((_name, controller)) = inner.buffers.remove(&path) {
|
||||
controller.stop();
|
||||
}
|
||||
let _ = inner.buffers.remove(&path);
|
||||
}
|
||||
}
|
||||
if tx.send(update).is_err() {
|
||||
|
@ -364,28 +357,3 @@ impl Workspace {
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for WorkspaceInner {
|
||||
fn drop(&mut self) {
|
||||
for entry in self.buffers.iter() {
|
||||
if !entry.value().stop() {
|
||||
tracing::warn!(
|
||||
"could not stop buffer worker {} for workspace {}",
|
||||
entry.value().path(),
|
||||
self.name
|
||||
);
|
||||
}
|
||||
}
|
||||
if !self.cursor.stop() {
|
||||
tracing::warn!("could not stop cursor worker for workspace {}", self.name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg_attr(any(feature = "py", feature = "py-noabi"), pyo3::pyclass(eq, eq_int))]
|
||||
#[cfg_attr(any(feature = "py", feature = "py-noabi"), derive(PartialEq))]
|
||||
pub enum DetachResult {
|
||||
NotAttached,
|
||||
Detaching,
|
||||
AlreadyDetached,
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue