chore: dramatically simplified everything

working on this was really hard, so i'm making simple things first.
removed almost everything except bare buffer changes, and not even done
in a smart way, but should be a working PoC? now trying to make a
working client to test it out and actually work on a real prototype
This commit is contained in:
əlemi 2023-04-07 03:05:21 +02:00
parent 192ce94ac6
commit ebbca24a99
26 changed files with 371 additions and 1378 deletions

View file

@ -3,9 +3,9 @@ name = "codemp"
version = "0.1.0"
edition = "2021"
[features]
default = ["nvim"]
nvim = []
# [features]
# default = ["nvim"]
# nvim = []
[lib]
name = "library"
@ -15,23 +15,35 @@ path = "src/lib/lib.rs"
name = "server"
path = "src/server/main.rs"
[[bin]] # Bin to run the CodeMP gRPC client
name = "client-neovim"
path = "src/client/main.rs"
[[bin]]
name = "client-cli"
path = "src/client/cli/main.rs"
required-features = ["cli"]
[[bin]]
name = "client-nvim"
path = "src/client/nvim/main.rs"
required-features = ["nvim"]
[dependencies]
tracing = "0.1"
tracing-subscriber = "0.3"
tonic = "0.7"
prost = "0.10"
futures = "0.3"
tonic = "0.9"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] }
tokio-stream = "0.1"
rmpv = "1"
operational-transform = "0.6"
nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature
uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] }
rand = "0.8.5"
serde = "1"
serde_json = "1"
operational-transform = { version = "0.6", features = ["serde"] }
md5 = "0.7.0"
prost = "0.11.8"
clap = { version = "4.2.1", features = ["derive"], optional = true }
nvim-rs = { version = "0.5", features = ["use_tokio"], optional = true }
[build-dependencies]
tonic-build = "0.7"
tonic-build = "0.9"
[features]
default = []
cli = ["dep:clap"]
nvim = ["dep:nvim-rs"]

View file

@ -1,6 +1,4 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/session.proto")?;
tonic_build::compile_protos("proto/workspace.proto")?;
tonic_build::compile_protos("proto/buffer.proto")?;
Ok(())
}

View file

@ -2,35 +2,26 @@ syntax = "proto3";
package buffer;
service Buffer {
rpc Attach (stream Operation) returns (stream Operation);
rpc Push (BufferPayload) returns (BufferResponse);
rpc Pull (BufferPayload) returns (BufferPayload);
rpc Attach (BufferPayload) returns (stream RawOp);
rpc Edit (OperationRequest) returns (BufferResponse);
rpc Create (BufferPayload) returns (BufferResponse);
}
message Operation {
int64 opId = 1;
message RawOp {
string opseq = 1;
}
enum Action {
RETAIN = 0;
INSERT = 1;
DELETE = 2;
};
Action action = 2;
int32 row = 3;
int32 column = 4;
optional string text = 5;
message OperationRequest {
string path = 1;
string hash = 2;
string opseq = 3;
}
message BufferPayload {
string sessionKey = 1;
string path = 2;
optional string content = 3;
}
message BufferResponse {
string sessionKey = 1;
string path = 2;
bool accepted = 3;
}

View file

@ -1,25 +0,0 @@
syntax = "proto3";
package session;
service Session {
// rpc Authenticate(SessionRequest) returns (SessionResponse);
// rpc ListWorkspaces(SessionRequest) returns (WorkspaceList);
rpc CreateWorkspace(WorkspaceBuilderRequest) returns (SessionResponse);
}
message SessionRequest {
string sessionKey = 1;
}
message SessionResponse {
string sessionKey = 1;
bool accepted = 2;
}
message WorkspaceBuilderRequest {
string name = 1;
}
message WorkspaceList {
repeated string name = 1; // TODO add more fields
}

View file

@ -1,51 +0,0 @@
syntax = "proto3";
package workspace;
service Workspace {
rpc Join (JoinRequest) returns (stream WorkspaceEvent);
rpc Subscribe (stream CursorUpdate) returns (stream CursorUpdate);
rpc ListUsers (WorkspaceRequest) returns (UsersList);
rpc Buffers (WorkspaceRequest) returns (BufferList);
rpc NewBuffer (BufferRequest) returns (WorkspaceResponse);
rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse);
}
message JoinRequest {
string name = 1;
}
message WorkspaceEvent {
int32 id = 1;
optional string body = 2;
}
// nvim-rs passes everything as i64, so having them as i64 in the packet itself is convenient
// TODO can we make them i32 and save some space?
message CursorUpdate {
string username = 1;
int64 buffer = 2;
int64 col = 3;
int64 row = 4;
}
message WorkspaceRequest {
string sessionKey = 1;
}
message BufferRequest {
string sessionKey = 1;
string path = 2;
}
message WorkspaceResponse {
bool accepted = 1;
}
message BufferList {
repeated string path = 1;
}
message UsersList {
repeated string name = 1;
}

46
src/client/cli/main.rs Normal file
View file

@ -0,0 +1,46 @@
use clap::Parser;
use library::proto::{buffer_client::BufferClient, BufferPayload};
use tokio_stream::StreamExt;
#[derive(Parser, Debug)]
struct CliArgs {
/// path of buffer to create
path: String,
/// initial content for buffer
#[arg(short, long)]
content: Option<String>,
/// attach instead of creating a new buffer
#[arg(long, default_value_t = false)]
attach: bool,
/// host to connect to
#[arg(long, default_value = "http://[::1]:50051")]
host: String,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = CliArgs::parse();
let mut client = BufferClient::connect(args.host).await?;
let request = BufferPayload {
path: args.path,
content: args.content,
};
if !args.attach {
client.create(request.clone()).await.unwrap();
}
let mut stream = client.attach(request).await.unwrap().into_inner();
while let Some(item) = stream.next().await {
println!("> {:?}", item);
}
Ok(())
}

View file

@ -1,96 +0,0 @@
pub mod proto {
tonic::include_proto!("session");
tonic::include_proto!("workspace");
tonic::include_proto!("buffer");
}
use std::sync::Arc;
use tracing::error;
use tokio::sync::{mpsc, Mutex};
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use proto::{
workspace_client::WorkspaceClient,
session_client::SessionClient,
buffer_client::BufferClient,
WorkspaceBuilderRequest, JoinRequest, SessionResponse, CursorUpdate
};
use tonic::{transport::Channel, Status, Request, Response};
#[derive(Clone)]
pub struct Dispatcher {
name: String,
dp: Arc<Mutex<DispatcherWorker>>, // TODO use channels and don't lock
}
struct DispatcherWorker {
// TODO do I need all three? Did I design the server badly?
session: SessionClient<Channel>,
workspace: WorkspaceClient<Channel>,
_buffers: BufferClient<Channel>,
}
impl Dispatcher {
pub async fn connect(addr:String) -> Result<Dispatcher, tonic::transport::Error> {
let (s, w, b) = tokio::join!(
SessionClient::connect(addr.clone()),
WorkspaceClient::connect(addr.clone()),
BufferClient::connect(addr.clone()),
);
Ok(
Dispatcher {
name: format!("User#{}", rand::random::<u16>()),
dp: Arc::new(
Mutex::new(
DispatcherWorker { session: s?, workspace: w?, _buffers: b? }
)
)
}
)
}
pub async fn create_workspace(&self, name:String) -> Result<Response<SessionResponse>, Status> {
self.dp.lock().await.session.create_workspace(
Request::new(WorkspaceBuilderRequest { name })
).await
}
pub async fn join_workspace(&self, session_id:String) -> Result<(), Status> {
let mut req = Request::new(JoinRequest { name: self.name.clone() });
req.metadata_mut().append("workspace", session_id.parse().unwrap());
let mut stream = self.dp.lock().await.workspace.join(req).await?.into_inner();
let _worker = tokio::spawn(async move {
while let Some(pkt) = stream.next().await {
match pkt {
Ok(_event) => {
// TODO do something with events when they will mean something!
},
Err(e) => error!("Error receiving event | {}", e),
}
}
});
Ok(())
}
pub async fn start_cursor_worker(&self, session_id:String, feed:mpsc::Receiver<CursorUpdate>) -> Result<mpsc::Receiver<CursorUpdate>, Status> {
let mut in_stream = Request::new(ReceiverStream::new(feed));
in_stream.metadata_mut().append("workspace", session_id.parse().unwrap());
let mut stream = self.dp.lock().await.workspace.subscribe(in_stream).await?.into_inner();
let (tx, rx) = mpsc::channel(50);
let _worker = tokio::spawn(async move {
while let Some(pkt) = stream.next().await {
match pkt {
Ok(update) => tx.send(update).await.unwrap(), // TODO how to handle an error here?
Err(e) => error!("Error receiving cursor update | {}", e),
}
}
});
Ok(rx)
}
}

View file

@ -1,16 +0,0 @@
mod nvim;
pub mod dispatcher;
use dispatcher::Dispatcher;
#[tokio::main]
async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> {
let dispatcher = Dispatcher::connect("http://[::1]:50051".into()).await.unwrap();
#[cfg(feature = "nvim")]
crate::nvim::run_nvim_client(dispatcher).await?;
Ok(())
}

91
src/client/nvim/main.rs Normal file
View file

@ -0,0 +1,91 @@
//! A basic example. Mainly for use in a test, but also shows off some basic
//! functionality.
use std::{env, error::Error, fs};
use rmpv::Value;
use tokio::io::Stdout;
use nvim_rs::{
compat::tokio::Compat, create::tokio as create, rpc::IntoVal, Handler, Neovim,
};
use tonic::async_trait;
#[derive(Clone)]
struct NeovimHandler {
}
#[async_trait]
impl Handler for NeovimHandler {
type Writer = Compat<Stdout>;
async fn handle_request(
&self,
name: String,
args: Vec<Value>,
nvim: Neovim<Compat<Stdout>>,
) -> Result<Value, Value> {
match name.as_ref() {
"ping" => Ok(Value::from("pong")),
_ => unimplemented!(),
}
}
async fn handle_notify(
&self,
name: String,
args: Vec<Value>,
nvim: Neovim<Compat<Stdout>>,
) {
}
}
#[tokio::main]
async fn main() {
let handler: NeovimHandler = NeovimHandler {};
let (nvim, io_handler) = create::new_parent(handler).await;
let curbuf = nvim.get_current_buf().await.unwrap();
let mut envargs = env::args();
let _ = envargs.next();
let testfile = envargs.next().unwrap();
fs::write(testfile, &format!("{:?}", curbuf.into_val())).unwrap();
// Any error should probably be logged, as stderr is not visible to users.
match io_handler.await {
Err(joinerr) => eprintln!("Error joining IO loop: '{}'", joinerr),
Ok(Err(err)) => {
if !err.is_reader_error() {
// One last try, since there wasn't an error with writing to the
// stream
nvim
.err_writeln(&format!("Error: '{}'", err))
.await
.unwrap_or_else(|e| {
// We could inspect this error to see what was happening, and
// maybe retry, but at this point it's probably best
// to assume the worst and print a friendly and
// supportive message to our users
eprintln!("Well, dang... '{}'", e);
});
}
if !err.is_channel_closed() {
// Closed channel usually means neovim quit itself, or this plugin was
// told to quit by closing the channel, so it's not always an error
// condition.
eprintln!("Error: '{}'", err);
let mut source = err.source();
while let Some(e) = source {
eprintln!("Caused by: '{}'", e);
source = e.source();
}
}
}
Ok(Ok(())) => {}
}
}

View file

@ -1,133 +0,0 @@
use std::sync::Arc;
use rmpv::Value;
use tokio::io::Stdout;
use nvim_rs::{compat::tokio::Compat, Handler, Neovim};
use nvim_rs::create::tokio::new_parent;
use tokio::sync::{mpsc, Mutex};
use crate::dispatcher::{Dispatcher, proto::CursorUpdate};
#[derive(Clone)]
pub struct NeovimHandler {
dispatcher: Dispatcher,
sink: Arc<Mutex<Option<mpsc::Sender<CursorUpdate>>>>,
}
impl NeovimHandler {
pub fn new(dispatcher: Dispatcher) -> Self {
NeovimHandler {
dispatcher,
sink: Arc::new(Mutex::new(None)),
}
}
}
#[tonic::async_trait]
impl Handler for NeovimHandler {
type Writer = Compat<Stdout>;
async fn handle_request(
&self,
name: String,
args: Vec<Value>,
neovim: Neovim<Compat<Stdout>>,
) -> Result<Value, Value> {
match name.as_ref() {
"ping" => Ok(Value::from("pong")),
"create" => {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
}
let res = self.dispatcher.create_workspace(args[0].to_string())
.await
.map_err(|e| Value::from(e.to_string()))?
.into_inner();
Ok(res.session_key.into())
},
"join" => {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
}
self.dispatcher.join_workspace(
args[0].as_str().unwrap().to_string(), // TODO throw err if it's not a string?
).await.map_err(|e| Value::from(e.to_string()))?;
Ok("OK".into())
},
"cursor-start" => {
if args.len() < 1 {
return Err(Value::from("[!] no session key"));
}
let (tx, stream) = mpsc::channel(50);
let mut rx = self.dispatcher.start_cursor_worker(
args[0].as_str().unwrap().to_string(), stream
).await.map_err(|e| Value::from(e.to_string()))?;
let sink = self.sink.clone();
sink.lock().await.replace(tx);
let _worker = tokio::spawn(async move {
let mut col : i64;
let mut row : i64;
let ns = neovim.create_namespace("Cursor").await.unwrap();
while let Some(update) = rx.recv().await {
neovim.exec_lua(format!("print('{:?}')", update).as_str(), vec![]).await.unwrap();
let buf = neovim.get_current_buf().await.unwrap();
buf.clear_namespace(ns, 0, -1).await.unwrap();
row = update.row as i64;
col = update.col as i64;
buf.add_highlight(ns, "ErrorMsg", row-1, col-1, col).await.unwrap();
}
sink.lock().await.take();
});
Ok("OK".into())
},
_ => {
eprintln!("[!] unexpected call");
Ok(Value::from(""))
},
}
}
async fn handle_notify(
&self,
name: String,
args: Vec<Value>,
_neovim: Neovim<Compat<Stdout>>,
) {
match name.as_ref() {
"insert" => {},
"cursor" => {
if args.len() >= 3 {
if let Some(sink) = self.sink.lock().await.as_ref() {
sink.send(CursorUpdate {
buffer: args[0].as_i64().unwrap(),
row: args[1].as_i64().unwrap(),
col: args[2].as_i64().unwrap(),
username: "root".into()
}).await.unwrap();
}
}
},
"tick" => eprintln!("tock"),
_ => eprintln!("[!] unexpected notify",)
}
}
}
pub async fn run_nvim_client(dispatcher: Dispatcher) -> Result<(), Box<dyn std::error::Error + 'static>> {
let handler: NeovimHandler = NeovimHandler::new(dispatcher);
let (_nvim, io_handler) = new_parent(handler).await;
// Any error should probably be logged, as stderr is not visible to users.
match io_handler.await {
Err(err) => eprintln!("Error joining IO loop: {:?}", err),
Ok(Err(err)) => eprintln!("Process ended with error: {:?}", err),
Ok(Ok(())) => eprintln!("Finished"),
}
Ok(())
}

View file

@ -1,100 +0,0 @@
use std::fmt::Display;
use crate::user::User;
#[derive(Debug, Clone)]
pub enum Event {
UserJoin { user: User },
UserLeave { name: String },
BufferNew { path: String },
BufferDelete { path: String },
}
impl Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UserJoin { user } => write!(f, "UserJoin(user:{})", user),
Self::UserLeave { name } => write!(f, "UserLeave(user:{})", name),
Self::BufferNew { path } => write!(f, "BufferNew(path:{})", path),
Self::BufferDelete { path } => write!(f, "BufferDelete(path:{})", path),
}
}
}
// pub type Event = Box<dyn EventInterface>;
//
// pub trait EventInterface {
// fn class(&self) -> EventClass;
// fn unwrap(e: Event) -> Option<Self> where Self: Sized;
//
// fn wrap(self) -> Event {
// Box::new(self)
// }
// }
//
//
// // User joining workspace
//
// pub struct UserJoinEvent {
// user: User,
// }
//
// impl EventInterface for UserJoinEvent {
// fn class(&self) -> EventClass { EventClass::UserJoin }
// fn unwrap(e: Event) -> Option<Self> where Self: Sized {
// if matches!(e.class(), EventClass::UserJoin) {
// return Some(*e);
// }
// None
// }
// }
//
//
// // User leaving workspace
//
// pub struct UserLeaveEvent {
// name: String,
// }
//
// impl EventInterface for UserLeaveEvent {
// fn class(&self) -> EventClass { EventClass::UserLeave }
// }
//
//
// // Cursor movement
//
// pub struct CursorEvent {
// user: String,
// cursor: UserCursor,
// }
//
// impl EventInterface for CursorEvent {
// fn class(&self) -> EventClass { EventClass::Cursor }
// }
//
// impl CursorEvent {
// pub fn new(user:String, cursor: UserCursor) -> Self {
// CursorEvent { user, cursor }
// }
// }
//
//
// // Buffer added
//
// pub struct BufferNewEvent {
// path: String,
// }
//
// impl EventInterface for BufferNewEvent {
// fn class(&self) -> EventClass { EventClass::BufferNew }
// }
//
//
// // Buffer deleted
//
// pub struct BufferDeleteEvent {
// path: String,
// }
//
// impl EventInterface for BufferDeleteEvent {
// fn class(&self) -> EventClass { EventClass::BufferDelete }
// }

View file

@ -1,2 +1,4 @@
pub mod events;
pub mod user;
pub mod proto;
pub use tonic;
pub use tokio;

1
src/lib/proto.rs Normal file
View file

@ -0,0 +1 @@
tonic::include_proto!("buffer");

View file

@ -1,27 +0,0 @@
use std::fmt::Display;
#[derive(Debug, Clone)]
pub struct UserCursor{
pub buffer: i64,
pub x: i64,
pub y: i64
}
impl Display for UserCursor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Cursor(buffer:{}, x:{}, y:{})", self.buffer, self.x, self.y)
}
}
#[derive(Debug, Clone)]
pub struct User {
pub name: String,
pub cursor: UserCursor,
}
impl Display for User {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "User(name:{}, cursor:{})", self.name, self.cursor)
}
}

View file

@ -1,67 +0,0 @@
use operational_transform::OperationSeq;
use tokio::sync::{broadcast, mpsc, watch};
use tracing::error;
use library::events::Event;
#[derive(Debug, Clone)]
/// A view of a buffer, with references to access value and send operations
pub struct BufferView {
pub name: String,
pub content: watch::Receiver<String>,
op_tx: mpsc::Sender<OperationSeq>,
}
impl BufferView {
pub async fn op(&self, op: OperationSeq) -> Result<(), mpsc::error::SendError<OperationSeq>> {
self.op_tx.send(op).await
}
}
#[derive(Debug)]
pub struct Buffer {
view: BufferView,
run: watch::Sender<bool>,
}
impl Drop for Buffer {
fn drop(&mut self) {
self.run.send(false).unwrap_or_else(|e| {
error!("Could not stop Buffer worker task: {:?}", e);
});
}
}
impl Buffer {
pub fn new(name: String, _bus: broadcast::Sender<Event>) -> Self {
let (op_tx, mut op_rx) = mpsc::channel(32);
let (stop_tx, stop_rx) = watch::channel(true);
let (content_tx, content_rx) = watch::channel(String::new());
let b = Buffer {
run: stop_tx,
view: BufferView {
name: name.clone(),
op_tx,
content: content_rx,
},
};
tokio::spawn(async move {
let mut content = String::new();
while stop_rx.borrow().to_owned() {
// TODO handle these errors!!
let op = op_rx.recv().await.unwrap();
content = op.apply(content.as_str()).unwrap();
// bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed
content_tx.send(content.clone()).unwrap();
}
});
return b;
}
pub fn view(&self) -> BufferView {
return self.view.clone();
}
}

View file

@ -1,3 +0,0 @@
pub mod buffer;
pub mod workspace;
pub mod state;

View file

@ -1,106 +0,0 @@
use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, watch};
use tracing::error;
use uuid::Uuid;
use crate::actor::workspace::Workspace;
#[derive(Debug)]
enum WorkspaceAction {
ADD {
key: Uuid,
w: Box<Workspace>,
},
REMOVE {
key: Uuid
},
}
#[derive(Debug, Clone)]
pub struct WorkspacesView {
watch: watch::Receiver<HashMap<Uuid, Arc<Workspace>>>,
op: mpsc::Sender<WorkspaceAction>,
}
impl WorkspacesView {
pub fn borrow(&self) -> watch::Ref<HashMap<Uuid, Arc<Workspace>>> {
self.watch.borrow()
}
pub async fn add(&mut self, w: Workspace) {
self.op.send(WorkspaceAction::ADD { key: w.id, w: Box::new(w) }).await.unwrap();
}
pub async fn remove(&mut self, key: Uuid) {
self.op.send(WorkspaceAction::REMOVE { key }).await.unwrap();
}
}
#[derive(Debug)]
pub struct StateManager {
pub workspaces: WorkspacesView,
pub run: watch::Receiver<bool>,
run_tx: watch::Sender<bool>,
}
impl Drop for StateManager {
fn drop(&mut self) {
self.run_tx.send(false).unwrap_or_else(|e| {
error!("Could not stop StateManager worker: {:?}", e);
})
}
}
impl StateManager {
pub fn new() -> Self {
let (tx, rx) = mpsc::channel(32); // TODO quantify backpressure
let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new());
let (run_tx, run_rx) = watch::channel(true);
let s = StateManager {
workspaces: WorkspacesView { watch: workspaces_rx, op: tx },
run_tx, run: run_rx,
};
s.workspaces_worker(rx, workspaces_tx);
return s;
}
fn workspaces_worker(&self, mut rx: mpsc::Receiver<WorkspaceAction>, tx: watch::Sender<HashMap<Uuid, Arc<Workspace>>>) {
let run = self.run.clone();
tokio::spawn(async move {
let mut store = HashMap::new();
while run.borrow().to_owned() {
if let Some(event) = rx.recv().await {
match event {
WorkspaceAction::ADD { key, w } => {
store.insert(key, Arc::new(*w)); // TODO put in hashmap
},
WorkspaceAction::REMOVE { key } => {
store.remove(&key);
},
}
tx.send(store.clone()).unwrap();
} else {
break
}
}
});
}
pub fn view(&self) -> WorkspacesView {
return self.workspaces.clone();
}
/// get a workspace Arc directly, without passing by the WorkspacesView
pub fn get(&self, key: &Uuid) -> Option<Arc<Workspace>> {
if let Some(w) = self.workspaces.borrow().get(key) {
return Some(w.clone());
}
return None;
}
}

View file

@ -1,228 +0,0 @@
use std::collections::HashMap;
use tokio::sync::{broadcast, mpsc, watch::{self, Ref}};
use tracing::{warn, info};
use library::{events::Event, user::{User, UserCursor}};
use crate::service::workspace::proto::CursorUpdate;
use super::buffer::{BufferView, Buffer};
#[derive(Debug, Clone)]
pub struct UsersView {
watch: watch::Receiver<HashMap<String, User>>,
op: mpsc::Sender<UserAction>,
}
impl UsersView { // TODO don't unwrap everything!
pub fn borrow(&self) -> Ref<HashMap<String, User>> {
return self.watch.borrow();
}
pub async fn add(&mut self, user: User) {
self.op.send(UserAction::ADD{ user }).await.unwrap();
}
pub async fn remove(&mut self, name: String) {
self.op.send(UserAction::REMOVE{ name }).await.unwrap();
}
pub async fn update(&mut self, user_name: String, cursor: UserCursor) {
self.op.send(UserAction::CURSOR { name: user_name, cursor }).await.unwrap();
}
}
#[derive(Debug, Clone)]
pub struct BuffersTreeView {
watch: watch::Receiver<HashMap<String, BufferView>>,
op: mpsc::Sender<BufferAction>,
}
impl BuffersTreeView {
pub fn borrow(&self) -> Ref<HashMap<String, BufferView>> {
return self.watch.borrow();
}
pub async fn add(&mut self, buffer: Buffer) {
self.op.send(BufferAction::ADD { buffer }).await.unwrap();
}
pub async fn remove(&mut self, path: String) {
self.op.send(BufferAction::REMOVE { path }).await.unwrap();
}
}
pub struct WorkspaceView {
rx: broadcast::Receiver<Event>,
pub users: UsersView,
pub buffers: BuffersTreeView,
}
impl WorkspaceView {
pub async fn event(&mut self) -> Result<Event, broadcast::error::RecvError> {
self.rx.recv().await
}
}
// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk
#[derive(Debug)]
pub struct Workspace {
pub id: uuid::Uuid,
pub name: String,
pub bus: broadcast::Sender<Event>,
pub cursors: broadcast::Sender<CursorUpdate>,
pub buffers: BuffersTreeView,
pub users: UsersView,
run_tx: watch::Sender<bool>,
run_rx: watch::Receiver<bool>,
}
impl Drop for Workspace {
fn drop(&mut self) {
self.run_tx.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e));
}
}
impl Workspace {
pub fn new(name: String) -> Self {
let (op_buf_tx, op_buf_rx) = mpsc::channel::<BufferAction>(32);
let (op_usr_tx, op_usr_rx) = mpsc::channel::<UserAction>(32);
let (run_tx, run_rx) = watch::channel::<bool>(true);
let (buffer_tx, buffer_rx) = watch::channel::<HashMap<String, BufferView>>(HashMap::new());
let (users_tx, users_rx) = watch::channel(HashMap::new());
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Event>(32);
let (cursors_tx, _cursors_rx) = broadcast::channel::<CursorUpdate>(32);
let w = Workspace {
id: uuid::Uuid::new_v4(),
name,
bus: broadcast_tx,
cursors: cursors_tx,
buffers: BuffersTreeView{ op: op_buf_tx, watch: buffer_rx },
users: UsersView{ op: op_usr_tx, watch: users_rx },
run_tx,
run_rx,
};
w.users_worker(op_usr_rx, users_tx); // spawn worker to handle users
w.buffers_worker(op_buf_rx, buffer_tx); // spawn worker to handle buffers
//
info!("new workspace created: {}[{}]", w.name, w.id);
return w;
}
fn buffers_worker(&self, mut rx: mpsc::Receiver<BufferAction>, tx: watch::Sender<HashMap<String, BufferView>>) {
let bus = self.bus.clone();
let run = self.run_rx.clone();
tokio::spawn(async move {
let mut buffers : HashMap<String, Buffer> = HashMap::new();
while run.borrow().to_owned() {
// TODO handle these errors!!
let action = rx.recv().await.unwrap();
match action {
BufferAction::ADD { buffer } => {
let view = buffer.view();
buffers.insert(view.name.clone(), buffer);
bus.send(Event::BufferNew { path: view.name }).unwrap();
}
BufferAction::REMOVE { path } => {
buffers.remove(&path);
bus.send(Event::BufferDelete { path }).unwrap();
}
}
tx.send(
buffers.iter()
.map(|(k, v)| (k.clone(), v.view()))
.collect()
).unwrap();
}
});
}
fn users_worker(&self, mut rx: mpsc::Receiver<UserAction>, tx: watch::Sender<HashMap<String, User>>) {
let bus = self.bus.clone();
let cursors_tx = self.cursors.clone();
let run = self.run_rx.clone();
tokio::spawn(async move {
let mut cursors_rx = cursors_tx.subscribe();
let mut users : HashMap<String, User> = HashMap::new();
while run.borrow().to_owned() {
tokio::select!{
action = rx.recv() => {
match action.unwrap() {
UserAction::ADD { user } => {
users.insert(user.name.clone(), user.clone());
bus.send(Event::UserJoin { user }).unwrap();
},
UserAction::REMOVE { name } => {
if let None = users.remove(&name) {
continue; // don't update channel since this was a no-op
} else {
bus.send(Event::UserLeave { name }).unwrap();
}
},
UserAction::CURSOR { name, cursor } => {
if let Some(user) = users.get_mut(&name) {
user.cursor = cursor.clone();
} else {
continue; // don't update channel since this was a no-op
}
},
};
},
cursor = cursors_rx.recv() => {
let cursor = cursor.unwrap();
if let Some(user) = users.get_mut(&cursor.username) {
user.cursor = UserCursor { buffer: cursor.buffer, x:cursor.col, y:cursor.row };
}
}
}
tx.send(
users.iter()
.map(|(k, u)| (k.clone(), u.clone()))
.collect()
).unwrap();
}
});
}
pub fn view(&self) -> WorkspaceView {
WorkspaceView {
rx: self.bus.subscribe(),
users: self.users.clone(),
buffers: self.buffers.clone(),
}
}
}
#[derive(Debug)]
enum UserAction {
ADD {
user: User,
},
REMOVE {
name: String,
},
CURSOR {
name: String,
cursor: UserCursor,
},
}
#[derive(Debug)]
enum BufferAction {
ADD {
buffer: Buffer,
},
REMOVE {
path: String, // TODO remove by id?
},
}

View file

@ -0,0 +1,83 @@
use tokio::sync::{mpsc, broadcast, watch};
use tracing::error;
use md5::Digest;
use operational_transform::OperationSeq;
pub trait BufferStore<T> {
fn get(&self, key: &T) -> Option<&BufferHandle>;
fn put(&mut self, key: T, handle: BufferHandle) -> Option<BufferHandle>;
fn handle(&mut self, key: T, content: Option<String>) {
let handle = BufferHandle::new(content);
self.put(key, handle);
}
}
#[derive(Clone)]
pub struct BufferHandle {
pub edit: mpsc::Sender<OperationSeq>,
events: broadcast::Sender<OperationSeq>,
pub digest: watch::Receiver<Digest>,
}
impl BufferHandle {
fn new(init: Option<String>) -> Self {
let init_val = init.unwrap_or("".into());
let (edits_tx, edits_rx) = mpsc::channel(64); // TODO hardcoded size
let (events_tx, _events_rx) = broadcast::channel(64); // TODO hardcoded size
let (digest_tx, digest_rx) = watch::channel(md5::compute(&init_val));
let events_tx_clone = events_tx.clone();
tokio::spawn(async move {
let worker = BufferWorker {
content: init_val,
edits: edits_rx,
events: events_tx_clone,
digest: digest_tx,
};
worker.work().await
});
BufferHandle {
edit: edits_tx,
events: events_tx,
digest: digest_rx,
}
}
pub fn subscribe(&self) -> broadcast::Receiver<OperationSeq> {
self.events.subscribe()
}
}
struct BufferWorker {
content: String,
edits: mpsc::Receiver<OperationSeq>,
events: broadcast::Sender<OperationSeq>,
digest: watch::Sender<Digest>,
}
impl BufferWorker {
async fn work(mut self) {
loop {
match self.edits.recv().await {
None => break,
Some(v) => {
match v.apply(&self.content) {
Ok(res) => {
self.content = res;
self.digest.send(md5::compute(&self.content)).unwrap();
if let Err(e) = self.events.send(v) {
error!("could not broadcast OpSeq: {}", e);
}
},
Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.content, e),
}
},
}
}
}
}

2
src/server/buffer/mod.rs Normal file
View file

@ -0,0 +1,2 @@
pub mod actor;
pub mod service;

View file

@ -0,0 +1,105 @@
use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap};
use operational_transform::OperationSeq;
use tokio::sync::mpsc;
use tonic::{Request, Response, Status};
use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this?
use library::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest};
use tracing::info;
use super::actor::{BufferHandle, BufferStore};
type OperationStream = Pin<Box<dyn Stream<Item = Result<RawOp, Status>> + Send>>;
struct BufferMap {
store: HashMap<String, BufferHandle>,
}
impl From::<HashMap<String, BufferHandle>> for BufferMap {
fn from(value: HashMap<String, BufferHandle>) -> Self {
BufferMap { store: value }
}
}
impl BufferStore<String> for BufferMap {
fn get(&self, key: &String) -> Option<&BufferHandle> {
self.store.get(key)
}
fn put(&mut self, key: String, handle: BufferHandle) -> Option<BufferHandle> {
self.store.insert(key, handle)
}
}
pub struct BufferService {
map: Arc<RwLock<BufferMap>>,
}
#[tonic::async_trait]
impl Buffer for BufferService {
type AttachStream = OperationStream;
async fn attach(&self, req: Request<BufferPayload>) -> Result<tonic::Response<OperationStream>, Status> {
let request = req.into_inner();
match self.map.read().unwrap().get(&request.path) {
Some(handle) => {
let (tx, rx) = mpsc::channel(128);
let mut sub = handle.subscribe();
tokio::spawn(async move {
loop {
match sub.recv().await {
Ok(v) => {
let snd = RawOp { opseq: serde_json::to_string(&v).unwrap() };
tx.send(Ok(snd)).await.unwrap();
}
Err(_e) => break,
}
}
});
let output_stream = ReceiverStream::new(rx);
info!("registered new subscriber on buffer");
Ok(Response::new(Box::pin(output_stream)))
},
None => Err(Status::not_found("path not found")),
}
}
async fn edit(&self, req:Request<OperationRequest>) -> Result<Response<BufferResponse>, Status> {
let request = req.into_inner();
let tx = match self.map.read().unwrap().get(&request.path) {
Some(handle) => {
if format!("{:x}", *handle.digest.borrow()) != request.hash {
return Ok(Response::new(BufferResponse { accepted : false } ));
}
handle.edit.clone()
},
None => return Err(Status::not_found("path not found")),
};
let opseq : OperationSeq = serde_json::from_str(&request.opseq).unwrap();
tx.send(opseq).await.unwrap();
info!("sent edit to buffer");
Ok(Response::new(BufferResponse { accepted: true }))
}
async fn create(&self, req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
let request = req.into_inner();
let _handle = self.map.write().unwrap().handle(request.path, request.content);
info!("created new buffer");
let answ = BufferResponse { accepted: true };
Ok(Response::new(answ))
}
}
impl BufferService {
pub fn new() -> BufferService {
BufferService {
map: Arc::new(RwLock::new(HashMap::new().into())),
}
}
pub fn server(self) -> BufferServer<BufferService> {
BufferServer::new(self)
}
}

View file

@ -4,19 +4,13 @@
//! all clients and synching everyone's cursor.
//!
pub mod actor;
pub mod service;
use std::sync::Arc;
mod buffer;
use tracing::info;
use tonic::transport::Server;
use crate::{
actor::state::StateManager,
service::{buffer::BufferService, workspace::WorkspaceService, session::SessionService},
};
use crate::buffer::service::BufferService;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@ -24,14 +18,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::1]:50051".parse()?;
let state = Arc::new(StateManager::new());
info!("Starting server");
Server::builder()
.add_service(SessionService::new(state.clone()).server())
.add_service(WorkspaceService::new(state.clone()).server())
.add_service(BufferService::new(state.clone()).server())
.add_service(BufferService::new().server())
.serve(addr)
.await?;

View file

@ -1,156 +0,0 @@
use std::collections::VecDeque;
use std::{pin::Pin, sync::Arc};
use uuid::Uuid;
use tokio_stream::wrappers::ReceiverStream;
use tracing::error;
use operational_transform::OperationSeq;
use tonic::{Request, Response, Status};
pub mod proto {
tonic::include_proto!("buffer");
}
use library::events::Event;
use tokio::sync::{broadcast, mpsc};
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
use proto::buffer_server::{Buffer, BufferServer};
use proto::Operation;
use tonic::Streaming;
//use futures::{Stream, StreamExt};
use crate::actor::{buffer::BufferView, state::StateManager};
use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2!
type OperationStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
pub struct BufferService {
state: Arc<StateManager>,
}
fn op_seq(_o: &Operation) -> OperationSeq {
todo!()
}
fn _op_net(_o: &OperationSeq) -> Operation {
todo!()
}
// async fn buffer_worker(tx: mpsc::Sender<Result<Operation, Status>>, mut rx:Streaming<Operation>, mut rx_core: mpsc::Receiver<Operation>) {
async fn buffer_worker(
bv: BufferView,
mut client_rx: Streaming<Operation>,
_tx_client: mpsc::Sender<Result<Operation, Status>>,
mut rx_core: broadcast::Receiver<Event>,
) {
let mut queue: VecDeque<Operation> = VecDeque::new();
loop {
tokio::select! {
client_op = client_rx.next() => {
if let Some(result) = client_op {
match result {
Ok(op) => {
bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt!
queue.push_back(op);
},
Err(status) => {
error!("error receiving op from client: {:?}", status);
break;
}
}
}
},
server_op = rx_core.recv() => {
if let Ok(_oop) = server_op {
let mut send_op = true;
for (i, _op) in queue.iter().enumerate() {
if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op))
queue.remove(i);
send_op = false;
break;
} else {
// serv_op.transform(op); // TODO transform OpSeq !
}
}
if send_op {
// tx_client.send(Ok(op_net(&oop.1))).await.unwrap();
}
}
}
}
}
}
#[tonic::async_trait]
impl Buffer for BufferService {
// type ServerStreamingEchoStream = ResponseStream;
type AttachStream = OperationStream;
async fn attach(
&self,
req: Request<Streaming<Operation>>,
) -> Result<tonic::Response<OperationStream>, Status> {
let session_id: String;
if let Some(sid) = req.metadata().get("session_id") {
session_id = sid.to_str().unwrap().to_string();
} else {
return Err(Status::failed_precondition(
"Missing metadata key 'session_id'",
));
}
let path: String;
if let Some(p) = req.metadata().get("path") {
path = p.to_str().unwrap().to_string();
} else {
return Err(Status::failed_precondition("Missing metadata key 'path'"));
}
// TODO make these above nicer? more concise? idk
if let Some(workspace) = self.state.workspaces.borrow().get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
let in_stream = req.into_inner();
let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128);
let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone();
let w = workspace.clone();
tokio::spawn(async move {
buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await;
});
// echo just write the same data that was received
let out_stream = ReceiverStream::new(rx);
return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream));
} else {
return Err(Status::not_found(format!(
"Norkspace with session_id {}",
session_id
)));
}
}
async fn push(&self, _req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
todo!()
}
async fn pull(&self, _req:Request<BufferPayload>) -> Result<Response<BufferPayload>, Status> {
todo!()
}
}
impl BufferService {
pub fn new(state: Arc<StateManager>) -> BufferService {
BufferService { state }
}
pub fn server(self) -> BufferServer<BufferService> {
BufferServer::new(self)
}
}

View file

@ -1,3 +0,0 @@
pub mod buffer;
pub mod session;
pub mod workspace;

View file

@ -1,59 +0,0 @@
pub mod proto {
tonic::include_proto!("session");
}
use std::sync::Arc;
use proto::{session_server::Session, WorkspaceBuilderRequest, SessionResponse};
use tonic::{Request, Response, Status};
use crate::actor::{
state::StateManager, workspace::Workspace, // TODO fuck x2!
};
use self::proto::session_server::SessionServer;
#[derive(Debug)]
pub struct SessionService {
state: Arc<StateManager>,
}
#[tonic::async_trait]
impl Session for SessionService {
async fn create_workspace(
&self,
_req: Request<WorkspaceBuilderRequest>,
) -> Result<Response<SessionResponse>, Status> {
// let name = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let w = Workspace::new("im lazy".into());
let res = SessionResponse { accepted:true, session_key: w.id.to_string() };
self.state.view().add(w).await;
Ok(Response::new(res))
}
// async fn authenticate(
// &self,
// req: Request<SessionRequest>,
// ) -> Result<Response<SessionResponse>, Status> {
// todo!()
// }
// async fn list_workspaces(
// &self,
// req: Request<SessionRequest>,
// ) -> Result<Response<WorkspaceList>, Status> {
// todo!()
// }
}
impl SessionService {
pub fn new(state: Arc<StateManager>) -> SessionService {
SessionService { state }
}
pub fn server(self) -> SessionServer<SessionService> {
SessionServer::new(self)
}
}

View file

@ -1,258 +0,0 @@
use std::{pin::Pin, sync::Arc};
use uuid::Uuid;
use tonic::codegen::InterceptedService;
use tonic::service::Interceptor;
use tracing::info;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status, Streaming};
use tokio::sync::{watch, mpsc};
pub mod proto {
tonic::include_proto!("workspace");
}
use library::user::User;
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
use proto::workspace_server::{Workspace, WorkspaceServer};
use proto::{BufferList, WorkspaceEvent, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest, CursorUpdate, JoinRequest};
use library::user::UserCursor;
use crate::actor::{buffer::Buffer, state::StateManager}; // TODO fuck x2!
pub struct WorkspaceExtension {
pub id: String
}
#[derive(Debug, Clone)]
pub struct WorkspaceInterceptor {
state: Arc<StateManager>,
}
impl Interceptor for WorkspaceInterceptor {
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
// Set an extension that can be retrieved by `say_hello`
let id;
// TODO this is kinda spaghetti but I can't borrow immutably and mutably req inside this match
// tree...
match req.metadata().get("workspace") {
Some(value) => {
info!("Metadata: {:?}", value);
match value.to_str() {
Ok(w_id) => {
id = w_id.to_string();
},
Err(_) => return Err(Status::invalid_argument("Workspace key is not valid")),
}
},
None => return Err(Status::unauthenticated("No workspace key included in request"))
}
info!("checking request : {}", id);
let uid = match Uuid::parse_str(id.as_str()) {
Ok(id) => id,
Err(e) => { return Err(Status::invalid_argument(format!("Invalid uuid : {}", e))); },
};
if !self.state.workspaces.borrow().contains_key(&uid) {
return Err(Status::not_found(format!("Workspace '{}' could not be found", id)));
}
req.extensions_mut().insert(WorkspaceExtension { id });
Ok(req)
}
}
type EventStream = Pin<Box<dyn Stream<Item = Result<WorkspaceEvent, Status>> + Send>>;
type CursorUpdateStream = Pin<Box<dyn Stream<Item = Result<CursorUpdate, Status>> + Send>>;
#[derive(Debug)]
pub struct WorkspaceService {
state: Arc<StateManager>,
}
#[tonic::async_trait]
impl Workspace for WorkspaceService {
type JoinStream = EventStream;
type SubscribeStream = CursorUpdateStream;
async fn join(
&self,
req: Request<JoinRequest>,
) -> Result<tonic::Response<Self::JoinStream>, Status> {
let session_id = Uuid::parse_str(req.extensions().get::<WorkspaceExtension>().unwrap().id.as_str()).unwrap();
let r = req.into_inner();
let run = self.state.run.clone();
let user_name = r.name.clone();
match self.state.get(&session_id) {
Some(w) => {
let (tx, rx) = mpsc::channel::<Result<WorkspaceEvent, Status>>(128);
tokio::spawn(async move {
let mut event_receiver = w.bus.subscribe();
w.view().users.add(
User {
name: r.name.clone(),
cursor: UserCursor { buffer:0, x:0, y:0 }
}
).await;
info!("User {} joined workspace {}", r.name, w.id);
while run.borrow().to_owned() {
let res = event_receiver.recv().await.unwrap();
let broadcasting = WorkspaceEvent { id: 1, body: Some(res.to_string()) }; // TODO actually process packet
tx.send(Ok(broadcasting)).await.unwrap();
}
w.view().users.remove(user_name).await;
});
return Ok(Response::new(Box::pin(ReceiverStream::new(rx))));
},
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
session_id
)))
}
}
async fn subscribe(
&self,
req: tonic::Request<Streaming<CursorUpdate>>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let s_id = Uuid::parse_str(req.extensions().get::<WorkspaceExtension>().unwrap().id.as_str()).unwrap();
let mut r = req.into_inner();
match self.state.get(&s_id) {
Some(w) => {
let cursors_ref = w.cursors.clone();
let (_stop_tx, stop_rx) = watch::channel(true);
let (tx, rx) = mpsc::channel::<Result<CursorUpdate, Status>>(128);
tokio::spawn(async move {
let mut workspace_bus = cursors_ref.subscribe();
while stop_rx.borrow().to_owned() {
tokio::select!{
remote = workspace_bus.recv() => {
if let Ok(cur) = remote {
info!("Sending cursor update : {:?}", cur);
tx.send(Ok(cur)).await.unwrap();
}
},
local = r.next() => {
match local {
Some(request) => {
info!("Received cursor update : {:?}", request);
match request {
Ok(cur) => {
cursors_ref.send(cur).unwrap();
},
Err(_e) => {},
}
},
None => {},
}
},
}
}
});
return Ok(Response::new(Box::pin(ReceiverStream::new(rx))));
},
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
s_id
)))
}
}
async fn buffers(
&self,
req: Request<WorkspaceRequest>,
) -> Result<Response<BufferList>, Status> {
let r = req.into_inner();
match self.state.get(&Uuid::parse_str(r.session_key.as_str()).unwrap()) {
Some(w) => {
let mut out = Vec::new();
for (_k, v) in w.buffers.borrow().iter() {
out.push(v.name.clone());
}
Ok(Response::new(BufferList { path: out }))
}
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
))),
}
}
async fn new_buffer(
&self,
req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let r = req.into_inner();
if let Some(w) = self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
let mut view = w.view();
let buf = Buffer::new(r.path, w.bus.clone());
view.buffers.add(buf).await;
Ok(Response::new(WorkspaceResponse { accepted: true }))
} else {
return Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
)));
}
}
async fn remove_buffer(
&self,
req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let r = req.into_inner();
match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
Some(w) => {
w.view().buffers.remove(r.path).await;
Ok(Response::new(WorkspaceResponse { accepted: true }))
}
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
))),
}
}
async fn list_users(
&self,
req: Request<WorkspaceRequest>,
) -> Result<Response<UsersList>, Status> {
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let r = req.into_inner();
match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
Some(w) => {
let mut out = Vec::new();
for (_k, v) in w.users.borrow().iter() {
out.push(v.name.clone());
}
Ok(Response::new(UsersList { name: out }))
},
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
))),
}
}
}
impl WorkspaceService {
pub fn new(state: Arc<StateManager>) -> WorkspaceService {
WorkspaceService { state }
}
pub fn server(self) -> InterceptedService<WorkspaceServer<WorkspaceService>, WorkspaceInterceptor> {
let state = self.state.clone();
WorkspaceServer::with_interceptor(self, WorkspaceInterceptor { state })
}
}