feat: did some plumbing for events and cursors channels

This commit is contained in:
əlemi 2022-09-24 01:14:12 +02:00
parent 495b8279fc
commit e9500afd55
10 changed files with 219 additions and 90 deletions

View file

@ -26,6 +26,7 @@ tokio-stream = "0.1"
rmpv = "1"
operational-transform = "0.6"
nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature
uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] }
[build-dependencies]
tonic-build = "0.7"

View file

@ -2,8 +2,9 @@ syntax = "proto3";
package session;
service Session {
rpc Authenticate(SessionRequest) returns (SessionResponse);
rpc ListWorkspaces(SessionRequest) returns (WorkspaceList);
// rpc Authenticate(SessionRequest) returns (SessionResponse);
// rpc ListWorkspaces(SessionRequest) returns (WorkspaceList);
rpc CreateWorkspace(WorkspaceBuilderRequest) returns (SessionResponse);
}
message SessionRequest {
@ -15,6 +16,10 @@ message SessionResponse {
bool accepted = 2;
}
message WorkspaceBuilderRequest {
string name = 1;
}
message WorkspaceList {
repeated string name = 1; // TODO add more fields
}

View file

@ -2,19 +2,30 @@ syntax = "proto3";
package workspace;
service Workspace {
rpc Create (WorkspaceRequest) returns (WorkspaceResponse);
rpc Subscribe (WorkspaceRequest) returns (stream Event);
rpc Buffers (WorkspaceRequest) returns (BufferList);
rpc Join (JoinRequest) returns (stream WorkspaceEvent);
rpc Subscribe (stream CursorUpdate) returns (stream CursorUpdate);
rpc ListUsers (WorkspaceRequest) returns (UsersList);
rpc Buffers (WorkspaceRequest) returns (BufferList);
rpc NewBuffer (BufferRequest) returns (WorkspaceResponse);
rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse);
}
message Event {
message JoinRequest {
string name = 1;
}
message WorkspaceEvent {
int32 id = 1;
optional string body = 2;
}
message CursorUpdate {
string username = 1;
int32 buffer = 2;
int32 col = 3;
int32 row = 4;
}
message WorkspaceRequest {
string sessionKey = 1;
}

View file

@ -1,5 +1,5 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, fmt::Display};
use tokio::sync::{mpsc, watch};
use tracing::error;
@ -7,17 +7,31 @@ use crate::actor::workspace::Workspace;
#[derive(Debug, Clone)]
pub struct UserCursor{
pub buffer: i64,
pub buffer: i32,
pub x: i32,
pub y: i32
}
impl Display for UserCursor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Cursor(buffer:{}, x:{}, y:{})", self.buffer, self.x, self.y)
}
}
#[derive(Debug, Clone)]
pub struct User {
pub name: String,
pub cursor: UserCursor,
}
impl Display for User {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "User(name:{}, cursor:{})", self.name, self.cursor)
}
}
#[derive(Debug)]
enum WorkspaceAction {
ADD {
@ -41,7 +55,7 @@ impl WorkspacesView {
}
pub async fn add(&mut self, w: Workspace) {
self.op.send(WorkspaceAction::ADD { key: w.name.clone(), w: Box::new(w) }).await.unwrap();
self.op.send(WorkspaceAction::ADD { key: w.id.to_string(), w: Box::new(w) }).await.unwrap();
}
pub async fn remove(&mut self, key: String) {
@ -52,8 +66,8 @@ impl WorkspacesView {
#[derive(Debug)]
pub struct StateManager {
pub workspaces: WorkspacesView,
pub run: watch::Receiver<bool>,
run_tx: watch::Sender<bool>,
run_rx: watch::Receiver<bool>,
}
impl Drop for StateManager {
@ -72,7 +86,7 @@ impl StateManager {
let s = StateManager {
workspaces: WorkspacesView { watch: workspaces_rx, op: tx },
run_tx, run_rx,
run_tx, run: run_rx,
};
s.workspaces_worker(rx, workspaces_tx);
@ -81,7 +95,7 @@ impl StateManager {
}
fn workspaces_worker(&self, mut rx: mpsc::Receiver<WorkspaceAction>, tx: watch::Sender<HashMap<String, Arc<Workspace>>>) {
let run = self.run_rx.clone();
let run = self.run.clone();
tokio::spawn(async move {
let mut store = HashMap::new();

View file

@ -3,7 +3,7 @@ use std::collections::HashMap;
use tokio::sync::{broadcast, mpsc, watch::{self, Ref}};
use tracing::warn;
use crate::events::Event;
use crate::{events::Event, service::workspace::proto::CursorUpdate};
use super::{buffer::{BufferView, Buffer}, state::{User, UserCursor}};
@ -66,8 +66,10 @@ impl WorkspaceView {
// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk
#[derive(Debug)]
pub struct Workspace {
pub id: uuid::Uuid,
pub name: String,
pub bus: broadcast::Sender<Event>,
pub cursors: broadcast::Sender<CursorUpdate>,
pub buffers: BuffersTreeView,
pub users: UsersView,
@ -90,10 +92,13 @@ impl Workspace {
let (buffer_tx, buffer_rx) = watch::channel::<HashMap<String, BufferView>>(HashMap::new());
let (users_tx, users_rx) = watch::channel(HashMap::new());
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Event>(32);
let (cursors_tx, _cursors_rx) = broadcast::channel::<CursorUpdate>(32);
let w = Workspace {
id: uuid::Uuid::new_v4(),
name,
bus: broadcast_tx,
cursors: cursors_tx,
buffers: BuffersTreeView{ op: op_buf_tx, watch: buffer_rx },
users: UsersView{ op: op_usr_tx, watch: users_rx },
run_tx,
@ -137,28 +142,42 @@ impl Workspace {
fn users_worker(&self, mut rx: mpsc::Receiver<UserAction>, tx: watch::Sender<HashMap<String, User>>) {
let bus = self.bus.clone();
let cursors_tx = self.cursors.clone();
let run = self.run_rx.clone();
tokio::spawn(async move {
let mut cursors_rx = cursors_tx.subscribe();
let mut users : HashMap<String, User> = HashMap::new();
while run.borrow().to_owned() {
match rx.recv().await.unwrap() {
tokio::select!{
action = rx.recv() => {
match action.unwrap() {
UserAction::ADD { user } => {
users.insert(user.name.clone(), user);
users.insert(user.name.clone(), user.clone());
bus.send(Event::UserJoin { user }).unwrap();
},
UserAction::REMOVE { name } => {
if let None = users.remove(&name) {
continue; // don't update channel since this was a no-op
} else {
bus.send(Event::UserLeave { name }).unwrap();
}
},
UserAction::CURSOR { name, cursor } => {
if let Some(user) = users.get_mut(&name) {
user.cursor = cursor.clone();
bus.send(Event::Cursor{user: name, cursor}).unwrap();
} else {
continue; // don't update channel since this was a no-op
}
},
};
},
cursor = cursors_rx.recv() => {
let cursor = cursor.unwrap();
if let Some(user) = users.get_mut(&cursor.username) {
user.cursor = UserCursor { buffer: cursor.buffer, x:cursor.col, y:cursor.row };
}
}
}
tx.send(

View file

@ -1,14 +1,25 @@
use crate::actor::state::{User, UserCursor};
use std::fmt::Display;
use crate::actor::state::User;
#[derive(Debug, Clone)]
pub enum Event {
UserJoin { user: User },
UserLeave { name: String },
Cursor { user: String, cursor: UserCursor },
BufferNew { path: String },
BufferDelete { path: String },
}
impl Display for Event {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::UserJoin { user } => write!(f, "UserJoin(user:{})", user),
Self::UserLeave { name } => write!(f, "UserLeave(user:{})", name),
Self::BufferNew { path } => write!(f, "BufferNew(path:{})", path),
Self::BufferDelete { path } => write!(f, "BufferDelete(path:{})", path),
}
}
}
// pub type Event = Box<dyn EventInterface>;
//
// pub trait EventInterface {

View file

@ -16,7 +16,7 @@ use tonic::transport::Server;
use crate::{
actor::state::StateManager,
service::{buffer::BufferService, workspace::WorkspaceService},
service::{buffer::BufferService, workspace::WorkspaceService, session::SessionService},
};
#[tokio::main]
@ -30,6 +30,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Starting server");
Server::builder()
.add_service(SessionService::new(state.clone()).server())
.add_service(WorkspaceService::new(state.clone()).server())
.add_service(BufferService::new(state.clone()).server())
.serve(addr)

View file

@ -8,8 +8,6 @@ use operational_transform::OperationSeq;
use tonic::{Request, Response, Status};
pub mod proto {
tonic::include_proto!("session");
tonic::include_proto!("workspace");
tonic::include_proto!("buffer");
}

View file

@ -4,23 +4,59 @@ pub mod proto {
use std::sync::Arc;
use tracing::debug;
use proto::{session_server::{Session}, WorkspaceBuilderRequest, SessionRequest, SessionResponse, WorkspaceList};
use tonic::{Request, Response, Status};
use proto::session_server::Session;
use proto::{SessionRequest, SessionResponse};
use crate::actor::{
state::StateManager,
workspace::Workspace as WorkspaceInstance, // TODO fuck x2!
state::StateManager, workspace::Workspace, // TODO fuck x2!
};
use self::proto::session_server::SessionServer;
use super::workspace::WorkspaceExtension;
#[derive(Debug)]
pub struct SessionService {
state: Arc<StateManager>,
}
// #[tonic::async_trait]
// impl Session for SessionService {
#[tonic::async_trait]
impl Session for SessionService {
async fn create_workspace(
&self,
req: Request<WorkspaceBuilderRequest>,
) -> Result<Response<SessionResponse>, Status> {
let name = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let w = Workspace::new(name);
let res = SessionResponse { accepted:true, session_key: w.id.to_string() };
self.state.view().add(w).await;
Ok(Response::new(res))
}
// async fn authenticate(
// &self,
// req: Request<SessionRequest>,
// ) -> Result<Response<SessionResponse>, Status> {
// todo!()
// }
// async fn list_workspaces(
// &self,
// req: Request<SessionRequest>,
// ) -> Result<Response<WorkspaceList>, Status> {
// todo!()
// }
}
impl SessionService {
pub fn new(state: Arc<StateManager>) -> SessionService {
SessionService { state }
}
pub fn server(self) -> SessionServer<SessionService> {
SessionServer::new(self)
}
}

View file

@ -5,22 +5,23 @@ use tonic::service::Interceptor;
use tracing::debug;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tonic::{Request, Response, Status, Streaming};
use tokio::sync::{watch, mpsc};
pub mod proto {
tonic::include_proto!("workspace");
}
use tokio_stream::Stream; // TODO example used this?
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
use proto::workspace_server::{Workspace, WorkspaceServer};
use proto::{BufferList, Event, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest};
use proto::{BufferList, WorkspaceEvent, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest, CursorUpdate, JoinRequest};
use crate::actor::state::UserCursor;
use crate::actor::{buffer::Buffer, state::StateManager, workspace::Workspace as WorkspaceInstance}; // TODO fuck x2!
struct WorkspaceExtension {
id: String
pub struct WorkspaceExtension {
pub id: String
}
#[derive(Debug, Clone)]
@ -57,9 +58,8 @@ impl Interceptor for WorkspaceInterceptor {
}
type EventStream = Pin<Box<dyn Stream<Item = Result<Event, Status>> + Send>>;
type EventStream = Pin<Box<dyn Stream<Item = Result<WorkspaceEvent, Status>> + Send>>;
type CursorUpdateStream = Pin<Box<dyn Stream<Item = Result<CursorUpdate, Status>> + Send>>;
#[derive(Debug)]
pub struct WorkspaceService {
@ -68,52 +68,85 @@ pub struct WorkspaceService {
#[tonic::async_trait]
impl Workspace for WorkspaceService {
type SubscribeStream = EventStream;
type JoinStream = EventStream;
type SubscribeStream = CursorUpdateStream;
async fn create(
async fn join(
&self,
request: Request<WorkspaceRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
debug!("create request: {:?}", request);
// We should always have an extension because of the interceptor but maybe don't unwrap?
let ext = request.extensions().get::<WorkspaceExtension>().unwrap();
let r = request.into_inner();
let _w = WorkspaceInstance::new(ext.id);
let reply = WorkspaceResponse {
// session_key: r.session_key.clone(),
accepted: true,
};
// self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap();
Ok(Response::new(reply))
req: Request<JoinRequest>,
) -> Result<tonic::Response<Self::JoinStream>, Status> {
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let r = req.into_inner();
let run = self.state.run.clone();
let user_name = r.name.clone();
match self.state.get(&session_id) {
Some(w) => {
let (tx, rx) = mpsc::channel::<Result<WorkspaceEvent, Status>>(128);
tokio::spawn(async move {
let mut event_receiver = w.bus.subscribe();
w.view().users.add(
crate::actor::state::User {
name: r.name.clone(),
cursor: UserCursor { buffer:0, x:0, y:0 }
}
);
while run.borrow().to_owned() {
let res = event_receiver.recv().await.unwrap();
let broadcasting = WorkspaceEvent { id: 1, body: Some(res.to_string()) }; // TODO actually process packet
tx.send(Ok(broadcasting)).await.unwrap();
}
w.view().users.remove(user_name);
});
return Ok(Response::new(Box::pin(ReceiverStream::new(rx))));
},
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
session_id
)))
}
}
async fn subscribe(
&self,
req: Request<WorkspaceRequest>,
) -> Result<tonic::Response<EventStream>, Status> {
let r = req.into_inner();
match self.state.get(&r.session_key) {
req: tonic::Request<Streaming<CursorUpdate>>,
) -> Result<Response<Self::SubscribeStream>, Status> {
let s_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let mut r = req.into_inner();
match self.state.get(&s_id) {
Some(w) => {
let bus_clone = w.bus.clone();
let cursors_ref = w.cursors.clone();
let (_stop_tx, stop_rx) = watch::channel(true);
let (tx, rx) = mpsc::channel::<Result<Event, Status>>(128);
let (tx, rx) = mpsc::channel::<Result<CursorUpdate, Status>>(128);
tokio::spawn(async move {
let mut event_receiver = bus_clone.subscribe();
let mut workspace_bus = cursors_ref.subscribe();
while stop_rx.borrow().to_owned() {
let _res = event_receiver.recv().await.unwrap();
let broadcasting = Event { id: 1, body: Some("".to_string()) }; // TODO actually process packet
tx.send(Ok(broadcasting)).await.unwrap();
tokio::select!{
remote = workspace_bus.recv() => {
if let Ok(cur) = remote {
tx.send(Ok(cur)).await.unwrap();
}
},
local = r.next() => {
match local {
Some(request) => {
match request {
Ok(cur) => {
cursors_ref.send(cur).unwrap();
},
Err(e) => {},
}
},
None => {},
}
},
}
}
});
return Ok(Response::new(Box::pin(ReceiverStream::new(rx))));
},
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
s_id
)))
}
}
@ -142,8 +175,9 @@ impl Workspace for WorkspaceService {
&self,
req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let r = req.into_inner();
if let Some(w) = self.state.get(&r.session_key) {
if let Some(w) = self.state.get(&session_id) {
let mut view = w.view();
let buf = Buffer::new(r.path, w.bus.clone());
view.buffers.add(buf).await;
@ -161,13 +195,11 @@ impl Workspace for WorkspaceService {
&self,
req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let r = req.into_inner();
match self.state.get(&r.session_key) {
match self.state.get(&session_id) {
Some(w) => {
let mut out = Vec::new();
for (_k, v) in w.buffers.borrow().iter() {
out.push(v.name.clone());
}
w.view().buffers.remove(r.path);
Ok(Response::new(WorkspaceResponse { accepted: true }))
}
None => Err(Status::not_found(format!(
@ -181,8 +213,9 @@ impl Workspace for WorkspaceService {
&self,
req: Request<WorkspaceRequest>,
) -> Result<Response<UsersList>, Status> {
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
let r = req.into_inner();
match self.state.get(&r.session_key) {
match self.state.get(&session_id) {
Some(w) => {
let mut out = Vec::new();
for (_k, v) in w.users.borrow().iter() {