feat: reworked workspace and workspaceview

now there are handy methods that hide the underlying message passing
This commit is contained in:
əlemi 2022-08-28 23:40:15 +02:00
parent 692c3f4977
commit 6fd0ab4944

View file

@ -1,114 +1,202 @@
use std::collections::HashMap; use std::collections::HashMap;
use operational_transform::OperationSeq;
use tokio::sync::{broadcast, mpsc, watch::{self, Ref}}; use tokio::sync::{broadcast, mpsc, watch::{self, Ref}};
use tracing::warn; use tracing::warn;
use super::{buffer::{BufferView, Buffer}, state::User}; use crate::events::Event;
type Event = (String, OperationSeq); // TODO jank! use super::{buffer::{BufferView, Buffer}, state::{User, UserCursor}};
pub enum UserAction { #[derive(Debug, Clone)]
ADD {}, pub struct UsersView {
REMOVE {}, watch: watch::Receiver<HashMap<String, User>>,
op: mpsc::Sender<UserAction>,
}
impl UsersView { // TODO don't unwrap everything!
pub fn borrow(&self) -> Ref<HashMap<String, User>> {
return self.watch.borrow();
}
pub async fn add(&mut self, user: User) {
self.op.send(UserAction::ADD{ user }).await.unwrap();
}
pub async fn remove(&mut self, name: String) {
self.op.send(UserAction::REMOVE{ name }).await.unwrap();
}
pub async fn update(&mut self, user_name: String, cursor: UserCursor) {
self.op.send(UserAction::CURSOR { name: user_name, cursor }).await.unwrap();
}
}
#[derive(Debug, Clone)]
pub struct BuffersTreeView {
watch: watch::Receiver<HashMap<String, BufferView>>,
op: mpsc::Sender<BufferAction>,
}
impl BuffersTreeView {
pub fn borrow(&self) -> Ref<HashMap<String, BufferView>> {
return self.watch.borrow();
}
pub async fn add(&mut self, buffer: Buffer) {
self.op.send(BufferAction::ADD { buffer }).await.unwrap();
}
pub async fn remove(&mut self, path: String) {
self.op.send(BufferAction::REMOVE { path }).await.unwrap();
}
} }
pub struct WorkspaceView { pub struct WorkspaceView {
pub rx: broadcast::Receiver<Event>, rx: broadcast::Receiver<Event>,
pub tx: mpsc::Sender<BufferAction>, pub users: UsersView,
pub users: watch::Receiver<HashMap<String, User>>, pub buffers: BuffersTreeView,
pub buffers: watch::Receiver<HashMap<String, BufferView>>, }
impl WorkspaceView {
pub async fn event(&mut self) -> Result<Event, broadcast::error::RecvError> {
self.rx.recv().await
}
} }
// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk // Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk
#[derive(Debug)] #[derive(Debug)]
pub struct Workspace { pub struct Workspace {
pub name: String, pub name: String,
buffers: watch::Receiver<HashMap<String, BufferView>>,
users: watch::Receiver<HashMap<String, User>>,
pub bus: broadcast::Sender<Event>, pub bus: broadcast::Sender<Event>,
buf_tx: mpsc::Sender<BufferAction>, pub buffers: BuffersTreeView,
pub usr_tx: mpsc::Sender<UserAction>, pub users: UsersView,
run: watch::Sender<bool>, run_tx: watch::Sender<bool>,
run_rx: watch::Receiver<bool>,
} }
impl Drop for Workspace { impl Drop for Workspace {
fn drop(&mut self) { fn drop(&mut self) {
self.run.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e)); self.run_tx.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e));
} }
} }
impl Workspace { impl Workspace {
pub fn new(name: String) -> Self { pub fn new(name: String) -> Self {
let (buf_tx, mut buf_rx) = mpsc::channel(32); let (op_buf_tx, mut op_buf_rx) = mpsc::channel::<BufferAction>(32);
let (usr_tx, mut _usr_rx) = mpsc::channel(32); let (op_usr_tx, mut op_usr_rx) = mpsc::channel::<UserAction>(32);
let (stop_tx, stop_rx) = watch::channel(true); let (run_tx, run_rx) = watch::channel::<bool>(true);
let (buffer_tx, buffer_rx) = watch::channel(HashMap::new()); let (buffer_tx, buffer_rx) = watch::channel::<HashMap<String, BufferView>>(HashMap::new());
let (broadcast_tx, _broadcast_rx) = broadcast::channel(32); let (users_tx, users_rx) = watch::channel(HashMap::new());
let (_users_tx, users_rx) = watch::channel(HashMap::new()); let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Event>(32);
let w = Workspace { let w = Workspace {
name, name,
run: stop_tx,
buf_tx,
usr_tx,
buffers: buffer_rx,
bus: broadcast_tx, bus: broadcast_tx,
users: users_rx, buffers: BuffersTreeView{ op: op_buf_tx, watch: buffer_rx },
users: UsersView{ op: op_usr_tx, watch: users_rx },
run_tx,
run_rx,
}; };
w.users_worker(op_usr_rx, users_tx); // spawn worker to handle users
w.buffers_worker(op_buf_rx, buffer_tx); // spawn worker to handle buffers
return w;
}
fn buffers_worker(&self, mut rx: mpsc::Receiver<BufferAction>, tx: watch::Sender<HashMap<String, BufferView>>) {
let bus = self.bus.clone();
let run = self.run_rx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut buffers = HashMap::new(); let mut buffers : HashMap<String, Buffer> = HashMap::new();
while stop_rx.borrow().to_owned() {
while run.borrow().to_owned() {
// TODO handle these errors!! // TODO handle these errors!!
let action = buf_rx.recv().await.unwrap(); let action = rx.recv().await.unwrap();
match action { match action {
BufferAction::ADD { buffer } => { BufferAction::ADD { buffer } => {
buffers.insert(buffer.view().name.clone(), buffer); buffers.insert(buffer.view().name.clone(), buffer);
} }
BufferAction::REMOVE { name } => { BufferAction::REMOVE { path } => {
buffers.remove(&name); buffers.remove(&path);
} }
} }
buffer_tx.send( tx.send(
buffers.iter() buffers.iter()
.map(|(k, v)| (k.clone(), v.view())) .map(|(k, v)| (k.clone(), v.view()))
.collect() .collect()
).unwrap(); ).unwrap();
} }
}); });
return w;
} }
pub fn buffers_ref(&self) -> Ref<HashMap<String, BufferView>> { fn users_worker(&self, mut rx: mpsc::Receiver<UserAction>, tx: watch::Sender<HashMap<String, User>>) {
self.buffers.borrow() let bus = self.bus.clone();
let run = self.run_rx.clone();
tokio::spawn(async move {
let mut users : HashMap<String, User> = HashMap::new();
while run.borrow().to_owned() {
match rx.recv().await.unwrap() {
UserAction::ADD { user } => {
users.insert(user.name.clone(), user);
},
UserAction::REMOVE { name } => {
if let None = users.remove(&name) {
continue; // don't update channel since this was a no-op
}
},
UserAction::CURSOR { name, cursor } => {
if let Some(user) = users.get_mut(&name) {
user.cursor = cursor.clone();
bus.send(Event::Cursor{user: name, cursor}).unwrap();
} else {
continue; // don't update channel since this was a no-op
}
},
} }
pub fn users_ref(&self) -> Ref<HashMap<String, User>> { tx.send(
self.users.borrow() users.iter()
.map(|(k, u)| (k.clone(), u.clone()))
.collect()
).unwrap();
}
});
} }
pub fn view(&self) -> WorkspaceView { pub fn view(&self) -> WorkspaceView {
WorkspaceView { WorkspaceView {
rx: self.bus.subscribe(), rx: self.bus.subscribe(),
tx: self.buf_tx.clone(),
users: self.users.clone(), users: self.users.clone(),
buffers: self.buffers.clone(), buffers: self.buffers.clone(),
} }
} }
} }
#[derive(Debug)]
pub enum UserAction {
ADD {
user: User,
},
REMOVE {
name: String,
},
CURSOR {
name: String,
cursor: UserCursor,
},
}
#[derive(Debug)]
pub enum BufferAction { pub enum BufferAction {
ADD { ADD {
buffer: Buffer, buffer: Buffer,
}, },
REMOVE { REMOVE {
name: String, // TODO remove by id? path: String, // TODO remove by id?
}, },
} }