From 2fe217ad85dd2f5b581846f825dead7a997ff86c Mon Sep 17 00:00:00 2001 From: alemi Date: Thu, 3 Oct 2024 04:06:54 +0200 Subject: [PATCH] chore: autofmt --- src/api/controller.rs | 11 +++--- src/buffer/worker.rs | 71 ++++++++++++++++++++++------------- src/ffi/lua/ext/a_sync.rs | 2 +- src/ffi/lua/workspace.rs | 34 ++++++++--------- src/ffi/python/controllers.rs | 2 +- src/ffi/python/workspace.rs | 2 +- src/lib.rs | 2 +- src/prelude.rs | 11 ++---- src/workspace.rs | 23 ++++++------ 9 files changed, 84 insertions(+), 74 deletions(-) diff --git a/src/api/controller.rs b/src/api/controller.rs index e15d7bb..dda3b3f 100644 --- a/src/api/controller.rs +++ b/src/api/controller.rs @@ -13,7 +13,7 @@ use crate::errors::ControllerResult; /// /// This generic trait is implemented by actors managing stream procedures, and will generally /// imply a background worker. -/// +/// /// Events can be enqueued for dispatching without blocking with [`AsyncSender::send`]. /// /// For receiving events from the server, an asynchronous API with [`AsyncReceiver::recv`] is @@ -26,11 +26,12 @@ use crate::errors::ControllerResult; /// [`crate::ext::select_buffer`] may provide a useful helper for managing multiple controllers. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait Controller : AsyncSender + AsyncReceiver +pub trait Controller: AsyncSender + AsyncReceiver where Tx: Sized + Sync + Send, Rx: Sized + Sync + Send, -{} +{ +} /// Asynchronous and thread-safe handle to send data over a stream. /// See [`Controller`]'s documentation for details. @@ -38,7 +39,7 @@ where /// Details about the receiving end are left to the implementor. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait AsyncSender : Sized + Send + Sync { +pub trait AsyncSender: Sized + Send + Sync { /// Enqueue a new value to be sent to all other users. async fn send(&self, x: T) -> ControllerResult<()>; } @@ -49,7 +50,7 @@ pub trait AsyncSender : Sized + Send + Sync { /// Details about the sender are left to the implementor. #[allow(async_fn_in_trait)] #[cfg_attr(feature = "async-trait", async_trait::async_trait)] -pub trait AsyncReceiver : Sized + Send + Sync { +pub trait AsyncReceiver: Sized + Send + Sync { /// Block until a value is available and returns it. async fn recv(&self) -> ControllerResult { loop { diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index de669b3..a5a730f 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -142,7 +142,12 @@ impl BufferController { } impl BufferWorker { - async fn handle_editor_change(&mut self, change: TextChange, ack: oneshot::Sender, tx: &mpsc::Sender) { + async fn handle_editor_change( + &mut self, + change: TextChange, + ack: oneshot::Sender, + tx: &mpsc::Sender, + ) { let agent_id = self.oplog.get_or_create_agent_id(&self.user_id.to_string()); let last_ver = self.oplog.local_version(); // clip to buffer extents @@ -151,20 +156,27 @@ impl BufferWorker { // in case we have a "replace" span if change.is_delete() { - self.branch.delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end); + self.branch + .delete_without_content(&mut self.oplog, agent_id, clip_start..clip_end); } if change.is_insert() { - self.branch.insert(&mut self.oplog, agent_id, clip_start, &change.content); + self.branch + .insert(&mut self.oplog, agent_id, clip_start, &change.content); } if change.is_delete() || change.is_insert() { - tx.send(Operation { data: self.oplog.encode_from(Default::default(), &last_ver) }).await - .unwrap_or_warn("failed to send change!"); - self.latest_version.send(self.oplog.local_version()) + tx.send(Operation { + data: self.oplog.encode_from(Default::default(), &last_ver), + }) + .await + .unwrap_or_warn("failed to send change!"); + self.latest_version + .send(self.oplog.local_version()) .unwrap_or_warn("failed to update latest version!"); } - ack.send(self.branch.local_version()).unwrap_or_warn("controller didn't wait for ack"); + ack.send(self.branch.local_version()) + .unwrap_or_warn("controller didn't wait for ack"); } async fn handle_server_change(&mut self, change: BufferEvent) -> bool { @@ -172,7 +184,8 @@ impl BufferWorker { None => true, // clean exit actually, just weird we caught it here Some(controller) => match self.oplog.decode_and_add(&change.op.data) { Ok(local_version) => { - self.latest_version.send(local_version) + self.latest_version + .send(local_version) .unwrap_or_warn("failed to update latest version!"); for tx in self.pollers.drain(..) { tx.send(()).unwrap_or_warn("could not wake up poller"); @@ -181,56 +194,60 @@ impl BufferWorker { cb.call(BufferController(controller)); // TODO should we run this on another task/thread? } false - }, + } Err(e) => { tracing::error!("could not deserialize operation from server: {}", e); true - }, - } + } + }, } } async fn handle_delta_request(&mut self, last_ver: LocalVersion, tx: oneshot::Sender) { - if let Some((lv, Some(dtop))) = self.oplog + if let Some((lv, Some(dtop))) = self + .oplog .iter_xf_operations_from(&last_ver, self.oplog.local_version_ref()) .next() { // x.0.start should always be after lastver! // this step_ver will be the version after we apply the operation // we give it to the controller so that he knows where it's at. - let step_ver = self.oplog.version_union(&[lv.end-1], &last_ver); + let step_ver = self.oplog.version_union(&[lv.end - 1], &last_ver); self.branch.merge(&self.oplog, &step_ver); let new_local_v = self.branch.local_version(); let hash = if self.timer.step() { Some(crate::ext::hash(self.branch.content().to_string())) - } else { None }; + } else { + None + }; let tc = match dtop.kind { diamond_types::list::operation::OpKind::Ins => { - if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() { + if dtop.end() - dtop.start() != dtop.content_as_str().unwrap_or_default().len() + { tracing::error!("[?!?!] Insert span differs from effective content len (TODO remove this error after a bit)"); } crate::api::change::TextChange { start: dtop.start() as u32, end: dtop.start() as u32, content: dtop.content_as_str().unwrap_or_default().to_string(), - hash - } - }, - - diamond_types::list::operation::OpKind::Del => { - crate::api::change::TextChange { - start: dtop.start() as u32, - end: dtop.end() as u32, - content: dtop.content_as_str().unwrap_or_default().to_string(), - hash + hash, } } + + diamond_types::list::operation::OpKind::Del => crate::api::change::TextChange { + start: dtop.start() as u32, + end: dtop.end() as u32, + content: dtop.content_as_str().unwrap_or_default().to_string(), + hash, + }, }; - tx.send((new_local_v, Some(tc))).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send((new_local_v, Some(tc))) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { - tx.send((last_ver, None)).unwrap_or_warn("could not update ops channel -- is controller dead?"); + tx.send((last_ver, None)) + .unwrap_or_warn("could not update ops channel -- is controller dead?"); } } } diff --git a/src/ffi/lua/ext/a_sync.rs b/src/ffi/lua/ext/a_sync.rs index e53181d..02c672b 100644 --- a/src/ffi/lua/ext/a_sync.rs +++ b/src/ffi/lua/ext/a_sync.rs @@ -54,7 +54,7 @@ impl LuaUserData for Promise { Some(x) => { x.abort(); Ok(()) - }, + } }); methods.add_method_mut("and_then", |_, this, (cb,): (LuaFunction,)| { match this.0.take() { diff --git a/src/ffi/lua/workspace.rs b/src/ffi/lua/workspace.rs index dfa707e..15167c9 100644 --- a/src/ffi/lua/workspace.rs +++ b/src/ffi/lua/workspace.rs @@ -33,9 +33,9 @@ impl LuaUserData for CodempWorkspace { Ok(this.buffer_by_name(&name)) }); - methods.add_method("fetch_buffers", |_, this, ()| - a_sync! { this => this.fetch_buffers().await? } - + methods.add_method( + "fetch_buffers", + |_, this, ()| a_sync! { this => this.fetch_buffers().await? }, ); methods.add_method( "fetch_users", @@ -49,24 +49,21 @@ impl LuaUserData for CodempWorkspace { }, ); - methods.add_method("user_list", |_, this, ()| - Ok(this.user_list()) + methods.add_method("user_list", |_, this, ()| Ok(this.user_list())); + + methods.add_method("recv", |_, this, ()| a_sync! { this => this.recv().await? }); + + methods.add_method( + "try_recv", + |_, this, ()| a_sync! { this => this.try_recv().await? }, ); - methods.add_method("recv", |_, this, ()| - a_sync! { this => this.recv().await? } - ); + methods.add_method("poll", |_, this, ()| a_sync! { this => this.poll().await? }); - methods.add_method("try_recv", |_, this, ()| - a_sync! { this => this.try_recv().await? } - ); - - methods.add_method("poll", |_, this, ()| - a_sync! { this => this.poll().await? } - ); - - methods.add_method("callback", |_, this, (cb,):(LuaFunction,)| { - this.callback(move |controller: CodempWorkspace| super::ext::callback().invoke(cb.clone(), controller)); + methods.add_method("callback", |_, this, (cb,): (LuaFunction,)| { + this.callback(move |controller: CodempWorkspace| { + super::ext::callback().invoke(cb.clone(), controller) + }); Ok(()) }); @@ -74,7 +71,6 @@ impl LuaUserData for CodempWorkspace { this.clear_callback(); Ok(()) }); - } fn add_fields>(fields: &mut F) { diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index e3bfd8d..6ee1e27 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -1,4 +1,4 @@ -use crate::api::controller::{AsyncSender, AsyncReceiver}; +use crate::api::controller::{AsyncReceiver, AsyncSender}; use crate::api::Cursor; use crate::api::TextChange; use crate::buffer::Controller as BufferController; diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 40898ca..106495c 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -1,7 +1,7 @@ +use crate::api::controller::AsyncReceiver; use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use crate::workspace::Workspace; -use crate::api::controller::AsyncReceiver; use pyo3::exceptions::PyValueError; use pyo3::prelude::*; diff --git a/src/lib.rs b/src/lib.rs index 8f66ca7..40ad39f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,7 +50,7 @@ //! # let client = codemp::Client::connect(codemp::api::Config::new("", "")).await.unwrap(); //! # client.create_workspace("").await.unwrap(); //! # let workspace = client.join_workspace("").await.unwrap(); -//! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods +//! use codemp::api::controller::{AsyncSender, AsyncReceiver}; // needed to access trait methods //! let cursor = workspace.cursor(); //! let event = cursor.recv().await.expect("disconnected while waiting for event!"); //! println!("user {} moved on buffer {}", event.user.unwrap_or_default(), event.buffer); diff --git a/src/prelude.rs b/src/prelude.rs index 41f3bec..0fec44d 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -2,14 +2,9 @@ //! All-in-one renamed imports with `use codemp::prelude::*`. pub use crate::api::{ - Controller as CodempController, - controller::AsyncSender as CodempAsyncSender, - controller::AsyncReceiver as CodempAsyncReceiver, - TextChange as CodempTextChange, - Cursor as CodempCursor, - User as CodempUser, - Event as CodempEvent, - Config as CodempConfig, + controller::AsyncReceiver as CodempAsyncReceiver, controller::AsyncSender as CodempAsyncSender, + Config as CodempConfig, Controller as CodempController, Cursor as CodempCursor, + Event as CodempEvent, TextChange as CodempTextChange, User as CodempUser, }; pub use crate::{ diff --git a/src/workspace.rs b/src/workspace.rs index a767c51..fe14388 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -4,8 +4,11 @@ //! Buffers are typically organized in a filetree-like reminiscent of POSIX filesystems. use crate::{ - api::{controller::{AsyncReceiver, ControllerCallback}, Event, User}, - buffer, cursor, + api::{ + controller::{AsyncReceiver, ControllerCallback}, + Event, User, + }, + buffer, cursor, errors::{ConnectionResult, ControllerResult, RemoteResult}, ext::InternallyMutable, network::Services, @@ -24,7 +27,7 @@ use codemp_proto::{ use dashmap::{DashMap, DashSet}; use std::{collections::BTreeSet, sync::Arc}; -use tokio::sync::{mpsc::error::TryRecvError, mpsc}; +use tokio::sync::{mpsc, mpsc::error::TryRecvError}; use tonic::Streaming; use uuid::Uuid; @@ -60,12 +63,7 @@ struct WorkspaceInner { impl AsyncReceiver for Workspace { async fn try_recv(&self) -> ControllerResult> { - match self.0 - .events - .lock() - .await - .try_recv() - { + match self.0.events.lock().await.try_recv() { Ok(x) => Ok(Some(x)), Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Disconnected) => Err(crate::errors::ControllerError::Stopped), @@ -74,7 +72,9 @@ impl AsyncReceiver for Workspace { async fn poll(&self) -> ControllerResult<()> { loop { - if !self.0.events.lock().await.is_empty() { break Ok(()) } + if !self.0.events.lock().await.is_empty() { + break Ok(()); + } // TODO disgusting, please send help tokio::time::sleep(std::time::Duration::from_millis(200)).await; } @@ -312,7 +312,8 @@ impl Workspace { /// A filter may be applied, and it may be strict (equality check) or not (starts_with check). // #[cfg_attr(feature = "js", napi)] // https://github.com/napi-rs/napi-rs/issues/1120 pub fn filetree(&self, filter: Option<&str>, strict: bool) -> Vec { - let mut tree = self.0 + let mut tree = self + .0 .filetree .iter() .filter(|f| {