chore: moved responsabilites around again

This commit is contained in:
əlemi 2022-08-28 23:41:11 +02:00
parent 6fd0ab4944
commit 824cb62933
4 changed files with 122 additions and 72 deletions

View file

@ -2,9 +2,8 @@ syntax = "proto3";
package session;
service Session {
rpc Create (SessionRequest) returns (SessionResponse);
rpc Join (SessionRequest) returns (SessionResponse);
rpc Leave (SessionRequest) returns (SessionResponse);
rpc Authenticate(SessionRequest) returns (SessionResponse);
rpc ListWorkspaces(SessionRequest) returns (WorkspaceList);
}
message SessionRequest {
@ -15,3 +14,7 @@ message SessionResponse {
string sessionKey = 1;
bool accepted = 2;
}
message WorkspaceList {
repeated string name = 1; // TODO add more fields
}

View file

@ -2,8 +2,12 @@ syntax = "proto3";
package workspace;
service Workspace {
rpc Subscribe (WorkspaceRequest) returns (stream Event);
rpc Buffers (WorkspaceRequest) returns (BufferList);
rpc Create (WorkspaceRequest) returns (WorkspaceResponse);
rpc Subscribe (WorkspaceRequest) returns (stream Event);
rpc Buffers (WorkspaceRequest) returns (BufferList);
rpc ListUsers (WorkspaceRequest) returns (UsersList);
rpc NewBuffer (BufferRequest) returns (WorkspaceResponse);
rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse);
}
message Event {
@ -15,7 +19,20 @@ message WorkspaceRequest {
string sessionKey = 1;
}
message BufferRequest {
string sessionKey = 1;
string path = 2;
}
message WorkspaceResponse {
bool accepted = 1;
}
message BufferList {
repeated string path = 1;
}
message UsersList {
repeated string name = 1;
}

View file

@ -21,64 +21,6 @@ pub struct SessionService {
state: Arc<StateManager>,
}
#[tonic::async_trait]
impl Session for SessionService {
async fn create(
&self,
request: Request<SessionRequest>,
) -> Result<Response<SessionResponse>, Status> {
debug!("create request: {:?}", request);
let r = request.into_inner();
let _w = WorkspaceInstance::new(r.session_key.clone());
let reply = proto::SessionResponse {
session_key: r.session_key.clone(),
accepted: true,
};
// self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap();
Ok(Response::new(reply))
}
async fn join(
&self,
request: Request<SessionRequest>,
) -> Result<Response<SessionResponse>, Status> {
debug!("join request: {:?}", request);
let reply = proto::SessionResponse {
session_key: request.into_inner().session_key,
accepted: true,
};
Ok(Response::new(reply))
}
async fn leave(
&self,
request: Request<SessionRequest>,
) -> Result<Response<SessionResponse>, Status> {
debug!("leave request: {:?}", request);
let r = request.into_inner();
let mut removed = false;
if self.state.workspaces_ref().get(&r.session_key).is_some() {
self.state
.op(AlterState::REMOVE {
key: r.session_key.clone(),
})
.await
.unwrap();
removed = true; // TODO this is a lie! Verify it
}
let reply = proto::SessionResponse {
session_key: r.session_key,
accepted: removed,
};
Ok(Response::new(reply))
}
}
// #[tonic::async_trait]
// impl Session for SessionService {
// }

View file

@ -1,8 +1,10 @@
use std::{pin::Pin, sync::Arc};
// use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, warn};
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
use tokio::sync::{watch, mpsc};
pub mod proto {
tonic::include_proto!("workspace");
@ -11,9 +13,9 @@ pub mod proto {
use tokio_stream::Stream; // TODO example used this?
use proto::workspace_server::{Workspace, WorkspaceServer};
use proto::{BufferList, Event, WorkspaceRequest};
use proto::{BufferList, Event, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest};
use crate::actor::state::StateManager;
use crate::actor::{buffer::Buffer, state::StateManager, workspace::{BufferAction, UserAction, Workspace as WorkspaceInstance}}; // TODO fuck x2!
type EventStream = Pin<Box<dyn Stream<Item = Result<Event, Status>> + Send>>;
@ -26,11 +28,50 @@ pub struct WorkspaceService {
impl Workspace for WorkspaceService {
type SubscribeStream = EventStream;
async fn create(
&self,
request: Request<WorkspaceRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
debug!("create request: {:?}", request);
let r = request.into_inner();
let _w = WorkspaceInstance::new(r.session_key.clone());
let reply = WorkspaceResponse {
// session_key: r.session_key.clone(),
accepted: true,
};
// self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap();
Ok(Response::new(reply))
}
async fn subscribe(
&self,
_req: Request<WorkspaceRequest>,
req: Request<WorkspaceRequest>,
) -> Result<tonic::Response<EventStream>, Status> {
todo!()
let r = req.into_inner();
match self.state.workspaces_ref().get(&r.session_key) {
Some(w) => {
let bus_clone = w.bus.clone();
let (_stop_tx, stop_rx) = watch::channel(true);
let (tx, rx) = mpsc::channel::<Result<Event, Status>>(128);
tokio::spawn(async move {
let mut event_receiver = bus_clone.subscribe();
while stop_rx.borrow().to_owned() {
let _res = event_receiver.recv().await.unwrap();
let broadcasting = Event { id: 1, body: Some("".to_string()) }; // TODO actually process packet
tx.send(Ok(broadcasting)).await.unwrap();
}
});
return Ok(Response::new(Box::pin(ReceiverStream::new(rx))));
},
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
)))
}
}
async fn buffers(
@ -41,7 +82,7 @@ impl Workspace for WorkspaceService {
match self.state.workspaces_ref().get(&r.session_key) {
Some(w) => {
let mut out = Vec::new();
for (_k, v) in w.buffers_ref().iter() {
for (_k, v) in w.buffers.borrow().iter() {
out.push(v.name.clone());
}
Ok(Response::new(BufferList { path: out }))
@ -52,6 +93,53 @@ impl Workspace for WorkspaceService {
))),
}
}
async fn new_buffer(
&self,
req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
let r = req.into_inner();
if let Some(w) = self.state.workspaces_ref().get(&r.session_key) {
let mut view = w.view();
let buf = Buffer::new(r.path, w.bus.clone());
view.buffers.add(buf);
Ok(Response::new(WorkspaceResponse { accepted: true }))
} else {
return Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
)));
}
}
async fn remove_buffer(
&self,
req: Request<BufferRequest>,
) -> Result<Response<WorkspaceResponse>, Status> {
let r = req.into_inner();
match self.state.workspaces_ref().get(&r.session_key) {
Some(w) => {
let mut out = Vec::new();
for (_k, v) in w.buffers.borrow().iter() {
out.push(v.name.clone());
}
Ok(Response::new(WorkspaceResponse { accepted: true }))
}
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
))),
}
}
async fn list_users(
&self,
req: Request<WorkspaceRequest>,
) -> Result<Response<UsersList>, Status> {
todo!()
}
}
impl WorkspaceService {