feat: added some getters to state obj, implemented /buffers

This commit is contained in:
əlemi 2022-08-03 15:09:15 +02:00
parent 7a4e69377d
commit e030b9a48f
4 changed files with 88 additions and 20 deletions

View file

@ -5,13 +5,28 @@ use tracing::error;
use crate::actor::workspace::Workspace; use crate::actor::workspace::Workspace;
#[derive(Debug, Clone)]
pub struct UserCursor{
buffer: i64,
x: i32,
y: i32
}
#[derive(Debug, Clone)]
pub struct User {
name: String,
cursor: UserCursor,
}
#[derive(Debug)] #[derive(Debug)]
pub enum AlterState { pub enum AlterState {
ADD { ADD {
key: String, key: String,
w: Workspace w: Workspace
}, },
REMOVE { key: String }, REMOVE {
key: String
},
} }
#[derive(Debug)] #[derive(Debug)]
@ -32,28 +47,31 @@ impl Drop for StateManager {
impl StateManager { impl StateManager {
pub fn new() -> Self { pub fn new() -> Self {
let (tx, mut rx) = mpsc::channel(32); // TODO quantify backpressure let (tx, mut rx) = mpsc::channel(32); // TODO quantify backpressure
let (watch_tx, watch_rx) = watch::channel(HashMap::new()); let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new());
let (stop_tx, stop_rx) = watch::channel(true); let (stop_tx, stop_rx) = watch::channel(true);
let s = StateManager { let s = StateManager {
workspaces: watch_rx, workspaces: workspaces_rx,
op_tx: tx, op_tx: tx,
run: stop_tx, run: stop_tx,
}; };
tokio::spawn(async move { tokio::spawn(async move {
let mut store = HashMap::new(); let mut store = HashMap::new();
let mut users = HashMap::<String, User>::new();
while stop_rx.borrow().to_owned() { while stop_rx.borrow().to_owned() {
if let Some(event) = rx.recv().await { if let Some(event) = rx.recv().await {
match event { match event {
AlterState::ADD { key, w } => { AlterState::ADD { key, w } => {
store.insert(key, Arc::new(w)); // TODO put in hashmap store.insert(key, Arc::new(w)); // TODO put in hashmap
workspaces_tx.send(store.clone()).unwrap();
}, },
AlterState::REMOVE { key } => { AlterState::REMOVE { key } => {
store.remove(&key); store.remove(&key);
}, },
} }
watch_tx.send(store.clone()).unwrap(); workspaces_tx.send(store.clone()).unwrap();
} else { } else {
break break
} }

View file

@ -1,45 +1,64 @@
use std::collections::HashMap; use std::collections::HashMap;
use operational_transform::OperationSeq; use operational_transform::OperationSeq;
use tokio::sync::{broadcast, mpsc, watch}; use tokio::sync::{broadcast, mpsc, watch::{self, Ref}};
use super::buffer::{BufferView, Buffer}; use super::{buffer::{BufferView, Buffer}, state::User};
type Event = (String, OperationSeq); // TODO jank!
pub enum UserAction {
ADD {},
REMOVE {},
}
pub struct WorkspaceView { pub struct WorkspaceView {
pub rx: broadcast::Receiver<OperationSeq>, pub rx: broadcast::Receiver<Event>,
pub tx: mpsc::Sender<OperationSeq>, pub tx: mpsc::Sender<BufferAction>,
pub users: watch::Receiver<HashMap<String, User>>,
pub buffers: watch::Receiver<HashMap<String, BufferView>>,
} }
// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk // Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk
#[derive(Debug)] #[derive(Debug)]
pub struct Workspace { pub struct Workspace {
pub name: String, pub name: String,
pub buffers: watch::Receiver<HashMap<String, BufferView>>,
pub bus: broadcast::Sender<(String, OperationSeq)>, buffers: watch::Receiver<HashMap<String, BufferView>>,
op_tx: mpsc::Sender<BufferAction>, users: watch::Receiver<HashMap<String, User>>,
pub bus: broadcast::Sender<Event>,
buf_tx: mpsc::Sender<BufferAction>,
usr_tx: mpsc::Sender<UserAction>,
run: watch::Sender<bool>, run: watch::Sender<bool>,
} }
impl Workspace { impl Workspace {
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
let (op_tx, mut op_rx) = mpsc::channel(32); let (buf_tx, mut buf_rx) = mpsc::channel(32);
let (usr_tx, mut usr_rx) = mpsc::channel(32);
let (stop_tx, stop_rx) = watch::channel(true); let (stop_tx, stop_rx) = watch::channel(true);
let (buf_tx, buf_rx) = watch::channel(HashMap::new()); let (buffer_tx, buffer_rx) = watch::channel(HashMap::new());
let (broadcast_tx, broadcast_rx) = broadcast::channel(32); let (broadcast_tx, broadcast_rx) = broadcast::channel(32);
let (users_tx, users_rx) = watch::channel(HashMap::new());
let w = Workspace { let w = Workspace {
name, name,
run: stop_tx, run: stop_tx,
op_tx, buf_tx,
buffers: buf_rx, usr_tx,
buffers: buffer_rx,
bus: broadcast_tx, bus: broadcast_tx,
users: users_rx,
}; };
tokio::spawn(async move { tokio::spawn(async move {
let mut buffers = HashMap::new(); let mut buffers = HashMap::new();
while stop_rx.borrow().to_owned() { while stop_rx.borrow().to_owned() {
// TODO handle these errors!! // TODO handle these errors!!
let action = op_rx.recv().await.unwrap(); let action = buf_rx.recv().await.unwrap();
match action { match action {
BufferAction::ADD { buffer } => { BufferAction::ADD { buffer } => {
buffers.insert(buffer.view().name.clone(), buffer); buffers.insert(buffer.view().name.clone(), buffer);
@ -48,7 +67,7 @@ impl Workspace {
buffers.remove(&name); buffers.remove(&name);
} }
} }
buf_tx.send( buffer_tx.send(
buffers.iter() buffers.iter()
.map(|(k, v)| (k.clone(), v.view())) .map(|(k, v)| (k.clone(), v.view()))
.collect() .collect()
@ -58,6 +77,23 @@ impl Workspace {
return w; return w;
} }
pub fn buffers_ref(&self) -> Ref<HashMap<String, BufferView>> {
self.buffers.borrow()
}
pub fn users_ref(&self) -> Ref<HashMap<String, User>> {
self.users.borrow()
}
pub fn view(&self) -> WorkspaceView {
WorkspaceView {
rx: self.bus.subscribe(),
tx: self.buf_tx.clone(),
users: self.users.clone(),
buffers: self.buffers.clone(),
}
}
} }
pub enum BufferAction { pub enum BufferAction {

View file

@ -3,13 +3,13 @@ pub mod service;
use std::sync::Arc; use std::sync::Arc;
use tracing::{debug, error, info, warn}; use tracing::info;
use tonic::transport::Server; use tonic::transport::Server;
use crate::{ use crate::{
actor::state::StateManager, actor::state::StateManager,
service::{buffer::BufferService, session::SessionService, workspace::WorkspaceService}, service::{buffer::BufferService, workspace::WorkspaceService},
}; };
#[tokio::main] #[tokio::main]
@ -28,5 +28,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.serve(addr) .serve(addr)
.await?; .await?;
Ok(()) Ok(())
} }

View file

@ -37,7 +37,20 @@ impl Workspace for WorkspaceService {
&self, &self,
req: Request<WorkspaceRequest>, req: Request<WorkspaceRequest>,
) -> Result<Response<BufferList>, Status> { ) -> Result<Response<BufferList>, Status> {
todo!() let r = req.into_inner();
match self.state.workspaces.borrow().get(&r.session_key) {
Some(w) => {
let out = Vec::new();
for (_k, v) in w.buffers_ref().iter() {
out.push(v.name.clone());
}
Ok(Response::new(BufferList { path: out }))
}
None => Err(Status::not_found(format!(
"No active workspace with session_key '{}'",
r.session_key
))),
}
} }
} }