feat: workspaces and new library structure

Co-authored-by: alemi <me@alemi.dev>
Co-authored-by: frelodev <frelodev@gmail.com>
This commit is contained in:
zaaarf 2024-01-25 02:13:45 +01:00
parent 1657521356
commit 94a7786812
15 changed files with 380 additions and 204 deletions

View file

@ -10,7 +10,7 @@ name = "codemp"
# core # core
tracing = "0.1" tracing = "0.1"
# woot # woot
codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", tag = "v0.1.0", optional = true } codemp-woot = { git = "ssh://git@github.com/codewithotherpeopleandchangenamelater/woot.git", features = ["serde"], tag = "v0.1.0", optional = true }
# proto # proto
tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true } tonic = { version = "0.9", features = ["tls", "tls-roots"], optional = true }
prost = { version = "0.11.8", optional = true } prost = { version = "0.11.8", optional = true }
@ -26,12 +26,13 @@ tokio-stream = { version = "0.1", optional = true }
# global # global
lazy_static = { version = "1.4", optional = true } lazy_static = { version = "1.4", optional = true }
serde = { version = "1.0.193", features = ["derive"] } serde = { version = "1.0.193", features = ["derive"] }
postcard = "1.0.8"
[build-dependencies] [build-dependencies]
tonic-build = "0.9" tonic-build = "0.9"
[features] [features]
default = ["transport", "dep:serde_json"] default = ["client"]
api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"] api = ["woot", "dep:similar", "dep:tokio", "dep:async-trait"]
woot = ["dep:codemp-woot"] woot = ["dep:codemp-woot"]
transport = ["dep:prost", "dep:tonic"] transport = ["dep:prost", "dep:tonic"]
@ -39,3 +40,4 @@ client = ["transport", "api", "dep:tokio", "dep:tokio-stream", "dep:uuid", "d
server = ["transport"] server = ["transport"]
global = ["client", "dep:lazy_static"] global = ["client", "dep:lazy_static"]
sync = ["client"] sync = ["client"]
backport = [] # TODO remove!

View file

@ -6,8 +6,6 @@ import "user.proto";
// handle cursor events and broadcast to all users // handle cursor events and broadcast to all users
service Cursor { service Cursor {
// send cursor movement to server // subscribe to a workspace's cursor events
rpc Moved (cursor.CursorEvent) returns (cursor.MovedResponse); rpc Attach (stream cursor.CursorEvent) returns (stream cursor.CursorEvent);
// attach to a workspace and receive cursor events
rpc Listen (user.UserIdentity) returns (stream cursor.CursorEvent);
} }

View file

@ -3,11 +3,11 @@ syntax = "proto2";
package files; package files;
message BufferNode{ message BufferNode {
required string path = 1; required string path = 1;
} }
message BufferTree{ message BufferTree {
repeated BufferNode buffers = 1; repeated BufferNode buffers = 1;
} }

View file

@ -4,7 +4,7 @@ package user;
// payload identifying user // payload identifying user
message UserIdentity{ message UserIdentity {
// user identifier // user identifier
required string id = 1; required string id = 1;
} }

View file

@ -4,14 +4,12 @@ package workspace;
import "user.proto"; import "user.proto";
import "files.proto"; import "files.proto";
message Empty {} message Empty {}
message TreeRequest {} // empty message TreeRequest {} // empty
message UserRequest {} message UserRequest {}
message CursorResponse{} message CursorResponse {}
message UserListRequest{} message UserListRequest {}
message WorkspaceUserList { message WorkspaceUserList {
repeated user.UserIdentity user = 1; repeated user.UserIdentity user = 1;
@ -21,19 +19,16 @@ message WorkspaceMessage {
required int32 id = 1; required int32 id = 1;
} }
message JoinRequest{ message JoinRequest {
required string username=1; required string username=1;
required string password=2; required string password=2;
} }
message AttachRequest{ message AttachRequest {
required string bufferAttach = 1; required string id = 1;
} }
message Token {
message Token{
required string token = 1; required string token = 1;
} }
@ -44,7 +39,7 @@ enum FileEventType {
} }
message FileEvent { message FileEvent {
required string buffer = 1; required string bufferbuffertree = 1;
required FileEventType type = 2; required FileEventType type = 2;
} }
@ -56,28 +51,18 @@ enum UserEventType {
message UserEvent { message UserEvent {
required user.UserIdentity user = 1; required user.UserIdentity user = 1;
required UserEventType type = 2; required UserEventType type = 2;
} }
message BufferPayload { message BufferPayload {
// buffer path to operate onto // buffer path to operate onto
required string path = 1; required string path = 1;
// user id that is requesting the operation
required user.UserIdentity user = 2;
} }
message BufferListRequest{ message BufferListRequest{
} }
message UserList {
message UserList{
repeated user.UserIdentity users = 1; repeated user.UserIdentity users = 1;
} }

View file

@ -3,6 +3,8 @@
//! an editor-friendly representation of a text change in a buffer //! an editor-friendly representation of a text change in a buffer
//! to easily interface with codemp from various editors //! to easily interface with codemp from various editors
use crate::proto::cursor::RowCol;
/// an editor-friendly representation of a text change in a buffer /// an editor-friendly representation of a text change in a buffer
/// ///
/// this represent a range in the previous state of the string and a new content which should be /// this represent a range in the previous state of the string and a new content which should be

View file

@ -3,22 +3,20 @@ use std::hash::{Hash, Hasher};
use similar::{TextDiff, ChangeTag}; use similar::{TextDiff, ChangeTag};
use tokio::sync::{watch, mpsc, oneshot}; use tokio::sync::{watch, mpsc, oneshot};
use tonic::transport::Channel;
use tonic::{async_trait, Streaming}; use tonic::{async_trait, Streaming};
use uuid::Uuid;
use woot::crdt::{Op, CRDT, TextEditor}; use woot::crdt::{Op, CRDT, TextEditor};
use woot::woot::Woot; use woot::woot::Woot;
use crate::errors::IgnorableError; use crate::errors::IgnorableError;
use crate::proto::{OperationRequest, RawOp};
use crate::proto::buffer_client::BufferClient;
use crate::api::controller::ControllerWorker; use crate::api::controller::ControllerWorker;
use crate::api::TextChange; use crate::api::TextChange;
use crate::proto::buffer_service::Operation;
use super::controller::BufferController; use super::controller::BufferController;
pub(crate) struct BufferWorker {
pub(crate) struct BufferControllerWorker { _user_id: Uuid,
uid: String,
name: String, name: String,
buffer: Woot, buffer: Woot,
content: watch::Sender<String>, content: watch::Sender<String>,
@ -36,17 +34,17 @@ struct ClonableHandlesForController {
content: watch::Receiver<String>, content: watch::Receiver<String>,
} }
impl BufferControllerWorker { impl BufferWorker {
pub fn new(uid: String, path: &str) -> Self { pub fn new(user_id: Uuid, path: &str) -> Self {
let (txt_tx, txt_rx) = watch::channel("".to_string()); let (txt_tx, txt_rx) = watch::channel("".to_string());
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (poller_tx, poller_rx) = mpsc::unbounded_channel(); let (poller_tx, poller_rx) = mpsc::unbounded_channel();
let mut hasher = DefaultHasher::new(); let mut hasher = DefaultHasher::new();
uid.hash(&mut hasher); user_id.hash(&mut hasher);
let site_id = hasher.finish() as usize; let site_id = hasher.finish() as usize;
BufferControllerWorker { BufferWorker {
uid, _user_id: user_id,
name: path.to_string(), name: path.to_string(),
buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging! buffer: Woot::new(site_id % (2<<10), ""), // TODO remove the modulo, only for debugging!
content: txt_tx, content: txt_tx,
@ -62,26 +60,13 @@ impl BufferControllerWorker {
stop: end_rx, stop: end_rx,
} }
} }
async fn send_op(&self, tx: &mut BufferClient<Channel>, outbound: &Op) -> crate::Result<()> {
let opseq = serde_json::to_string(outbound).expect("could not serialize opseq");
let req = OperationRequest {
path: self.name.clone(),
hash: format!("{:x}", md5::compute(self.buffer.view())),
op: Some(RawOp {
opseq, user: self.uid.clone(),
}),
};
let _ = tx.edit(req).await?;
Ok(())
}
} }
#[async_trait] #[async_trait]
impl ControllerWorker<TextChange> for BufferControllerWorker { impl ControllerWorker<TextChange> for BufferControllerWorker {
type Controller = BufferController; type Controller = BufferController;
type Tx = BufferClient<Channel>; type Tx = mpsc::Sender<Operation>;
type Rx = Streaming<RawOp>; type Rx = Streaming<Operation>;
fn subscribe(&self) -> BufferController { fn subscribe(&self) -> BufferController {
BufferController::new( BufferController::new(
@ -93,7 +78,7 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
) )
} }
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
loop { loop {
// block until one of these is ready // block until one of these is ready
tokio::select! { tokio::select! {
@ -143,7 +128,13 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
} }
for op in ops { for op in ops {
match self.send_op(&mut tx, &op).await { let operation = Operation {
data: postcard::to_extend(&op, Vec::new()).unwrap(),
user: None,
path: Some(self.name.clone())
};
match tx.send(operation).await {
Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e), Err(e) => tracing::error!("server refused to broadcast {}: {}", op, e),
Ok(()) => { Ok(()) => {
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");
@ -160,7 +151,7 @@ impl ControllerWorker<TextChange> for BufferControllerWorker {
res = rx.message() => match res { res = rx.message() => match res {
Err(_e) => break, Err(_e) => break,
Ok(None) => break, Ok(None) => break,
Ok(Some(change)) => match serde_json::from_str::<Op>(&change.opseq) { Ok(Some(change)) => match postcard::from_bytes::<Op>(&change.data) {
Ok(op) => { Ok(op) => {
self.buffer.merge(op); self.buffer.merge(op);
self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update"); self.content.send(self.buffer.view()).unwrap_or_warn("could not send buffer update");

View file

@ -1,20 +1,21 @@
//! ### client //! ### client
//! //!
//! codemp client manager, containing grpc services //! codemp client manager, containing grpc services
use std::{sync::Arc, collections::BTreeMap}; use std::sync::Arc;
use tokio::sync::mpsc;
use tonic::transport::Channel; use tonic::service::interceptor::InterceptedService;
use tonic::service::Interceptor;
use crate::{ use tonic::transport::{Channel, Endpoint};
cursor::{worker::CursorControllerWorker, controller::CursorController}, use uuid::Uuid;
proto::{
buffer_client::BufferClient, cursor_client::CursorClient, UserIdentity, BufferPayload,
},
Error, api::controller::ControllerWorker,
buffer::{controller::BufferController, worker::BufferControllerWorker},
};
use crate::api::controller::ControllerWorker;
use crate::cursor::worker::CursorWorker;
use crate::proto::buffer_service::buffer_client::BufferClient;
use crate::proto::cursor_service::cursor_client::CursorClient;
use crate::proto::workspace::{JoinRequest, Token};
use crate::proto::workspace_service::workspace_client::WorkspaceClient;
use crate::workspace::Workspace;
/// codemp client manager /// codemp client manager
/// ///
@ -22,130 +23,102 @@ use crate::{
/// will disconnect when dropped /// will disconnect when dropped
/// can be used to interact with server /// can be used to interact with server
pub struct Client { pub struct Client {
id: String, user_id: Uuid,
client: Services, token_tx: Arc<tokio::sync::watch::Sender<Token>>,
workspace: Option<Workspace>, workspace: Option<Workspace>,
services: Arc<Services>
} }
struct Services { #[derive(Clone)]
buffer: BufferClient<Channel>, pub(crate) struct ClientInterceptor {
cursor: CursorClient<Channel>, token: tokio::sync::watch::Receiver<Token>
} }
struct Workspace { impl Interceptor for ClientInterceptor {
cursor: Arc<CursorController>, fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
buffers: BTreeMap<String, Arc<BufferController>>, if let Ok(token) = self.token.borrow().token.parse() {
request.metadata_mut().insert("auth", token);
}
Ok(request)
}
} }
#[derive(Debug, Clone)]
pub(crate) struct Services {
pub(crate) workspace: crate::proto::workspace_service::workspace_client::WorkspaceClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) buffer: crate::proto::buffer_service::buffer_client::BufferClient<InterceptedService<Channel, ClientInterceptor>>,
pub(crate) cursor: crate::proto::cursor_service::cursor_client::CursorClient<InterceptedService<Channel, ClientInterceptor>>,
}
// TODO meno losco
fn parse_codemp_connection_string<'a>(string: &'a str) -> (String, String) {
let url = string.replace("codemp://", "");
let (host, workspace) = url.split_once('/').unwrap();
(format!("http://{}", host), workspace.to_string())
}
impl Client { impl Client {
/// instantiate and connect a new client /// instantiate and connect a new client
pub async fn new(dst: &str) -> Result<Self, tonic::transport::Error> { pub async fn new(dest: &str) -> crate::Result<Self> { //TODO interceptor
let buffer = BufferClient::connect(dst.to_string()).await?; let (_host, _workspace_id) = parse_codemp_connection_string(dest);
let cursor = CursorClient::connect(dst.to_string()).await?;
let id = uuid::Uuid::new_v4().to_string();
Ok(Client { id, client: Services { buffer, cursor}, workspace: None })
}
/// return a reference to current cursor controller, if currently in a workspace let channel = Endpoint::from_shared(dest.to_string())?
pub fn get_cursor(&self) -> Option<Arc<CursorController>> { .connect()
Some(self.workspace.as_ref()?.cursor.clone()) .await?;
}
/// leave current workspace if in one, disconnecting buffer and cursor controllers let (token_tx, token_rx) = tokio::sync::watch::channel(
pub fn leave_workspace(&mut self) { Token { token: "".to_string() }
// TODO need to stop tasks? );
self.workspace = None
}
/// disconnect from a specific buffer let inter = ClientInterceptor { token: token_rx };
pub fn disconnect_buffer(&mut self, path: &str) -> bool {
match &mut self.workspace {
Some(w) => w.buffers.remove(path).is_some(),
None => false,
}
}
/// get a new reference to a buffer controller, if any is active to given path let buffer = BufferClient::with_interceptor(channel.clone(), inter.clone());
pub fn get_buffer(&self, path: &str) -> Option<Arc<BufferController>> { let cursor = CursorClient::with_interceptor(channel.clone(), inter.clone());
self.workspace.as_ref()?.buffers.get(path).cloned() let workspace = WorkspaceClient::with_interceptor(channel.clone(), inter.clone());
let user_id = uuid::Uuid::new_v4();
Ok(Client {
user_id,
token_tx: Arc::new(token_tx),
workspace: None,
services: Arc::new(Services { workspace, buffer, cursor })
})
} }
/// join a workspace, starting a cursorcontroller and returning a new reference to it /// join a workspace, starting a cursorcontroller and returning a new reference to it
/// ///
/// to interact with such workspace [crate::api::Controller::send] cursor events or /// to interact with such workspace [crate::api::Controller::send] cursor events or
/// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller]. /// [crate::api::Controller::recv] for events on the associated [crate::cursor::Controller].
pub async fn join(&mut self, _session: &str) -> crate::Result<Arc<CursorController>> { pub async fn join(&mut self, workspace_id: &str) -> crate::Result<()> {
// TODO there is no real workspace handling in codemp server so it behaves like one big global self.token_tx.send(self.services.workspace.clone().join(
// session. I'm still creating this to start laying out the proper use flow tonic::Request::new(JoinRequest { username: "".to_string(), password: "".to_string() }) //TODO
let stream = self.client.cursor.listen(UserIdentity { id: "".into() }).await?.into_inner(); ).await?.into_inner())?;
let controller = CursorControllerWorker::new(self.id.clone()); let (tx, rx) = mpsc::channel(10);
let client = self.client.cursor.clone(); let stream = self.services.cursor.clone()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx))
let handle = Arc::new(controller.subscribe()); .await?
.into_inner();
let worker = CursorWorker::new(self.user_id.clone());
let controller = Arc::new(worker.subscribe());
tokio::spawn(async move { tokio::spawn(async move {
tracing::debug!("cursor worker started"); tracing::debug!("controller worker started");
controller.work(client, stream).await; worker.work(tx, stream).await;
tracing::debug!("cursor worker stopped"); tracing::debug!("controller worker stopped");
}); });
self.workspace = Some( self.workspace = Some(Workspace::new(
Workspace { workspace_id.to_string(),
cursor: handle.clone(), self.user_id,
buffers: BTreeMap::new() self.token_tx.clone(),
} controller,
); self.services.clone()
).await?);
Ok(handle) Ok(())
}
/// create a new buffer in current workspace, with optional given content
pub async fn create(&mut self, path: &str, content: Option<&str>) -> crate::Result<()> {
if let Some(_workspace) = &self.workspace {
self.client.buffer
.create(BufferPayload {
user: self.id.clone(),
path: path.to_string(),
content: content.map(|x| x.to_string()),
}).await?;
Ok(())
} else {
Err(Error::InvalidState { msg: "join a workspace first".into() })
}
}
/// attach to a buffer, starting a buffer controller and returning a new reference to it
///
/// to interact with such buffer use [crate::api::Controller::send] or
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
pub async fn attach(&mut self, path: &str) -> crate::Result<Arc<BufferController>> {
if let Some(workspace) = &mut self.workspace {
let mut client = self.client.buffer.clone();
let req = BufferPayload {
path: path.to_string(), user: self.id.clone(), content: None
};
let stream = client.attach(req).await?.into_inner();
let controller = BufferControllerWorker::new(self.id.clone(), path);
let handler = Arc::new(controller.subscribe());
let _path = path.to_string();
tokio::spawn(async move {
tracing::debug!("buffer[{}] worker started", _path);
controller.work(client, stream).await;
tracing::debug!("buffer[{}] worker stopped", _path);
});
workspace.buffers.insert(path.to_string(), handler.clone());
Ok(handler)
} else {
Err(Error::InvalidState { msg: "join a workspace first".into() })
}
} }
} }

View file

@ -4,8 +4,9 @@
use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch}; use tokio::sync::{mpsc, broadcast::{self, error::{TryRecvError, RecvError}}, Mutex, watch};
use tonic::async_trait; use tonic::async_trait;
use uuid::Uuid;
use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors::IgnorableError}; use crate::{api::Controller, errors::IgnorableError, proto::{cursor::{CursorEvent, CursorPosition}, user::UserIdentity}};
/// the cursor controller implementation /// the cursor controller implementation
/// ///
@ -20,7 +21,7 @@ use crate::{proto::{CursorPosition, CursorEvent}, Error, api::Controller, errors
/// upon dropping this handle will stop the associated worker /// upon dropping this handle will stop the associated worker
#[derive(Debug)] #[derive(Debug)]
pub struct CursorController { pub struct CursorController {
uid: String, user_id: Uuid,
op: mpsc::UnboundedSender<CursorEvent>, op: mpsc::UnboundedSender<CursorEvent>,
last_op: Mutex<watch::Receiver<CursorEvent>>, last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
@ -35,13 +36,13 @@ impl Drop for CursorController {
impl CursorController { impl CursorController {
pub(crate) fn new( pub(crate) fn new(
uid: String, user_id: Uuid,
op: mpsc::UnboundedSender<CursorEvent>, op: mpsc::UnboundedSender<CursorEvent>,
last_op: Mutex<watch::Receiver<CursorEvent>>, last_op: Mutex<watch::Receiver<CursorEvent>>,
stream: Mutex<broadcast::Receiver<CursorEvent>>, stream: Mutex<broadcast::Receiver<CursorEvent>>,
stop: mpsc::UnboundedSender<()>, stop: mpsc::UnboundedSender<()>,
) -> Self { ) -> Self {
CursorController { uid, op, last_op, stream, stop } CursorController { user_id, op, last_op, stream, stop }
} }
} }
@ -51,13 +52,13 @@ impl Controller<CursorEvent> for CursorController {
/// enqueue a cursor event to be broadcast to current workspace /// enqueue a cursor event to be broadcast to current workspace
/// will automatically invert cursor start/end if they are inverted /// will automatically invert cursor start/end if they are inverted
fn send(&self, mut cursor: CursorPosition) -> Result<(), Error> { fn send(&self, mut cursor: CursorPosition) -> crate::Result<()> {
if cursor.start() > cursor.end() { if cursor.start() > cursor.end() {
std::mem::swap(&mut cursor.start, &mut cursor.end); std::mem::swap(&mut cursor.start, &mut cursor.end);
} }
Ok(self.op.send(CursorEvent { Ok(self.op.send(CursorEvent {
user: self.uid.clone(), user: UserIdentity { id: self.user_id.to_string() },
position: Some(cursor), position: cursor,
})?) })?)
} }
@ -67,7 +68,7 @@ impl Controller<CursorEvent> for CursorController {
match stream.try_recv() { match stream.try_recv() {
Ok(x) => Ok(Some(x)), Ok(x) => Ok(Some(x)),
Err(TryRecvError::Empty) => Ok(None), Err(TryRecvError::Empty) => Ok(None),
Err(TryRecvError::Closed) => Err(Error::Channel { send: false }), Err(TryRecvError::Closed) => Err(crate::Error::Channel { send: false }),
Err(TryRecvError::Lagged(n)) => { Err(TryRecvError::Lagged(n)) => {
tracing::warn!("cursor channel lagged, skipping {} events", n); tracing::warn!("cursor channel lagged, skipping {} events", n);
Ok(stream.try_recv().ok()) Ok(stream.try_recv().ok())
@ -78,11 +79,11 @@ impl Controller<CursorEvent> for CursorController {
// TODO is this cancelable? so it can be used in tokio::select! // TODO is this cancelable? so it can be used in tokio::select!
// TODO is the result type overkill? should be an option? // TODO is the result type overkill? should be an option?
/// get next cursor event from current workspace, or block until one is available /// get next cursor event from current workspace, or block until one is available
async fn recv(&self) -> Result<CursorEvent, Error> { async fn recv(&self) -> crate::Result<CursorEvent> {
let mut stream = self.stream.lock().await; let mut stream = self.stream.lock().await;
match stream.recv().await { match stream.recv().await {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Err(RecvError::Closed) => Err(Error::Channel { send: false }), Err(RecvError::Closed) => Err(crate::Error::Channel { send: false }),
Err(RecvError::Lagged(n)) => { Err(RecvError::Lagged(n)) => {
tracing::error!("cursor channel lagged behind, skipping {} events", n); tracing::error!("cursor channel lagged behind, skipping {} events", n);
Ok(stream.recv().await.expect("could not receive after lagging")) Ok(stream.recv().await.expect("could not receive after lagging"))

View file

@ -12,7 +12,7 @@ pub mod controller;
pub use controller::CursorController as Controller; pub use controller::CursorController as Controller;
use crate::proto::{RowCol, CursorPosition}; use crate::proto::cursor::{RowCol, CursorPosition};
impl From::<RowCol> for (i32, i32) { impl From::<RowCol> for (i32, i32) {
fn from(pos: RowCol) -> (i32, i32) { fn from(pos: RowCol) -> (i32, i32) {
@ -36,12 +36,12 @@ impl RowCol {
impl CursorPosition { impl CursorPosition {
/// extract start position, defaulting to (0,0), to help build protocol packets /// extract start position, defaulting to (0,0), to help build protocol packets
pub fn start(&self) -> RowCol { pub fn start(&self) -> RowCol {
self.start.clone().unwrap_or((0, 0).into()) self.start.clone()
} }
/// extract end position, defaulting to (0,0), to help build protocol packets /// extract end position, defaulting to (0,0), to help build protocol packets
pub fn end(&self) -> RowCol { pub fn end(&self) -> RowCol {
self.end.clone().unwrap_or((0, 0).into()) self.end.clone()
} }
} }

View file

@ -1,14 +1,15 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch}; use tokio::sync::{mpsc, broadcast::{self}, Mutex, watch};
use tonic::{Streaming, transport::Channel, async_trait}; use tonic::{Streaming, async_trait};
use uuid::Uuid;
use crate::{proto::{cursor_client::CursorClient, CursorEvent}, errors::IgnorableError, api::controller::ControllerWorker}; use crate::{api::controller::ControllerWorker, errors::IgnorableError, proto::cursor::CursorEvent};
use super::controller::CursorController; use super::controller::CursorController;
pub(crate) struct CursorControllerWorker { pub(crate) struct CursorWorker {
uid: String, user_id: Uuid,
producer: mpsc::UnboundedSender<CursorEvent>, producer: mpsc::UnboundedSender<CursorEvent>,
op: mpsc::UnboundedReceiver<CursorEvent>, op: mpsc::UnboundedReceiver<CursorEvent>,
changed: watch::Sender<CursorEvent>, changed: watch::Sender<CursorEvent>,
@ -18,14 +19,14 @@ pub(crate) struct CursorControllerWorker {
stop_control: mpsc::UnboundedSender<()>, stop_control: mpsc::UnboundedSender<()>,
} }
impl CursorControllerWorker { impl CursorWorker {
pub(crate) fn new(uid: String) -> Self { pub(crate) fn new(user_id: Uuid) -> Self {
let (op_tx, op_rx) = mpsc::unbounded_channel(); let (op_tx, op_rx) = mpsc::unbounded_channel();
let (cur_tx, _cur_rx) = broadcast::channel(64); let (cur_tx, _cur_rx) = broadcast::channel(64);
let (end_tx, end_rx) = mpsc::unbounded_channel(); let (end_tx, end_rx) = mpsc::unbounded_channel();
let (change_tx, change_rx) = watch::channel(CursorEvent::default()); let (change_tx, change_rx) = watch::channel(CursorEvent::default());
Self { Self {
uid, user_id,
producer: op_tx, producer: op_tx,
op: op_rx, op: op_rx,
changed: change_tx, changed: change_tx,
@ -40,12 +41,12 @@ impl CursorControllerWorker {
#[async_trait] #[async_trait]
impl ControllerWorker<CursorEvent> for CursorControllerWorker { impl ControllerWorker<CursorEvent> for CursorControllerWorker {
type Controller = CursorController; type Controller = CursorController;
type Tx = CursorClient<Channel>; type Tx = mpsc::Sender<CursorEvent>;
type Rx = Streaming<CursorEvent>; type Rx = Streaming<CursorEvent>;
fn subscribe(&self) -> CursorController { fn subscribe(&self) -> CursorController {
CursorController::new( CursorController::new(
self.uid.clone(), self.user_id.clone(),
self.producer.clone(), self.producer.clone(),
Mutex::new(self.last_op.clone()), Mutex::new(self.last_op.clone()),
Mutex::new(self.channel.subscribe()), Mutex::new(self.channel.subscribe()),
@ -53,19 +54,18 @@ impl ControllerWorker<CursorEvent> for CursorControllerWorker {
) )
} }
async fn work(mut self, mut tx: Self::Tx, mut rx: Self::Rx) { async fn work(mut self, tx: Self::Tx, mut rx: Self::Rx) {
loop { loop {
tokio::select!{ tokio::select!{
Ok(Some(cur)) = rx.message() => { Ok(Some(cur)) = rx.message() => {
if cur.user == self.uid { continue } if cur.user.id == self.user_id.to_string() { continue }
self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event"); self.channel.send(cur.clone()).unwrap_or_warn("could not broadcast event");
self.changed.send(cur).unwrap_or_warn("could not update last event"); self.changed.send(cur).unwrap_or_warn("could not update last event");
}, },
Some(op) = self.op.recv() => { tx.moved(op).await.unwrap_or_warn("could not update cursor"); }, Some(op) = self.op.recv() => { tx.send(op).await.unwrap_or_warn("could not update cursor"); },
Some(()) = self.stop.recv() => { break; }, Some(()) = self.stop.recv() => { break; },
else => break, else => break,
} }
} }
} }
} }

View file

@ -109,6 +109,13 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
} }
} }
#[cfg(feature = "client")]
impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
fn from(_value: tokio::sync::watch::error::SendError<T>) -> Self {
Error::Channel { send: true }
}
}
#[cfg(feature = "client")] #[cfg(feature = "client")]
impl From<tokio::sync::broadcast::error::RecvError> for Error { impl From<tokio::sync::broadcast::error::RecvError> for Error {
fn from(_value: tokio::sync::broadcast::error::RecvError) -> Self { fn from(_value: tokio::sync::broadcast::error::RecvError) -> Self {

View file

@ -151,9 +151,11 @@ pub mod client;
pub mod tools; pub mod tools;
/// client wrapper to handle memory persistence /// client wrapper to handle memory persistence
#[cfg(feature = "client")] #[cfg(feature = "backport")]
pub mod instance; pub mod instance;
pub mod workspace;
/// all-in-one imports : `use codemp::prelude::*;` /// all-in-one imports : `use codemp::prelude::*;`
pub mod prelude; pub mod prelude;
@ -181,6 +183,6 @@ pub use errors::Result;
#[cfg(all(feature = "client", feature = "sync"))] #[cfg(all(feature = "client", feature = "sync"))]
pub use instance::sync::Instance; pub use instance::sync::Instance;
#[cfg(all(feature = "client", not(feature = "sync")))] #[cfg(all(feature = "backport", not(feature = "sync")))]
pub use instance::a_sync::Instance; pub use instance::a_sync::Instance;

View file

@ -18,10 +18,10 @@ pub use crate::api::{
#[cfg(feature = "client")] #[cfg(feature = "client")]
pub use crate::{ pub use crate::{
Instance as CodempInstance, // Instance as CodempInstance,
client::Client as CodempClient, client::Client as CodempClient,
cursor::Controller as CodempCursorController, cursor::Controller as CodempCursorController,
buffer::Controller as CodempBufferController, // buffer::Controller as CodempBufferController,
}; };
#[cfg(feature = "proto")] #[cfg(feature = "proto")]

215
src/workspace.rs Normal file
View file

@ -0,0 +1,215 @@
use std::{collections::{BTreeMap, BTreeSet}, str::FromStr, sync::Arc};
use tokio::sync::mpsc;
use uuid::Uuid;
use crate::{
proto::{user::UserIdentity, workspace::{AttachRequest, BufferListRequest, BufferPayload, Token, UserListRequest}},
api::controller::ControllerWorker,
buffer::{self, worker::BufferWorker},
client::Services,
cursor
};
//TODO may contain more info in the future
#[derive(Debug, Clone)]
pub struct UserInfo {
pub uuid: Uuid
}
impl From<Uuid> for UserInfo {
fn from(uuid: Uuid) -> Self {
UserInfo {
uuid
}
}
}
impl From<UserIdentity> for Uuid {
fn from(uid: UserIdentity) -> Uuid {
Uuid::from_str(&uid.id).expect("expected an uuid")
}
}
/// list_users -> A() , B()
/// get_user_info(B) -> B(cacca, pipu@piu)
pub struct Workspace {
id: String,
user_id: Uuid,
token: Arc<tokio::sync::watch::Sender<Token>>,
cursor: Arc<cursor::Controller>,
buffers: BTreeMap<String, Arc<buffer::Controller>>,
filetree: BTreeSet<String>,
users: BTreeMap<Uuid, UserInfo>,
services: Arc<Services>
}
impl Workspace {
pub(crate) async fn new(
id: String,
user_id: Uuid,
token: Arc<tokio::sync::watch::Sender<Token>>,
cursor: Arc<cursor::Controller>,
services: Arc<Services>
) -> crate::Result<Self> {
let mut ws = Workspace {
id,
user_id,
token,
cursor,
buffers: BTreeMap::new(),
filetree: BTreeSet::new(),
users: BTreeMap::new(),
services
};
ws.fetch_buffers().await?;
ws.fetch_users().await?;
Ok(ws)
}
/// create a new buffer in current workspace, with optional given content
pub async fn create(&mut self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
workspace_client.create(
tonic::Request::new(BufferPayload { path: path.to_string() })
).await?;
//add to filetree
self.filetree.insert(path.to_string());
Ok(())
}
/// attach to a buffer, starting a buffer controller and returning a new reference to it
///
/// to interact with such buffer use [crate::api::Controller::send] or
/// [crate::api::Controller::recv] to exchange [crate::api::TextChange]
pub async fn attach(&mut self, path: &str) -> crate::Result<Arc<buffer::Controller>> {
let mut worskspace_client = self.services.workspace.clone();
self.token.send(worskspace_client.attach(
tonic::Request::new(AttachRequest { id: path.to_string() })
).await?.into_inner())?;
let (tx, rx) = mpsc::channel(10);
let stream = self.services.buffer.clone()
.attach(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?
.into_inner();
let worker = BufferWorker::new(self.user_id, path);
let controller = Arc::new(worker.subscribe());
tokio::spawn(async move {
tracing::debug!("controller worker started");
worker.work(tx, stream).await;
tracing::debug!("controller worker stopped");
});
self.buffers.insert(path.to_string(), controller.clone());
Ok(controller)
}
pub async fn fetch_buffers(&mut self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
let buffers = workspace_client.list_buffers(
tonic::Request::new(BufferListRequest {})
).await?.into_inner().buffers;
self.filetree.clear();
for b in buffers {
self.filetree.insert(b.path);
}
Ok(())
}
pub async fn fetch_users(&mut self) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
let users = BTreeSet::from_iter(workspace_client.list_users(
tonic::Request::new(UserListRequest {})
).await?.into_inner().users.into_iter().map(Uuid::from));
// only keep userinfo for users that still exist
self.users.retain(|k, _v| users.contains(k));
let _users = self.users.clone(); // damnnn rust
users.iter()
.filter(|u| _users.contains_key(u))
.for_each(|u| { self.users.insert(*u, UserInfo::from(*u)); });
Ok(())
}
pub async fn list_buffer_users() {
todo!(); //TODO what is this
}
pub async fn delete(&mut self, path: &str) -> crate::Result<()> {
let mut workspace_client = self.services.workspace.clone();
workspace_client.delete(
tonic::Request::new(BufferPayload { path: path.to_string() })
).await?;
self.filetree.remove(path);
Ok(())
}
/// leave current workspace if in one, disconnecting buffer and cursor controllers
pub fn leave_workspace(&self) {
todo!(); //TODO need proto
}
/// disconnect from a specific buffer
pub fn disconnect_buffer(&mut self, path: &str) -> bool {
match &mut self.buffers.remove(path) {
None => false,
Some(_) => true
}
}
pub fn id(&self) -> String { self.id.clone() }
/// get a new reference to a buffer controller, if any is active to given path
pub fn buffer_by_name(&self, path: &str) -> Option<Arc<buffer::Controller>> {
self.buffers.get(path).cloned()
}
/// return a reference to current cursor controller, if currently in a workspace
pub fn cursor(&self) -> Arc<cursor::Controller> { self.cursor.clone() }
}
/*
impl Interceptor for Workspace { //TODO
fn call(&mut self, mut request: tonic::Request<()>) -> Result<tonic::Request<()>, tonic::Status> {
request.metadata_mut().insert("auth", self.token.token.parse().unwrap());
Ok(request)
}
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub enum FSNode {
File(String),
Directory(String, Vec<FSNode>),
}
fn file_tree_rec(path: &str, root: &mut Vec<FSNode>) {
if let Some(idx) = path.find("/") {
let dir = path[..idx].to_string();
let mut dir_node = vec![];
Self::file_tree_rec(&path[idx..], &mut dir_node);
root.push(FSNode::Directory(dir, dir_node));
} else {
root.push(FSNode::File(path.to_string()));
}
}
fn file_tree(&self) -> Vec<FSNode> {
let mut root = vec![];
for path in &self.filetree {
Self::file_tree_rec(&path, &mut root);
}
root
}
*/