mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-22 15:24:48 +01:00
chore: merge branch 'workspace' into dev
This commit is contained in:
commit
089a4bb43f
22 changed files with 668 additions and 408 deletions
3
.gitignore
vendored
3
.gitignore
vendored
|
@ -7,3 +7,6 @@ Cargo.lock
|
|||
/client/vscode/*.vsix
|
||||
/client/vscode/codemp.node
|
||||
|
||||
.cargo
|
||||
|
||||
.vscode/
|
24
Cargo.toml
24
Cargo.toml
|
@ -10,30 +10,30 @@ name = "codemp"
|
|||
# core
|
||||
tracing = "0.1"
|
||||
# woot
|
||||
codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", tag = "v0.1.0", optional = true }
|
||||
codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", features = ["serde"], tag = "v0.1.2", optional = true }
|
||||
# proto
|
||||
uuid = { version = "1.3.1", features = ["v4"], optional = true }
|
||||
tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true }
|
||||
prost = { version = "0.11.8", optional = true }
|
||||
# api
|
||||
similar = { version = "2.2", features = ["inline"], optional = true }
|
||||
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"], optional = true }
|
||||
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync"], optional = true }
|
||||
async-trait = { version = "0.1", optional = true }
|
||||
# client
|
||||
md5 = { version = "0.7.0", optional = true }
|
||||
uuid = { version = "1.3.1", features = ["v4"], optional = true }
|
||||
serde_json = { version = "1", optional = true }
|
||||
tokio-stream = { version = "0.1", optional = true }
|
||||
# global
|
||||
lazy_static = { version = "1.4", optional = true }
|
||||
serde = { version = "1.0.193", features = ["derive"] }
|
||||
dashmap = { version = "5.5.3", optional = true }
|
||||
postcard = { version = "1.0.8", optional = true }
|
||||
|
||||
[build-dependencies]
|
||||
tonic-build = "0.9"
|
||||
|
||||
[features]
|
||||
default = ["client"]
|
||||
api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"]
|
||||
woot = ["dep:codemp-woot"]
|
||||
proto = ["dep:prost", "dep:tonic"]
|
||||
client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json"]
|
||||
global = ["client", "dep:lazy_static"]
|
||||
sync = ["client"]
|
||||
default = []
|
||||
api = ["dep:async-trait"]
|
||||
woot = ["dep:codemp-woot", "dep:similar"]
|
||||
proto = ["dep:prost", "dep:tonic", "dep:uuid"]
|
||||
client = ["proto", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "dep:md5", "dep:serde_json", "dep:dashmap", "dep:postcard"]
|
||||
server = ["proto", "woot"]
|
||||
|
|
19
build.rs
19
build.rs
|
@ -1,5 +1,18 @@
|
|||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tonic_build::compile_protos("proto/buffer.proto")?;
|
||||
tonic_build::compile_protos("proto/cursor.proto")?;
|
||||
tonic_build::configure()
|
||||
// .build_client(cfg!(feature = "client"))
|
||||
// .build_server(cfg!(feature = "server")) // FIXME if false, build fails????
|
||||
// .build_transport(cfg!(feature = "proto"))
|
||||
.compile(
|
||||
&[
|
||||
"proto/common.proto",
|
||||
"proto/cursor.proto",
|
||||
"proto/files.proto",
|
||||
"proto/auth.proto",
|
||||
"proto/workspace.proto",
|
||||
"proto/buffer.proto",
|
||||
],
|
||||
&["proto"],
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
20
proto/auth.proto
Normal file
20
proto/auth.proto
Normal file
|
@ -0,0 +1,20 @@
|
|||
syntax = "proto2";
|
||||
|
||||
package auth;
|
||||
|
||||
// authenticates users, issuing tokens
|
||||
service Auth {
|
||||
// send credentials and join a workspace, returns ready to use token
|
||||
rpc Login (WorkspaceJoinRequest) returns (Token);
|
||||
}
|
||||
|
||||
message Token {
|
||||
required string token = 1;
|
||||
}
|
||||
|
||||
// TODO one-request-to-do-it-all from login to workspace access
|
||||
message WorkspaceJoinRequest {
|
||||
required string username = 1;
|
||||
required string password = 2;
|
||||
optional string workspace_id = 3;
|
||||
}
|
|
@ -1,60 +1,20 @@
|
|||
syntax = "proto3";
|
||||
syntax = "proto2";
|
||||
|
||||
package codemp.buffer;
|
||||
import "common.proto";
|
||||
|
||||
package buffer;
|
||||
|
||||
// handle buffer changes, keep in sync users
|
||||
service Buffer {
|
||||
// attach to a buffer and receive operations
|
||||
rpc Attach (BufferPayload) returns (stream RawOp);
|
||||
// send an operation for a specific buffer
|
||||
rpc Edit (OperationRequest) returns (BufferEditResponse);
|
||||
// create a new buffer
|
||||
rpc Create (BufferPayload) returns (BufferCreateResponse);
|
||||
// get contents of buffer
|
||||
rpc Sync (BufferPayload) returns (BufferResponse);
|
||||
// attach to a buffer and receive operations
|
||||
rpc Attach (stream Operation) returns (stream BufferEvent);
|
||||
}
|
||||
|
||||
// empty request
|
||||
message BufferCreateResponse {}
|
||||
|
||||
// empty request
|
||||
message BufferEditResponse {}
|
||||
|
||||
// raw wire operation sequence event
|
||||
message RawOp {
|
||||
// operation seq serialized to json
|
||||
string opseq = 1;
|
||||
|
||||
// user id that has executed the operation
|
||||
string user = 2;
|
||||
message Operation {
|
||||
required bytes data = 1;
|
||||
}
|
||||
|
||||
// client buffer operation request
|
||||
message OperationRequest {
|
||||
// buffer path to operate onto
|
||||
string path = 1;
|
||||
|
||||
// buffer hash of source state
|
||||
string hash = 2;
|
||||
|
||||
// raw operation sequence
|
||||
RawOp op = 3;
|
||||
}
|
||||
|
||||
// generic buffer operation request
|
||||
message BufferPayload {
|
||||
// buffer path to operate onto
|
||||
string path = 1;
|
||||
|
||||
// user id that is requesting the operation
|
||||
string user = 2;
|
||||
|
||||
// optional buffer full content for replacing
|
||||
optional string content = 3;
|
||||
}
|
||||
|
||||
// response from server with buffer content
|
||||
message BufferResponse {
|
||||
// current buffer content
|
||||
string content = 1;
|
||||
message BufferEvent {
|
||||
required Operation op = 1;
|
||||
required common.Identity user = 2;
|
||||
}
|
||||
|
|
18
proto/common.proto
Normal file
18
proto/common.proto
Normal file
|
@ -0,0 +1,18 @@
|
|||
syntax = "proto2";
|
||||
|
||||
package common;
|
||||
|
||||
|
||||
// a wrapper payload representing an uuid
|
||||
message Identity {
|
||||
// uuid bytes, as string
|
||||
required string id = 1;
|
||||
}
|
||||
|
||||
// a collection of identities
|
||||
message IdentityList {
|
||||
repeated Identity users = 1;
|
||||
}
|
||||
|
||||
//generic Empty message
|
||||
message Empty { }
|
|
@ -1,44 +1,36 @@
|
|||
syntax = "proto3";
|
||||
syntax = "proto2";
|
||||
|
||||
package codemp.cursor;
|
||||
package cursor;
|
||||
import "common.proto";
|
||||
import "files.proto";
|
||||
|
||||
// handle cursor events and broadcast to all users
|
||||
service Cursor {
|
||||
// send cursor movement to server
|
||||
rpc Moved (CursorEvent) returns (MovedResponse);
|
||||
// attach to a workspace and receive cursor events
|
||||
rpc Listen (UserIdentity) returns (stream CursorEvent);
|
||||
// subscribe to a workspace's cursor events
|
||||
rpc Attach (stream cursor.CursorPosition) returns (stream cursor.CursorEvent);
|
||||
}
|
||||
|
||||
// empty request
|
||||
message MovedResponse {}
|
||||
|
||||
// a tuple indicating row and column
|
||||
message RowCol {
|
||||
int32 row = 1;
|
||||
int32 col = 2;
|
||||
required int32 row = 1;
|
||||
required int32 col = 2;
|
||||
}
|
||||
|
||||
// cursor position object
|
||||
message CursorPosition {
|
||||
// path of current buffer this cursor is into
|
||||
string buffer = 1;
|
||||
required files.BufferNode buffer = 1;
|
||||
// cursor start position
|
||||
RowCol start = 2;
|
||||
required RowCol start = 2;
|
||||
// cursor end position
|
||||
RowCol end = 3;
|
||||
required RowCol end = 3;
|
||||
}
|
||||
|
||||
// cursor event, with user id and cursor position
|
||||
message CursorEvent {
|
||||
// user moving the cursor
|
||||
string user = 1;
|
||||
required common.Identity user = 1;
|
||||
// new cursor position
|
||||
CursorPosition position = 2;
|
||||
}
|
||||
|
||||
// payload identifying user for cursor attaching
|
||||
message UserIdentity {
|
||||
// user identifier
|
||||
string id = 1;
|
||||
required CursorPosition position = 2;
|
||||
}
|
||||
|
|
11
proto/files.proto
Normal file
11
proto/files.proto
Normal file
|
@ -0,0 +1,11 @@
|
|||
syntax = "proto2";
|
||||
|
||||
package files;
|
||||
|
||||
message BufferNode {
|
||||
required string path = 1;
|
||||
}
|
||||
|
||||
message BufferTree {
|
||||
repeated BufferNode buffers = 1;
|
||||
}
|
54
proto/workspace.proto
Normal file
54
proto/workspace.proto
Normal file
|
@ -0,0 +1,54 @@
|
|||
syntax = "proto2";
|
||||
|
||||
package workspace;
|
||||
|
||||
import "common.proto";
|
||||
import "files.proto";
|
||||
import "auth.proto";
|
||||
|
||||
service Workspace {
|
||||
rpc Attach (common.Empty) returns (stream WorkspaceEvent);
|
||||
|
||||
rpc CreateBuffer (files.BufferNode) returns (common.Empty);
|
||||
rpc AccessBuffer (files.BufferNode) returns (BufferCredentials);
|
||||
rpc DeleteBuffer (files.BufferNode) returns (common.Empty);
|
||||
|
||||
rpc ListBuffers (common.Empty) returns (files.BufferTree);
|
||||
rpc ListUsers (common.Empty) returns (common.IdentityList);
|
||||
rpc ListBufferUsers (files.BufferNode) returns (common.IdentityList);
|
||||
}
|
||||
|
||||
message WorkspaceEvent {
|
||||
message UserJoin {
|
||||
required common.Identity user = 1;
|
||||
}
|
||||
message UserLeave {
|
||||
required common.Identity user = 1;
|
||||
}
|
||||
message FileCreate {
|
||||
required string path = 1;
|
||||
}
|
||||
message FileRename {
|
||||
required string before = 1;
|
||||
required string after = 2;
|
||||
}
|
||||
message FileDelete {
|
||||
required string path = 1;
|
||||
}
|
||||
|
||||
oneof event {
|
||||
UserJoin join = 1;
|
||||
UserLeave leave = 2;
|
||||
FileCreate create = 3;
|
||||
FileRename rename = 4;
|
||||
FileDelete delete = 5;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO this is very ugly because we can't just return a new token (which is already smelly but whatev), we also need to tell the underlying id so that
|
||||
// the client can put it as metadata while attaching, because it can't really know the underlying id that the server is using for each buffer without
|
||||
// parsing the token itself. meehhhhhh, this bleeds underlying implementation to the upper levels, how can we avoid this??
|
||||
message BufferCredentials {
|
||||
required common.Identity id = 1;
|
||||
required auth.Token token = 2;
|
||||
}
|
|
@ -3,6 +3,9 @@
|
|||
//! an editor-friendly representation of a text change in a buffer
|
||||
//! to easily interface with codemp from various editors
|
||||
|
||||
#[cfg(feature = "woot")]
|
||||
use crate::woot::{WootResult, woot::Woot, crdt::{TextEditor, CRDT, Op}};
|
||||
|
||||
/// an editor-friendly representation of a text change in a buffer
|
||||
///
|
||||
/// this represent a range in the previous state of the string and a new content which should be
|
||||
|
@ -27,6 +30,7 @@ pub struct TextChange {
|
|||
}
|
||||
|
||||
impl TextChange {
|
||||
#[cfg(feature = "woot")]
|
||||
/// create a new TextChange from the difference of given strings
|
||||
pub fn from_diff(before: &str, after: &str) -> TextChange {
|
||||
let diff = similar::TextDiff::from_chars(before, after);
|
||||
|
@ -57,6 +61,34 @@ impl TextChange {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "woot")]
|
||||
/// consume the [TextChange], transforming it into a Vec of [woot::crdt::Op]
|
||||
pub fn transform(self, woot: &Woot) -> WootResult<Vec<Op>> {
|
||||
let mut out = Vec::new();
|
||||
if self.is_empty() { return Ok(out); } // no-op
|
||||
let view = woot.view();
|
||||
let Some(span) = view.get(self.span.clone()) else {
|
||||
return Err(crate::woot::WootError::OutOfBounds);
|
||||
};
|
||||
let diff = similar::TextDiff::from_chars(span, &self.content);
|
||||
for (i, diff) in diff.iter_all_changes().enumerate() {
|
||||
match diff.tag() {
|
||||
similar::ChangeTag::Equal => {},
|
||||
similar::ChangeTag::Delete => match woot.delete_one(self.span.start + i) {
|
||||
Err(e) => tracing::error!("could not create deletion: {}", e),
|
||||
Ok(op) => out.push(op),
|
||||
},
|
||||
similar::ChangeTag::Insert => {
|
||||
match woot.insert(self.span.start + i, diff.value()) {
|
||||
Ok(mut op) => out.append(&mut op),
|
||||
Err(e) => tracing::error!("could not create insertion: {}", e),
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
Ok(out)
|
||||
}
|
||||
|
||||
/// returns true if this TextChange deletes existing text
|
||||
pub fn is_deletion(&self) -> bool {
|
||||
!self.span.is_empty()
|
||||
|
@ -82,11 +114,12 @@ impl TextChange {
|
|||
|
||||
/// convert from byte index to row and column
|
||||
/// txt must be the whole content of the buffer, in order to count lines
|
||||
pub fn index_to_rowcol(txt: &str, index: usize) -> crate::proto::RowCol {
|
||||
#[cfg(feature = "proto")]
|
||||
pub fn index_to_rowcol(txt: &str, index: usize) -> crate::proto::cursor::RowCol {
|
||||
// FIXME might panic, use .get()
|
||||
let row = txt[..index].matches('\n').count() as i32;
|
||||
let col = txt[..index].split('\n').last().unwrap_or("").len() as i32;
|
||||
crate::proto::RowCol { row, col }
|
||||
crate::proto::cursor::RowCol { row, col }
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -19,8 +19,7 @@ pub(crate) trait ControllerWorker<T : Sized + Send + Sync> {
|
|||
///
|
||||
/// this generic trait is implemented by actors managing stream procedures.
|
||||
/// events can be enqueued for dispatching without blocking ([Controller::send]), and an async blocking
|
||||
/// api ([Controller::recv]) is provided to wait for server events. Additional sync blocking
|
||||
/// ([Controller::blocking_recv]) is implemented if feature `sync` is enabled.
|
||||
/// api ([Controller::recv]) is provided to wait for server events.
|
||||
///
|
||||
/// * if possible, prefer a pure [Controller::recv] consumer, awaiting for events
|
||||
/// * if async is not feasible a [Controller::poll]/[Controller::try_recv] approach is possible
|
||||
|
@ -58,11 +57,4 @@ pub trait Controller<T : Sized + Send + Sync> : Sized + Send + Sync {
|
|||
|
||||
/// attempt to receive a value without blocking, return None if nothing is available
|
||||
fn try_recv(&self) -> Result<Option<T>>;
|
||||
|
||||
/// sync variant of [Self::recv], blocking invoking thread
|
||||
/// this calls [Controller::recv] inside a [tokio::runtime::Runtime::block_on]
|
||||
#[cfg(feature = "sync")]
|
||||
fn blocking_recv(&self, rt: &tokio::runtime::Handle) -> Result<T> {
|
||||
rt.block_on(self.recv())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,6 +9,9 @@
|
|||
/// buffer controller implementation
|
||||
pub mod controller;
|
||||
|
||||
/// assorted helpers to handle buffer controllers
|
||||
pub mod tools;
|
||||
|
||||
pub(crate) mod worker;
|
||||
|
||||
pub use controller::BufferController as Controller;
|
||||
|
|
|
@ -1,24 +1,21 @@
|
|||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
|
||||
use similar::{TextDiff, ChangeTag};
|
||||
use tokio::sync::{watch, mpsc, oneshot};
|
||||
use tonic::transport::Channel;
|
||||
use tonic::{async_trait, Streaming};
|
||||
use woot::crdt::{Op, CRDT, TextEditor};
|
||||
use uuid::Uuid;
|
||||
use woot::crdt::{Op, CRDT};
|
||||
use woot::woot::Woot;
|
||||
|
||||
use crate::errors::IgnorableError;
|
||||
use crate::proto::{OperationRequest, RawOp};
|
||||
use crate::proto::buffer_client::BufferClient;
|
||||
use crate::api::controller::ControllerWorker;
|
||||
use crate::api::TextChange;
|
||||
use crate::proto::buffer::{BufferEvent, Operation};
|
||||
|
||||
use super::controller::BufferController;
|
||||
|
||||
|
||||
pub(crate) struct BufferControllerWorker {
|
||||
uid: String,
|
||||
pub(crate) struct BufferWorker {
|
||||
_user_id: Uuid,
|
||||
name: String,
|
||||
buffer: Woot,
|
||||
content: watch::Sender<String>,
|
||||
|
@ -36,17 +33,17 @@ struct ClonableHandlesForController {
|
|||
content: watch::Receiver<String>,
|
||||
}
|
||||
|
||||
impl BufferControllerWorker {
|
||||
pub fn new(uid: String, path: &str) -> Self {
|
||||
impl BufferWorker {
|
||||
pub fn new(user_id: Uuid, path: &str) -> Self {
|
||||
let (txt_tx, txt_rx) = watch::channel("".to_string());
|
||||
let (op_tx, op_rx) = mpsc::unbounded_channel();
|
||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
||||
let (poller_tx, poller_rx) = mpsc::unbounded_channel();
|
||||
let mut hasher = DefaultHasher::new();
|
||||
uid.hash(&mut hasher);
|
||||
user_id.hash(&mut hasher);
|
||||
let site_id = hasher.finish() as usize;
|
||||
BufferControllerWorker {
|
||||
uid,
|
||||
BufferWorker {
|
||||
_user_id: user_id,
|
||||
name: path.to_string(),
|
||||
buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging!
|
||||
content: txt_tx,
|
||||
|
@ -62,26 +59,13 @@ impl BufferControllerWorker {
|
|||
stop: end_rx,
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_op(&self, tx: &mut BufferClient<Channel>, outbound: &Op) -> crate::Result<()> {
|
||||
let opseq = serde_json::to_string(outbound).expect("could not serialize opseq");
|
||||
let req = OperationRequest {
|
||||
path: self.name.clone(),
|
||||
hash: format!("{:x}", md5::compute(self.buffer.view())),
|
||||
op: Some(RawOp {
|
||||
opseq, user: self.uid.clone(),
|
||||
}),
|
||||
};
|
||||
let _ = tx.edit(req).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ControllerWorker<TextChange> for BufferControllerWorker {
|
||||
impl ControllerWorker<TextChange> for BufferWorker {
|
||||
type Controller = BufferController;
|
||||
type Tx = BufferClient<Channel>;
|
||||
type Rx = Streaming<RawOp>;
|
||||
type Tx = mpsc::Sender<Operation>;
|
||||
type Rx = Streaming<BufferEvent>;
|
||||
|
||||
fn subscribe(&self) -> BufferController {
|
||||
BufferController::new(
|
||||
|
@ -93,7 +77,7 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
|
|||
)
|
||||
}
|
||||
|
||||
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
|
||||
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
||||
loop {
|
||||
// block until one of these is ready
|
||||
tokio::select! {
|
||||
|
@ -110,49 +94,22 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
|
|||
|
||||
// received a text change from editor
|
||||
res = self.operations.recv() => match res {
|
||||
None => break,
|
||||
Some(change) => {
|
||||
if !change.is_empty() {
|
||||
let view = self.buffer.view();
|
||||
match view.get(change.span.clone()) {
|
||||
None => tracing::error!("received illegal span from client: {:?} but buffer is of len {}", change.span, view.len()),
|
||||
Some(span) => {
|
||||
let diff = TextDiff::from_chars(span, &change.content);
|
||||
|
||||
let mut i = 0;
|
||||
let mut ops = Vec::new();
|
||||
for diff in diff.iter_all_changes() {
|
||||
match diff.tag() {
|
||||
ChangeTag::Equal => i += 1,
|
||||
ChangeTag::Delete => match self.buffer.delete(change.span.start + i) {
|
||||
Ok(op) => ops.push(op),
|
||||
Err(e) => tracing::error!("could not apply deletion: {}", e),
|
||||
},
|
||||
ChangeTag::Insert => {
|
||||
for c in diff.value().chars() {
|
||||
match self.buffer.insert(change.span.start + i, c) {
|
||||
Ok(op) => {
|
||||
ops.push(op);
|
||||
i += 1;
|
||||
},
|
||||
Err(e) => tracing::error!("could not apply insertion: {}", e),
|
||||
}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
for op in ops {
|
||||
match self.send_op(&mut tx, &op).await {
|
||||
Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e),
|
||||
Ok(()) => {
|
||||
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
|
||||
},
|
||||
}
|
||||
}
|
||||
},
|
||||
None => break tracing::debug!("stopping: editor closed channel"),
|
||||
Some(change) => match change.transform(&self.buffer) {
|
||||
Err(e) => break tracing::error!("could not apply operation from client: {}", e),
|
||||
Ok(ops) => {
|
||||
for op in ops {
|
||||
self.buffer.merge(op.clone());
|
||||
let operation = Operation {
|
||||
data: postcard::to_extend(&op, Vec::new()).unwrap(),
|
||||
};
|
||||
if let Err(e) = tx.send(operation).await {
|
||||
tracing::error!("server refused to broadcast {}: {}", op, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.content.send(self.buffer.view())
|
||||
.unwrap_or_warn("could not send buffer update");
|
||||
},
|
||||
}
|
||||
},
|
||||
|
||||
|
@ -160,8 +117,8 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
|
|||
res = rx.message() => match res {
|
||||
Err(_e) => break,
|
||||
Ok(None) => break,
|
||||
Ok(Some(change)) => match serde_json::from_str::<Op>(&change.opseq) {
|
||||
Ok(op) => {
|
||||
Ok(Some(change)) => match postcard::from_bytes::<Op>(&change.op.data) {
|
||||
Ok(op) => { // TODO here in change we receive info about the author, maybe propagate?
|
||||
self.buffer.merge(op);
|
||||
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
|
||||
for tx in self.pollers.drain(..) {
|
||||
|
|
224
src/client.rs
224
src/client.rs
|
@ -2,150 +2,154 @@
|
|||
//!
|
||||
//! codemp client manager, containing grpc services
|
||||
|
||||
use std::{sync::Arc, collections::BTreeMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use tonic::transport::Channel;
|
||||
use dashmap::DashMap;
|
||||
use tokio::sync::mpsc;
|
||||
use tonic::service::interceptor::InterceptedService;
|
||||
use tonic::service::Interceptor;
|
||||
use tonic::transport::{Channel, Endpoint};
|
||||
use tonic::IntoRequest;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::proto::auth::auth_client::AuthClient;
|
||||
use crate::{
|
||||
cursor::{worker::CursorControllerWorker, controller::CursorController},
|
||||
api::controller::ControllerWorker,
|
||||
cursor::worker::CursorWorker,
|
||||
proto::{
|
||||
buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload,
|
||||
common::Empty,
|
||||
buffer::buffer_client::BufferClient,
|
||||
cursor::cursor_client::CursorClient,
|
||||
auth::{Token, WorkspaceJoinRequest},
|
||||
workspace::workspace_client::WorkspaceClient,
|
||||
},
|
||||
Error, api::controller::ControllerWorker,
|
||||
buffer::{controller::BufferController, worker::BufferControllerWorker},
|
||||
workspace::Workspace
|
||||
};
|
||||
|
||||
|
||||
/// codemp client manager
|
||||
///
|
||||
/// contains all required grpc services and the unique user id
|
||||
/// will disconnect when dropped
|
||||
/// can be used to interact with server
|
||||
pub struct Client {
|
||||
id: String,
|
||||
client: Services,
|
||||
workspace: Option<Workspace>,
|
||||
user_id: Uuid,
|
||||
token_tx: Arc<tokio::sync::watch::Sender<Token>>,
|
||||
workspaces: Arc<DashMap<String, Arc<Workspace>>>,
|
||||
services: Arc<Services>
|
||||
}
|
||||
|
||||
struct Services {
|
||||
buffer: BufferClient<Channel>,
|
||||
cursor: CursorClient<Channel>,
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct ClientInterceptor {
|
||||
token: tokio::sync::watch::Receiver<Token>
|
||||
}
|
||||
|
||||
struct Workspace {
|
||||
cursor: Arc<CursorController>,
|
||||
buffers: BTreeMap<String, Arc<BufferController>>,
|
||||
impl Interceptor for ClientInterceptor {
|
||||
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
|
||||
if let Ok(token) = self.token.borrow().token.parse() {
|
||||
request.metadata_mut().insert("auth", token);
|
||||
}
|
||||
|
||||
Ok(request)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub(crate) struct Services {
|
||||
pub(crate) workspace: WorkspaceClient<InterceptedService<Channel, ClientInterceptor>>,
|
||||
pub(crate) buffer: BufferClient<InterceptedService<Channel, ClientInterceptor>>,
|
||||
pub(crate) cursor: CursorClient<InterceptedService<Channel, ClientInterceptor>>,
|
||||
pub(crate) auth: AuthClient<Channel>,
|
||||
}
|
||||
|
||||
// TODO meno losco
|
||||
fn parse_codemp_connection_string(string: &str) -> (String, String) {
|
||||
let url = string.replace("codemp://", "");
|
||||
let (host, workspace) = url.split_once('/').unwrap();
|
||||
(format!("http://{}", host), workspace.to_string())
|
||||
}
|
||||
|
||||
impl Client {
|
||||
/// instantiate and connect a new client
|
||||
pub async fn new(dst: &str) -> Result<Self, tonic::transport::Error> {
|
||||
let buffer = BufferClient::connect(dst.to_string()).await?;
|
||||
let cursor = CursorClient::connect(dst.to_string()).await?;
|
||||
let id = uuid::Uuid::new_v4().to_string();
|
||||
pub async fn new(dest: &str) -> crate::Result<Self> {
|
||||
let (_host, _workspace_id) = parse_codemp_connection_string(dest);
|
||||
|
||||
Ok(Client { id, client: Services { buffer, cursor}, workspace: None })
|
||||
}
|
||||
let channel = Endpoint::from_shared(dest.to_string())?
|
||||
.connect()
|
||||
.await?;
|
||||
|
||||
/// return a reference to current cursor controller, if currently in a workspace
|
||||
pub fn get_cursor(&self) -> Option<Arc<CursorController>> {
|
||||
Some(self.workspace.as_ref()?.cursor.clone())
|
||||
}
|
||||
|
||||
/// leave current workspace if in one, disconnecting buffer and cursor controllers
|
||||
pub fn leave_workspace(&mut self) {
|
||||
// TODO need to stop tasks?
|
||||
self.workspace = None
|
||||
}
|
||||
|
||||
/// disconnect from a specific buffer
|
||||
pub fn disconnect_buffer(&mut self, path: &str) -> bool {
|
||||
match &mut self.workspace {
|
||||
Some(w) => w.buffers.remove(path).is_some(),
|
||||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// get a new reference to a buffer controller, if any is active to given path
|
||||
pub fn get_buffer(&self, path: &str) -> Option<Arc<BufferController>> {
|
||||
self.workspace.as_ref()?.buffers.get(path).cloned()
|
||||
}
|
||||
|
||||
/// join a workspace, starting a cursorcontroller and returning a new reference to it
|
||||
///
|
||||
/// to interact with such workspace [crate::api::Controller::send] cursor events or
|
||||
/// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller].
|
||||
pub async fn join(&mut self, _session: &str) -> crate::Result<Arc<CursorController>> {
|
||||
// TODO there is no real workspace handling in codemp server so it behaves like one big global
|
||||
// session. I'm still creating this to start laying out the proper use flow
|
||||
let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner();
|
||||
|
||||
let controller = CursorControllerWorker::new(self.id.clone());
|
||||
let client = self.client.cursor.clone();
|
||||
|
||||
let handle = Arc::new(controller.subscribe());
|
||||
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!("cursor worker started");
|
||||
controller.work(client, stream).await;
|
||||
tracing::debug!("cursor worker stopped");
|
||||
});
|
||||
|
||||
self.workspace = Some(
|
||||
Workspace {
|
||||
cursor: handle.clone(),
|
||||
buffers: BTreeMap::new()
|
||||
}
|
||||
let (token_tx, token_rx) = tokio::sync::watch::channel(
|
||||
Token { token: "".to_string() }
|
||||
);
|
||||
|
||||
Ok(handle)
|
||||
let inter = ClientInterceptor { token: token_rx };
|
||||
|
||||
let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone());
|
||||
let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone());
|
||||
let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone());
|
||||
let auth = AuthClient::new(channel);
|
||||
|
||||
let user_id = uuid::Uuid::new_v4();
|
||||
|
||||
Ok(Client {
|
||||
user_id,
|
||||
token_tx: Arc::new(token_tx),
|
||||
workspaces: Arc::new(DashMap::default()),
|
||||
services: Arc::new(Services { workspace, buffer, cursor, auth })
|
||||
})
|
||||
}
|
||||
|
||||
/// create a new buffer in current workspace, with optional given content
|
||||
pub async fn create(&mut self, path: &str, content: Option<&str>) -> crate::Result<()> {
|
||||
if let Some(_workspace) = &self.workspace {
|
||||
self.client.buffer
|
||||
.create(BufferPayload {
|
||||
user: self.id.clone(),
|
||||
path: path.to_string(),
|
||||
content: content.map(|x| x.to_string()),
|
||||
}).await?;
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::InvalidState { msg: "join a workspace first".into() })
|
||||
}
|
||||
pub async fn login(&self, username: String, password: String, workspace_id: Option<String>) -> crate::Result<()> {
|
||||
Ok(self.token_tx.send(
|
||||
self.services.auth.clone()
|
||||
.login(WorkspaceJoinRequest { username, password, workspace_id})
|
||||
.await?
|
||||
.into_inner()
|
||||
)?)
|
||||
}
|
||||
|
||||
/// attach to a buffer, starting a buffer controller and returning a new reference to it
|
||||
///
|
||||
/// to interact with such buffer use [crate::api::Controller::send] or
|
||||
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
|
||||
pub async fn attach(&mut self, path: &str) -> crate::Result<Arc<BufferController>> {
|
||||
if let Some(workspace) = &mut self.workspace {
|
||||
let mut client = self.client.buffer.clone();
|
||||
let req = BufferPayload {
|
||||
path: path.to_string(), user: self.id.clone(), content: None
|
||||
};
|
||||
/// join a workspace, returns an [tokio::sync::RwLock] to interact with it
|
||||
pub async fn join_workspace(&mut self, workspace: &str) -> crate::Result<Arc<Workspace>> {
|
||||
let ws_stream = self.services.workspace.clone().attach(Empty{}.into_request()).await?.into_inner();
|
||||
|
||||
let stream = client.attach(req).await?.into_inner();
|
||||
let (tx, rx) = mpsc::channel(256);
|
||||
let cur_stream = self.services.cursor.clone()
|
||||
.attach(tokio_stream::wrappers::ReceiverStream::new(rx))
|
||||
.await?
|
||||
.into_inner();
|
||||
|
||||
let controller = BufferControllerWorker::new(self.id.clone(), path);
|
||||
let handler = Arc::new(controller.subscribe());
|
||||
let worker = CursorWorker::default();
|
||||
let controller = Arc::new(worker.subscribe());
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!("controller worker started");
|
||||
worker.work(tx, cur_stream).await;
|
||||
tracing::debug!("controller worker stopped");
|
||||
});
|
||||
|
||||
let _path = path.to_string();
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!("buffer[{}] worker started", _path);
|
||||
controller.work(client, stream).await;
|
||||
tracing::debug!("buffer[{}] worker stopped", _path);
|
||||
});
|
||||
let ws = Arc::new(Workspace::new(
|
||||
workspace.to_string(),
|
||||
self.user_id,
|
||||
self.token_tx.clone(),
|
||||
controller,
|
||||
self.services.clone()
|
||||
));
|
||||
|
||||
workspace.buffers.insert(path.to_string(), handler.clone());
|
||||
ws.fetch_users().await?;
|
||||
ws.fetch_buffers().await?;
|
||||
|
||||
Ok(handler)
|
||||
} else {
|
||||
Err(Error::InvalidState { msg: "join a workspace first".into() })
|
||||
}
|
||||
ws.run_actor(ws_stream);
|
||||
|
||||
self.workspaces.insert(workspace.to_string(), ws.clone());
|
||||
|
||||
Ok(ws)
|
||||
}
|
||||
|
||||
pub fn get_workspace(&self, id: &str) -> Option<Arc<Workspace>> {
|
||||
self.workspaces.get(id).map(|x| x.clone())
|
||||
}
|
||||
|
||||
/// accessor for user id
|
||||
pub fn user_id(&self) -> Uuid {
|
||||
self.user_id
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch};
|
||||
use tonic::async_trait;
|
||||
|
||||
use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors::IgnorableError};
|
||||
use crate::{api::Controller, errors::IgnorableError, proto::cursor::{CursorEvent, CursorPosition}};
|
||||
|
||||
/// the cursor controller implementation
|
||||
///
|
||||
|
@ -20,8 +20,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors
|
|||
/// upon dropping this handle will stop the associated worker
|
||||
#[derive(Debug)]
|
||||
pub struct CursorController {
|
||||
uid: String,
|
||||
op: mpsc::UnboundedSender<CursorEvent>,
|
||||
op: mpsc::UnboundedSender<CursorPosition>,
|
||||
last_op: Mutex<watch::Receiver<CursorEvent>>,
|
||||
stream: Mutex<broadcast::Receiver<CursorEvent>>,
|
||||
stop: mpsc::UnboundedSender<()>,
|
||||
|
@ -35,13 +34,12 @@ impl Drop for CursorController {
|
|||
|
||||
impl CursorController {
|
||||
pub(crate) fn new(
|
||||
uid: String,
|
||||
op: mpsc::UnboundedSender<CursorEvent>,
|
||||
op: mpsc::UnboundedSender<CursorPosition>,
|
||||
last_op: Mutex<watch::Receiver<CursorEvent>>,
|
||||
stream: Mutex<broadcast::Receiver<CursorEvent>>,
|
||||
stop: mpsc::UnboundedSender<()>,
|
||||
) -> Self {
|
||||
CursorController { uid, op, last_op, stream, stop }
|
||||
CursorController { op, last_op, stream, stop }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -51,14 +49,11 @@ impl Controller<CursorEvent> for CursorController {
|
|||
|
||||
/// enqueue a cursor event to be broadcast to current workspace
|
||||
/// will automatically invert cursor start/end if they are inverted
|
||||
fn send(&self, mut cursor: CursorPosition) -> Result<(), Error> {
|
||||
if cursor.start() > cursor.end() {
|
||||
fn send(&self, mut cursor: CursorPosition) -> crate::Result<()> {
|
||||
if cursor.start > cursor.end {
|
||||
std::mem::swap(&mut cursor.start, &mut cursor.end);
|
||||
}
|
||||
Ok(self.op.send(CursorEvent {
|
||||
user: self.uid.clone(),
|
||||
position: Some(cursor),
|
||||
})?)
|
||||
Ok(self.op.send(cursor)?)
|
||||
}
|
||||
|
||||
/// try to receive without blocking, but will still block on stream mutex
|
||||
|
@ -67,7 +62,7 @@ impl Controller<CursorEvent> for CursorController {
|
|||
match stream.try_recv() {
|
||||
Ok(x) => Ok(Some(x)),
|
||||
Err(TryRecvError::Empty) => Ok(None),
|
||||
Err(TryRecvError::Closed) => Err(Error::Channel { send: false }),
|
||||
Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }),
|
||||
Err(TryRecvError::Lagged(n)) => {
|
||||
tracing::warn!("cursor channel lagged, skipping {} events", n);
|
||||
Ok(stream.try_recv().ok())
|
||||
|
@ -78,11 +73,11 @@ impl Controller<CursorEvent> for CursorController {
|
|||
// TODO is this cancelable? so it can be used in tokio::select!
|
||||
// TODO is the result type overkill? should be an option?
|
||||
/// get next cursor event from current workspace, or block until one is available
|
||||
async fn recv(&self) -> Result<CursorEvent, Error> {
|
||||
async fn recv(&self) -> crate::Result<CursorEvent> {
|
||||
let mut stream = self.stream.lock().await;
|
||||
match stream.recv().await {
|
||||
Ok(x) => Ok(x),
|
||||
Err(RecvError::Closed) => Err(Error::Channel { send: false }),
|
||||
Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }),
|
||||
Err(RecvError::Lagged(n)) => {
|
||||
tracing::error!("cursor channel lagged behind, skipping {} events", n);
|
||||
Ok(stream.recv().await.expect("could not receive after lagging"))
|
||||
|
|
|
@ -12,7 +12,7 @@ pub mod controller;
|
|||
|
||||
pub use controller::CursorController as Controller;
|
||||
|
||||
use crate::proto::{RowCol, CursorPosition};
|
||||
use crate::proto::cursor::RowCol;
|
||||
|
||||
impl From::<RowCol> for (i32, i32) {
|
||||
fn from(pos: RowCol) -> (i32, i32) {
|
||||
|
@ -26,25 +26,6 @@ impl From::<(i32, i32)> for RowCol {
|
|||
}
|
||||
}
|
||||
|
||||
impl RowCol {
|
||||
/// create a RowCol and wrap into an Option, to help build protocol packets
|
||||
pub fn wrap(row: i32, col: i32) -> Option<RowCol> {
|
||||
Some(RowCol { row, col })
|
||||
}
|
||||
}
|
||||
|
||||
impl CursorPosition {
|
||||
/// extract start position, defaulting to (0,0), to help build protocol packets
|
||||
pub fn start(&self) -> RowCol {
|
||||
self.start.clone().unwrap_or((0, 0).into())
|
||||
}
|
||||
|
||||
/// extract end position, defaulting to (0,0), to help build protocol packets
|
||||
pub fn end(&self) -> RowCol {
|
||||
self.end.clone().unwrap_or((0, 0).into())
|
||||
}
|
||||
}
|
||||
|
||||
impl PartialOrd for RowCol {
|
||||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
|
||||
match self.row.partial_cmp(&other.row) {
|
||||
|
|
|
@ -1,16 +1,15 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
|
||||
use tonic::{Streaming, transport::Channel, async_trait};
|
||||
use tonic::{Streaming, async_trait};
|
||||
|
||||
use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::controller::ControllerWorker};
|
||||
use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::{CursorPosition, CursorEvent}};
|
||||
|
||||
use super::controller::CursorController;
|
||||
|
||||
pub(crate) struct CursorControllerWorker {
|
||||
uid: String,
|
||||
producer: mpsc::UnboundedSender<CursorEvent>,
|
||||
op: mpsc::UnboundedReceiver<CursorEvent>,
|
||||
pub(crate) struct CursorWorker {
|
||||
producer: mpsc::UnboundedSender<CursorPosition>,
|
||||
op: mpsc::UnboundedReceiver<CursorPosition>,
|
||||
changed: watch::Sender<CursorEvent>,
|
||||
last_op: watch::Receiver<CursorEvent>,
|
||||
channel: Arc<broadcast::Sender<CursorEvent>>,
|
||||
|
@ -18,14 +17,13 @@ pub(crate) struct CursorControllerWorker {
|
|||
stop_control: mpsc::UnboundedSender<()>,
|
||||
}
|
||||
|
||||
impl CursorControllerWorker {
|
||||
pub(crate) fn new(uid: String) -> Self {
|
||||
impl Default for CursorWorker {
|
||||
fn default() -> Self {
|
||||
let (op_tx, op_rx) = mpsc::unbounded_channel();
|
||||
let (cur_tx, _cur_rx) = broadcast::channel(64);
|
||||
let (end_tx, end_rx) = mpsc::unbounded_channel();
|
||||
let (change_tx, change_rx) = watch::channel(CursorEvent::default());
|
||||
Self {
|
||||
uid,
|
||||
producer: op_tx,
|
||||
op: op_rx,
|
||||
changed: change_tx,
|
||||
|
@ -38,14 +36,13 @@ impl CursorControllerWorker {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ControllerWorker<CursorEvent> for CursorControllerWorker {
|
||||
impl ControllerWorker<CursorEvent> for CursorWorker {
|
||||
type Controller = CursorController;
|
||||
type Tx = CursorClient<Channel>;
|
||||
type Tx = mpsc::Sender<CursorPosition>;
|
||||
type Rx = Streaming<CursorEvent>;
|
||||
|
||||
fn subscribe(&self) -> CursorController {
|
||||
CursorController::new(
|
||||
self.uid.clone(),
|
||||
self.producer.clone(),
|
||||
Mutex::new(self.last_op.clone()),
|
||||
Mutex::new(self.channel.subscribe()),
|
||||
|
@ -53,19 +50,17 @@ impl ControllerWorker<CursorEvent> for CursorControllerWorker {
|
|||
)
|
||||
}
|
||||
|
||||
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) {
|
||||
async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
|
||||
loop {
|
||||
tokio::select!{
|
||||
Ok(Some(cur)) = rx.message() => {
|
||||
if cur.user == self.uid { continue }
|
||||
self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event");
|
||||
self.changed.send(cur).unwrap_or_warn("could not update last event");
|
||||
},
|
||||
Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); },
|
||||
Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); },
|
||||
Some(()) = self.stop.recv() => { break; },
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -109,6 +109,13 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "client")]
|
||||
impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
|
||||
fn from(_value: tokio::sync::watch::error::SendError<T>) -> Self {
|
||||
Error::Channel { send: true }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "client")]
|
||||
impl From<tokio::sync::broadcast::error::RecvError> for Error {
|
||||
fn from(_value: tokio::sync::broadcast::error::RecvError) -> Self {
|
||||
|
|
173
src/lib.rs
173
src/lib.rs
|
@ -5,8 +5,8 @@
|
|||
//! > the core library of the codemp project, driving all editor plugins
|
||||
//!
|
||||
//! ## structure
|
||||
//! The main entrypoint is the [Instance] object, that maintains a connection and can
|
||||
//! be used to join workspaces or attach to buffers. It contains the underlying [client::Client] and
|
||||
//! The main entrypoint is the [Client] object, that maintains a connection and can
|
||||
//! be used to join workspaces or attach to buffers. It contains the underlying [Workspace] and
|
||||
//! stores active controllers.
|
||||
//!
|
||||
//! Some actions will return structs implementing the [api::Controller] trait. These can be polled
|
||||
|
@ -21,67 +21,79 @@
|
|||
//! immediately but instead deferred until compatible.
|
||||
//!
|
||||
//! ## features
|
||||
//! * `woot` : include the underlying CRDT library and re-exports it (default enabled)
|
||||
//! * `api` : include traits for core interfaces under [api] (default enabled)
|
||||
//! * `proto` : include GRCP protocol definitions under [proto] (default enabled)
|
||||
//! * `client`: include the local [client] implementation (default enabled)
|
||||
//! * `global`: provide a lazy_static global INSTANCE in [instance::global]
|
||||
//! * `sync` : wraps the [instance::a_sync::Instance] holder into a sync variant: [instance::sync::Instance]
|
||||
//! * `api` : include traits for core interfaces under [api] (default enabled)
|
||||
//! * `woot` : include the underlying CRDT library and re-exports it (default enabled)
|
||||
//! * `proto` : include GRCP protocol definitions under [proto] (default enabled)
|
||||
//! * `client` : include the local [client] implementation (default enabled)
|
||||
//!
|
||||
//! ## examples
|
||||
//! while the [client::Client] itself is the core structure implementing all methods, plugins will mostly
|
||||
//! interact with [Instance] managers.
|
||||
//! most methods are split between the [Client] itself and the current [Workspace]
|
||||
//!
|
||||
//! ### async
|
||||
//! this library is natively async and thus async usage should be preferred if possible with
|
||||
//! [instance::a_sync::Instance]
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! use codemp::api::{Controller, TextChange};
|
||||
//! # use codemp::instance::a_sync::Instance;
|
||||
//!
|
||||
//! # async fn async_example() -> codemp::Result<()> {
|
||||
//! let session = Instance::default(); // create global session
|
||||
//! session.connect("http://alemi.dev:50051").await?; // connect to remote server
|
||||
//! // creating a client session will immediately attempt to connect
|
||||
//! let mut session = codemp::Client::new("http://alemi.dev:50053").await?;
|
||||
//!
|
||||
//! // join a remote workspace, obtaining a cursor controller
|
||||
//! let cursor = session.join("some_workspace").await?;
|
||||
//! cursor.send( // move cursor
|
||||
//! codemp::proto::CursorPosition {
|
||||
//! // login first, obtaining a new token granting access to 'some_workspace'
|
||||
//! session.login(
|
||||
//! "username".to_string(),
|
||||
//! "password".to_string(),
|
||||
//! Some("some_workspace".to_string())
|
||||
//! ).await?;
|
||||
//!
|
||||
//! // join a remote workspace, obtaining a workspace handle
|
||||
//! let workspace = session.join_workspace("some_workspace").await?;
|
||||
//!
|
||||
//! workspace.cursor().send( // move cursor
|
||||
//! codemp::proto::cursor::CursorPosition {
|
||||
//! buffer: "test.txt".into(),
|
||||
//! start: Some(codemp::proto::RowCol { row: 0, col: 0 }),
|
||||
//! end: Some(codemp::proto::RowCol { row: 0, col: 1 }),
|
||||
//! start: codemp::proto::cursor::RowCol { row: 0, col: 0 },
|
||||
//! end: codemp::proto::cursor::RowCol { row: 0, col: 1 },
|
||||
//! }
|
||||
//! )?;
|
||||
//! let op = cursor.recv().await?; // listen for event
|
||||
//! let op = workspace.cursor().recv().await?; // receive event from server
|
||||
//! println!("received cursor event: {:?}", op);
|
||||
//!
|
||||
//! // attach to a new buffer and execute operations on it
|
||||
//! session.create("test.txt", None).await?; // create new buffer
|
||||
//! let buffer = session.attach("test.txt").await?; // attach to it
|
||||
//! workspace.create("test.txt").await?; // create new buffer
|
||||
//! let buffer = workspace.attach("test.txt").await?; // attach to it
|
||||
//! let local_change = TextChange { span: 0..0, content: "hello!".into() };
|
||||
//! buffer.send(local_change)?; // insert some text
|
||||
//! let remote_change = buffer.recv().await?;
|
||||
//! let remote_change = buffer.recv().await?; // await remote change
|
||||
//! #
|
||||
//! # Ok(())
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! it's always possible to get a [Workspace] reference using [Client::get_workspace]
|
||||
//!
|
||||
//! ### sync
|
||||
//! if async is not viable, including the feature `sync` will provide a sync-only [instance::sync::Instance] variant
|
||||
//! if async is not viable, a solution might be keeping a global tokio runtime and blocking on it:
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! # use codemp::instance::sync::Instance;
|
||||
//! # use std::sync::Arc;
|
||||
//! # use codemp::api::Controller;
|
||||
//! #
|
||||
//! # fn sync_example() -> codemp::Result<()> {
|
||||
//! let session = Instance::default(); // instantiate sync variant
|
||||
//! session.connect("http://alemi.dev:50051")?; // connect to server
|
||||
//! let rt = tokio::runtime::Runtime::new().unwrap();
|
||||
//! let mut session = rt.block_on( // using block_on allows calling async code
|
||||
//! codemp::Client::new("http://alemi.dev:50051")
|
||||
//! )?;
|
||||
//!
|
||||
//! rt.block_on(session.login(
|
||||
//! "username".to_string(),
|
||||
//! "password".to_string(),
|
||||
//! Some("some_workspace".to_string())
|
||||
//! ))?;
|
||||
//!
|
||||
//! let workspace = rt.block_on(session.join_workspace("some_workspace"))?;
|
||||
//!
|
||||
//! // attach to buffer and blockingly receive events
|
||||
//! let buffer = session.attach("test.txt")?; // attach to buffer, must already exist
|
||||
//! while let Ok(op) = buffer.blocking_recv(session.rt()) { // must pass runtime
|
||||
//! let buffer = rt.block_on(workspace.attach("test.txt"))?; // attach to buffer, must already exist
|
||||
//! while let Ok(op) = rt.block_on(buffer.recv()) { // must pass runtime
|
||||
//! println!("received buffer event: {:?}", op);
|
||||
//! }
|
||||
//! #
|
||||
|
@ -89,30 +101,6 @@
|
|||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ### global
|
||||
//! if instantiating the [Instance] manager is not possible, adding the feature `global` will
|
||||
//! provide a static lazyly-allocated global reference: [struct@instance::global::INSTANCE].
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! # use codemp::instance::sync::Instance;
|
||||
//! # use std::sync::Arc;
|
||||
//! use codemp::prelude::*; // prelude includes everything with "Codemp" in front
|
||||
//! # fn global_example() -> codemp::Result<()> {
|
||||
//! CODEMP_INSTANCE.connect("http://alemi.dev:50051")?; // connect to server
|
||||
//! let cursor = CODEMP_INSTANCE.join("some_workspace")?; // join workspace
|
||||
//! std::thread::spawn(move || {
|
||||
//! loop {
|
||||
//! match cursor.try_recv() {
|
||||
//! Ok(Some(event)) => println!("received cursor event: {:?}", event), // there might be more
|
||||
//! Ok(None) => std::thread::sleep(std::time::Duration::from_millis(10)), // wait for more
|
||||
//! Err(e) => break, // channel closed
|
||||
//! }
|
||||
//! }
|
||||
//! });
|
||||
//! # Ok(())
|
||||
//! # }
|
||||
//! ```
|
||||
//!
|
||||
//! ## references
|
||||
//!
|
||||
//! ![another cool pic coz why not](https://alemi.dev/img/about-slice-2.png)
|
||||
|
@ -148,11 +136,9 @@ pub mod errors;
|
|||
#[cfg(feature = "client")]
|
||||
pub mod client;
|
||||
|
||||
pub mod tools;
|
||||
|
||||
/// client wrapper to handle memory persistence
|
||||
/// workspace operations
|
||||
#[cfg(feature = "client")]
|
||||
pub mod instance;
|
||||
pub mod workspace;
|
||||
|
||||
/// all-in-one imports : `use codemp::prelude::*;`
|
||||
pub mod prelude;
|
||||
|
@ -165,17 +151,70 @@ pub use woot;
|
|||
#[cfg(feature = "proto")]
|
||||
#[allow(non_snake_case)]
|
||||
pub mod proto {
|
||||
tonic::include_proto!("codemp.buffer");
|
||||
tonic::include_proto!("codemp.cursor");
|
||||
}
|
||||
pub mod common {
|
||||
tonic::include_proto!("common");
|
||||
|
||||
impl From<uuid::Uuid> for Identity {
|
||||
fn from(id: uuid::Uuid) -> Self {
|
||||
Identity { id: id.to_string() }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&uuid::Uuid> for Identity {
|
||||
fn from(id: &uuid::Uuid) -> Self {
|
||||
Identity { id: id.to_string() }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Identity> for uuid::Uuid {
|
||||
fn from(value: Identity) -> Self {
|
||||
uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity")
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&Identity> for uuid::Uuid {
|
||||
fn from(value: &Identity) -> Self {
|
||||
uuid::Uuid::parse_str(&value.id).expect("invalid uuid in identity")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
pub mod files {
|
||||
tonic::include_proto!("files");
|
||||
|
||||
impl From<String> for BufferNode {
|
||||
fn from(value: String) -> Self {
|
||||
BufferNode { path: value }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&str> for BufferNode {
|
||||
fn from(value: &str) -> Self {
|
||||
BufferNode { path: value.to_string() }
|
||||
}
|
||||
}
|
||||
|
||||
impl From<BufferNode> for String {
|
||||
fn from(value: BufferNode) -> Self {
|
||||
value.path
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub mod buffer { tonic::include_proto!("buffer"); }
|
||||
pub mod cursor { tonic::include_proto!("cursor"); }
|
||||
pub mod workspace { tonic::include_proto!("workspace"); }
|
||||
pub mod auth { tonic::include_proto!("auth"); }
|
||||
}
|
||||
|
||||
pub use errors::Error;
|
||||
pub use errors::Result;
|
||||
|
||||
#[cfg(all(feature = "client", feature = "sync"))]
|
||||
pub use instance::sync::Instance;
|
||||
#[cfg(feature = "client")]
|
||||
pub use client::Client;
|
||||
|
||||
#[cfg(feature = "client")]
|
||||
pub use workspace::Workspace;
|
||||
|
||||
#[cfg(all(feature = "client", not(feature = "sync")))]
|
||||
pub use instance::a_sync::Instance;
|
||||
|
||||
|
|
|
@ -18,18 +18,17 @@ pub use crate::api::{
|
|||
|
||||
#[cfg(feature = "client")]
|
||||
pub use crate::{
|
||||
Instance as CodempInstance,
|
||||
// Instance as CodempInstance,
|
||||
client::Client as CodempClient,
|
||||
workspace::Workspace as CodempWorkspace,
|
||||
workspace::UserInfo as CodempUserInfo,
|
||||
cursor::Controller as CodempCursorController,
|
||||
buffer::Controller as CodempBufferController,
|
||||
};
|
||||
|
||||
#[cfg(feature = "proto")]
|
||||
pub use crate::{
|
||||
proto::CursorPosition as CodempCursorPosition,
|
||||
proto::CursorEvent as CodempCursorEvent,
|
||||
proto::RowCol as CodempRowCol,
|
||||
proto::cursor::CursorPosition as CodempCursorPosition,
|
||||
proto::cursor::CursorEvent as CodempCursorEvent,
|
||||
proto::cursor::RowCol as CodempRowCol,
|
||||
};
|
||||
|
||||
#[cfg(feature = "global")]
|
||||
pub use crate::instance::global::INSTANCE as CODEMP_INSTANCE;
|
||||
|
|
184
src/workspace.rs
Normal file
184
src/workspace.rs
Normal file
|
@ -0,0 +1,184 @@
|
|||
use std::{collections::BTreeSet, sync::Arc};
|
||||
use tokio::sync::mpsc;
|
||||
use dashmap::{DashMap, DashSet};
|
||||
use tonic::Streaming;
|
||||
use uuid::Uuid;
|
||||
use crate::{
|
||||
api::controller::ControllerWorker, buffer::{self, worker::BufferWorker}, client::Services, cursor,
|
||||
proto::{auth::Token, common::{Identity, Empty}, files::BufferNode, workspace::{WorkspaceEvent, workspace_event::{Event as WorkspaceEventInner, FileCreate, FileDelete, FileRename, UserJoin, UserLeave}}}
|
||||
};
|
||||
|
||||
//TODO may contain more info in the future
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct UserInfo {
|
||||
pub uuid: Uuid
|
||||
}
|
||||
|
||||
pub struct Workspace {
|
||||
id: String,
|
||||
user_id: Uuid, // reference to global user id
|
||||
token: Arc<tokio::sync::watch::Sender<Token>>,
|
||||
cursor: Arc<cursor::Controller>,
|
||||
buffers: Arc<DashMap<String, Arc<buffer::Controller>>>,
|
||||
pub(crate) filetree: Arc<DashSet<String>>,
|
||||
pub(crate) users: Arc<DashMap<Uuid, UserInfo>>,
|
||||
services: Arc<Services>
|
||||
}
|
||||
|
||||
impl Workspace {
|
||||
/// create a new buffer and perform initial fetch operations
|
||||
pub(crate) fn new(
|
||||
id: String,
|
||||
user_id: Uuid,
|
||||
token: Arc<tokio::sync::watch::Sender<Token>>,
|
||||
cursor: Arc<cursor::Controller>,
|
||||
services: Arc<Services>
|
||||
) -> Self {
|
||||
Workspace {
|
||||
id,
|
||||
user_id,
|
||||
token,
|
||||
cursor,
|
||||
buffers: Arc::new(DashMap::default()),
|
||||
filetree: Arc::new(DashSet::default()),
|
||||
users: Arc::new(DashMap::default()),
|
||||
services
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn run_actor(&self, mut stream: Streaming<WorkspaceEvent>) {
|
||||
let users = self.users.clone();
|
||||
let filetree = self.filetree.clone();
|
||||
let name = self.id();
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
match stream.message().await {
|
||||
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e),
|
||||
Ok(None) => break tracing::info!("leaving workspace {}", name),
|
||||
Ok(Some(WorkspaceEvent { event: None })) => tracing::warn!("workspace {} received empty event", name),
|
||||
Ok(Some(WorkspaceEvent { event: Some(ev) })) => match ev {
|
||||
WorkspaceEventInner::Join(UserJoin { user }) => { users.insert(user.clone().into(), UserInfo { uuid: user.into() }); },
|
||||
WorkspaceEventInner::Leave(UserLeave { user }) => { users.remove(&user.into()); },
|
||||
WorkspaceEventInner::Create(FileCreate { path }) => { filetree.insert(path); },
|
||||
WorkspaceEventInner::Rename(FileRename { before, after }) => { filetree.remove(&before); filetree.insert(after); },
|
||||
WorkspaceEventInner::Delete(FileDelete { path }) => { filetree.remove(&path); },
|
||||
},
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// create a new buffer in current workspace
|
||||
pub async fn create(&self, path: &str) -> crate::Result<()> {
|
||||
let mut workspace_client = self.services.workspace.clone();
|
||||
workspace_client.create_buffer(
|
||||
tonic::Request::new(BufferNode { path: path.to_string() })
|
||||
).await?;
|
||||
|
||||
// add to filetree
|
||||
self.filetree.insert(path.to_string());
|
||||
|
||||
// fetch buffers
|
||||
self.fetch_buffers().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// attach to a buffer, starting a buffer controller and returning a new reference to it
|
||||
///
|
||||
/// to interact with such buffer use [crate::api::Controller::send] or
|
||||
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
|
||||
pub async fn attach(&self, path: &str) -> crate::Result<Arc<buffer::Controller>> {
|
||||
let mut worskspace_client = self.services.workspace.clone();
|
||||
let request = tonic::Request::new(BufferNode { path: path.to_string() });
|
||||
let credentials = worskspace_client.access_buffer(request).await?.into_inner();
|
||||
self.token.send(credentials.token)?;
|
||||
|
||||
let (tx, rx) = mpsc::channel(256);
|
||||
let mut req = tonic::Request::new(tokio_stream::wrappers::ReceiverStream::new(rx));
|
||||
req.metadata_mut().insert("path", tonic::metadata::MetadataValue::try_from(credentials.id.id).expect("could not represent path as byte sequence"));
|
||||
let stream = self.services.buffer.clone().attach(req).await?.into_inner();
|
||||
|
||||
let worker = BufferWorker::new(self.user_id, path);
|
||||
let controller = Arc::new(worker.subscribe());
|
||||
tokio::spawn(async move {
|
||||
tracing::debug!("controller worker started");
|
||||
worker.work(tx, stream).await;
|
||||
tracing::debug!("controller worker stopped");
|
||||
});
|
||||
|
||||
self.buffers.insert(path.to_string(), controller.clone());
|
||||
|
||||
Ok(controller)
|
||||
}
|
||||
|
||||
/// fetch a list of all buffers in a workspace
|
||||
pub async fn fetch_buffers(&self) -> crate::Result<()> {
|
||||
let mut workspace_client = self.services.workspace.clone();
|
||||
let buffers = workspace_client.list_buffers(
|
||||
tonic::Request::new(Empty {})
|
||||
).await?.into_inner().buffers;
|
||||
|
||||
self.filetree.clear();
|
||||
for b in buffers {
|
||||
self.filetree.insert(b.path);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// fetch a list of all users in a workspace
|
||||
pub async fn fetch_users(&self) -> crate::Result<()> {
|
||||
let mut workspace_client = self.services.workspace.clone();
|
||||
let users = BTreeSet::from_iter(workspace_client.list_users(
|
||||
tonic::Request::new(Empty {})
|
||||
).await?.into_inner().users.into_iter().map(Uuid::from));
|
||||
|
||||
self.users.clear();
|
||||
for u in users {
|
||||
self.users.insert(u, UserInfo { uuid: u });
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// get a list of the users attached to a specific buffer
|
||||
///
|
||||
/// TODO: discuss implementation details
|
||||
pub async fn list_buffer_users(&self, path: &str) -> crate::Result<Vec<Identity>> {
|
||||
let mut workspace_client = self.services.workspace.clone();
|
||||
let buffer_users = workspace_client.list_buffer_users(
|
||||
tonic::Request::new(BufferNode { path: path.to_string() })
|
||||
).await?.into_inner().users;
|
||||
|
||||
Ok(buffer_users)
|
||||
}
|
||||
|
||||
/// delete a buffer
|
||||
pub async fn delete(&self, path: &str) -> crate::Result<()> {
|
||||
let mut workspace_client = self.services.workspace.clone();
|
||||
workspace_client.delete_buffer(
|
||||
tonic::Request::new(BufferNode { path: path.to_string() })
|
||||
).await?;
|
||||
|
||||
self.filetree.remove(path);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// get the id of the workspace
|
||||
pub fn id(&self) -> String { self.id.clone() }
|
||||
|
||||
/// return a reference to current cursor controller, if currently in a workspace
|
||||
pub fn cursor(&self) -> Arc<cursor::Controller> { self.cursor.clone() }
|
||||
|
||||
/// get a new reference to a buffer controller, if any is active to given path
|
||||
pub fn buffer_by_name(&self, path: &str) -> Option<Arc<buffer::Controller>> {
|
||||
self.buffers.get(path).map(|x| x.clone())
|
||||
}
|
||||
|
||||
/// get the currently cached "filetree"
|
||||
pub fn filetree(&self) -> Vec<String> {
|
||||
self.filetree.iter().map(|f| f.clone()).collect()
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue