fix: add getter to state workspaces, imports fix

This commit is contained in:
əlemi 2022-08-03 18:51:56 +02:00
parent e030b9a48f
commit 18d86020c0
5 changed files with 48 additions and 38 deletions

View file

@ -1,21 +1,21 @@
use std::{collections::HashMap, sync::Arc}; use std::{collections::HashMap, sync::Arc};
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch::{self, Ref}};
use tracing::error; use tracing::error;
use crate::actor::workspace::Workspace; use crate::actor::workspace::Workspace;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct UserCursor{ pub struct UserCursor{
buffer: i64, // buffer: i64,
x: i32, // x: i32,
y: i32 // y: i32
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct User { pub struct User {
name: String, // name: String,
cursor: UserCursor, // cursor: UserCursor,
} }
#[derive(Debug)] #[derive(Debug)]
@ -31,8 +31,8 @@ pub enum AlterState {
#[derive(Debug)] #[derive(Debug)]
pub struct StateManager { pub struct StateManager {
pub workspaces: watch::Receiver<HashMap<String, Arc<Workspace>>>, op_tx: mpsc::Sender<AlterState>, // TODO make method for this
pub op_tx: mpsc::Sender<AlterState>, // TODO make method for this workspaces: watch::Receiver<HashMap<String, Arc<Workspace>>>,
run: watch::Sender<bool>, run: watch::Sender<bool>,
} }
@ -58,7 +58,7 @@ impl StateManager {
tokio::spawn(async move { tokio::spawn(async move {
let mut store = HashMap::new(); let mut store = HashMap::new();
let mut users = HashMap::<String, User>::new(); let mut _users = HashMap::<String, User>::new();
while stop_rx.borrow().to_owned() { while stop_rx.borrow().to_owned() {
if let Some(event) = rx.recv().await { if let Some(event) = rx.recv().await {
@ -80,4 +80,13 @@ impl StateManager {
return s; return s;
} }
pub fn workspaces_ref(&self) -> Ref<HashMap<String, Arc<Workspace>>> {
self.workspaces.borrow()
}
// TODO wrap result of this func?
pub async fn op(&self, op: AlterState) -> Result<(), mpsc::error::SendError<AlterState>> {
self.op_tx.send(op).await
}
} }

View file

@ -2,6 +2,7 @@ use std::collections::HashMap;
use operational_transform::OperationSeq; use operational_transform::OperationSeq;
use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; use tokio::sync::{broadcast, mpsc, watch::{self, Ref}};
use tracing::warn;
use super::{buffer::{BufferView, Buffer}, state::User}; use super::{buffer::{BufferView, Buffer}, state::User};
@ -30,19 +31,25 @@ pub struct Workspace {
pub bus: broadcast::Sender<Event>, pub bus: broadcast::Sender<Event>,
buf_tx: mpsc::Sender<BufferAction>, buf_tx: mpsc::Sender<BufferAction>,
usr_tx: mpsc::Sender<UserAction>, pub usr_tx: mpsc::Sender<UserAction>,
run: watch::Sender<bool>, run: watch::Sender<bool>,
} }
impl Drop for Workspace {
fn drop(&mut self) {
self.run.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e));
}
}
impl Workspace { impl Workspace {
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
let (buf_tx, mut buf_rx) = mpsc::channel(32); let (buf_tx, mut buf_rx) = mpsc::channel(32);
let (usr_tx, mut usr_rx) = mpsc::channel(32); let (usr_tx, mut _usr_rx) = mpsc::channel(32);
let (stop_tx, stop_rx) = watch::channel(true); let (stop_tx, stop_rx) = watch::channel(true);
let (buffer_tx, buffer_rx) = watch::channel(HashMap::new()); let (buffer_tx, buffer_rx) = watch::channel(HashMap::new());
let (broadcast_tx, broadcast_rx) = broadcast::channel(32); let (broadcast_tx, _broadcast_rx) = broadcast::channel(32);
let (users_tx, users_rx) = watch::channel(HashMap::new()); let (_users_tx, users_rx) = watch::channel(HashMap::new());
let w = Workspace { let w = Workspace {
name, name,

View file

@ -2,10 +2,10 @@ use std::collections::VecDeque;
use std::{pin::Pin, sync::Arc}; use std::{pin::Pin, sync::Arc};
use tokio_stream::wrappers::ReceiverStream; use tokio_stream::wrappers::ReceiverStream;
use tracing::{debug, error, info, warn}; use tracing::error;
use operational_transform::OperationSeq; use operational_transform::OperationSeq;
use tonic::{transport::Server, Request, Response, Status}; use tonic::{Request, Response, Status};
pub mod proto { pub mod proto {
tonic::include_proto!("session"); tonic::include_proto!("session");
@ -17,14 +17,12 @@ use tokio::sync::{broadcast, mpsc};
use tokio_stream::{Stream, StreamExt}; // TODO example used this? use tokio_stream::{Stream, StreamExt}; // TODO example used this?
use proto::buffer_server::{Buffer, BufferServer}; use proto::buffer_server::{Buffer, BufferServer};
use proto::session_server::{Session, SessionServer}; use proto::Operation;
use proto::workspace_server::{Workspace, WorkspaceServer};
use proto::{BufferList, Event, Operation, SessionRequest, SessionResponse, WorkspaceRequest};
use tonic::Streaming; use tonic::Streaming;
//use futures::{Stream, StreamExt}; //use futures::{Stream, StreamExt};
use crate::actor::{buffer::BufferView, state::{AlterState, StateManager}, workspace::Workspace as WorkspaceInstance}; use crate::actor::{buffer::BufferView, state::StateManager};
use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2! use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2!
@ -34,10 +32,10 @@ pub struct BufferService {
state: Arc<StateManager>, state: Arc<StateManager>,
} }
fn op_seq(o: &Operation) -> OperationSeq { fn op_seq(_o: &Operation) -> OperationSeq {
todo!() todo!()
} }
fn op_net(o: &OperationSeq) -> Operation { fn op_net(_o: &OperationSeq) -> Operation {
todo!() todo!()
} }
@ -113,11 +111,11 @@ impl Buffer for BufferService {
} }
// TODO make these above nicer? more concise? idk // TODO make these above nicer? more concise? idk
if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) { if let Some(workspace) = self.state.workspaces_ref().get(&session_id) {
let in_stream = req.into_inner(); let in_stream = req.into_inner();
let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128); let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128);
let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone(); let b: BufferView = workspace.buffers_ref().get(&path).unwrap().clone();
let w = workspace.clone(); let w = workspace.clone();
tokio::spawn(async move { tokio::spawn(async move {
buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await;
@ -135,11 +133,11 @@ impl Buffer for BufferService {
} }
} }
async fn push(&self, req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> { async fn push(&self, _req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
todo!() todo!()
} }
async fn pull(&self, req:Request<BufferPayload>) -> Result<Response<BufferPayload>, Status> { async fn pull(&self, _req:Request<BufferPayload>) -> Result<Response<BufferPayload>, Status> {
todo!() todo!()
} }

View file

@ -4,17 +4,14 @@ pub mod proto {
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, error, info, warn}; use tracing::debug;
use tokio_stream::{Stream, StreamExt}; // TODO example used this? use tonic::{Request, Response, Status};
use tonic::{transport::Server, Request, Response, Status}; use proto::session_server::Session;
use proto::session_server::{Session, SessionServer};
use proto::{SessionRequest, SessionResponse}; use proto::{SessionRequest, SessionResponse};
use crate::actor::{ use crate::actor::{
buffer::BufferView,
state::{AlterState, StateManager}, state::{AlterState, StateManager},
workspace::Workspace as WorkspaceInstance, // TODO fuck x2! workspace::Workspace as WorkspaceInstance, // TODO fuck x2!
}; };
@ -33,7 +30,7 @@ impl Session for SessionService {
debug!("create request: {:?}", request); debug!("create request: {:?}", request);
let r = request.into_inner(); let r = request.into_inner();
let w = WorkspaceInstance::new(r.session_key.clone()); let _w = WorkspaceInstance::new(r.session_key.clone());
let reply = proto::SessionResponse { let reply = proto::SessionResponse {
session_key: r.session_key.clone(), session_key: r.session_key.clone(),
@ -67,10 +64,9 @@ impl Session for SessionService {
let r = request.into_inner(); let r = request.into_inner();
let mut removed = false; let mut removed = false;
if self.state.workspaces.borrow().get(&r.session_key).is_some() { if self.state.workspaces_ref().get(&r.session_key).is_some() {
self.state self.state
.op_tx .op(AlterState::REMOVE {
.send(AlterState::REMOVE {
key: r.session_key.clone(), key: r.session_key.clone(),
}) })
.await .await

View file

@ -28,7 +28,7 @@ impl Workspace for WorkspaceService {
async fn subscribe( async fn subscribe(
&self, &self,
req: Request<WorkspaceRequest>, _req: Request<WorkspaceRequest>,
) -> Result<tonic::Response<EventStream>, Status> { ) -> Result<tonic::Response<EventStream>, Status> {
todo!() todo!()
} }
@ -38,9 +38,9 @@ impl Workspace for WorkspaceService {
req: Request<WorkspaceRequest>, req: Request<WorkspaceRequest>,
) -> Result<Response<BufferList>, Status> { ) -> Result<Response<BufferList>, Status> {
let r = req.into_inner(); let r = req.into_inner();
match self.state.workspaces.borrow().get(&r.session_key) { match self.state.workspaces_ref().get(&r.session_key) {
Some(w) => { Some(w) => {
let out = Vec::new(); let mut out = Vec::new();
for (_k, v) in w.buffers_ref().iter() { for (_k, v) in w.buffers_ref().iter() {
out.push(v.name.clone()); out.push(v.name.clone());
} }