diff --git a/src/api/config.rs b/src/api/config.rs index 26f90f1..8cfea00 100644 --- a/src/api/config.rs +++ b/src/api/config.rs @@ -8,7 +8,7 @@ /// `host`, `port` and `tls` affect all connections to all gRPC services; the /// resulting endpoint is composed like this: /// http{tls?'s':''}://{host}:{port} -#[derive(Clone, Debug, Default)] +#[derive(Clone, Default)] #[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(feature = "py", pyo3::pyclass(get_all, set_all))] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] @@ -16,7 +16,7 @@ pub struct Config { /// User identifier used to register, possibly your email. pub username: String, /// User password chosen upon registration. - pub password: String, + pub password: String, // must not leak this! /// Address of server to connect to, default api.code.mp. pub host: Option<String>, /// Port to connect to, default 50053. @@ -61,3 +61,25 @@ impl Config { ) } } + +// manual impl: we want to obfuscate the password field!! +// TODO: can we just tag password to be obfuscated in debug print? +// reimplementing the whole Debug thing is pretty lame +impl std::fmt::Debug for Config { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if f.alternate() { + write!(f, +r#"""Config {{ + username: {}, + password: ********, + host: {:#?}, + port: {:#?}, + tls: {:#?} +}}"""#, + self.username, self.host, self.port, self.tls + ) + } else { + write!(f, "Config {{ username: {}, password: ********, host: {:?}, port: {:?}, tls: {:?} }}", self.username, self.host, self.port, self.tls) + } + } +} diff --git a/src/buffer/worker.rs b/src/buffer/worker.rs index fa3e8e3..e5ee53b 100644 --- a/src/buffer/worker.rs +++ b/src/buffer/worker.rs @@ -19,6 +19,7 @@ use super::controller::{BufferController, BufferControllerInner}; struct BufferWorker { agent_id: u32, path: String, + workspace_id: String, latest_version: watch::Sender<diamond_types::LocalVersion>, local_version: watch::Sender<diamond_types::LocalVersion>, ack_rx: mpsc::UnboundedReceiver<LocalVersion>, @@ -75,6 +76,7 @@ impl BufferController { let worker = BufferWorker { agent_id, path: path.to_string(), + workspace_id: workspace_id.to_string(), latest_version: latest_version_tx, local_version: my_version_tx, ack_rx, @@ -95,15 +97,16 @@ impl BufferController { BufferController(controller) } + #[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id, path = worker.path))] async fn work( mut worker: BufferWorker, tx: mpsc::Sender<Operation>, mut rx: Streaming<BufferEvent>, ) { - tracing::debug!("controller worker started"); + tracing::debug!("buffer worker started"); loop { if worker.controller.upgrade().is_none() { - break; + break tracing::debug!("buffer worker clean exit"); }; // block until one of these is ready @@ -114,6 +117,7 @@ impl BufferController { res = worker.ack_rx.recv() => match res { None => break tracing::error!("ack channel closed"), Some(v) => { + tracing::debug!("client acked change"); worker.branch.merge(&worker.oplog, &v); worker.local_version.send(worker.branch.local_version()) .unwrap_or_warn("could not ack local version"); @@ -160,11 +164,12 @@ impl BufferController { } } - tracing::debug!("controller worker stopped"); + tracing::debug!("buffer worker stopped"); } } impl BufferWorker { + #[tracing::instrument(skip(self, tx))] async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender<Operation>) { let last_ver = self.oplog.local_version(); // clip to buffer extents @@ -205,11 +210,16 @@ impl BufferWorker { } } + #[tracing::instrument(skip(self))] async fn handle_server_change(&mut self, change: BufferEvent) -> bool { match self.controller.upgrade() { - None => true, // clean exit actually, just weird we caught it here + None => { // clean exit actually, just weird we caught it here + tracing::debug!("clean exit while handling server change"); + true + }, Some(controller) => match self.oplog.decode_and_add(&change.op.data) { Ok(local_version) => { + tracing::debug!("updating local version: {local_version:?}"); self.latest_version .send(local_version) .unwrap_or_warn("failed to update latest version!"); @@ -229,6 +239,7 @@ impl BufferWorker { } } + #[tracing::instrument(skip(self, tx))] async fn handle_delta_request(&mut self, tx: oneshot::Sender<Option<BufferUpdate>>) { let last_ver = self.branch.local_version(); if let Some((lv, Some(dtop))) = self @@ -285,9 +296,11 @@ impl BufferWorker { }, }, }; + tracing::debug!("sending update {tc:?}"); tx.send(Some(tc)) .unwrap_or_warn("could not update ops channel -- is controller dead?"); } else { + tracing::debug!("no enqueued changes"); tx.send(None) .unwrap_or_warn("could not update ops channel -- is controller dead?"); } diff --git a/src/client.rs b/src/client.rs index 15b44c4..0df972c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -47,6 +47,7 @@ struct ClientInner { impl Client { /// Connect to the server, authenticate and instantiate a new [`Client`]. + #[tracing::instrument] pub async fn connect(config: crate::api::Config) -> ConnectionResult<Self> { // TODO move these two into network.rs let channel = Endpoint::from_shared(config.endpoint())?.connect().await?; @@ -157,6 +158,7 @@ impl Client { } /// Join and return a [`Workspace`]. + #[tracing::instrument(skip(self, workspace), fields(ws = workspace.as_ref()))] pub async fn attach_workspace( &self, workspace: impl AsRef<str>, diff --git a/src/cursor/worker.rs b/src/cursor/worker.rs index 1c1fda2..2e6e62e 100644 --- a/src/cursor/worker.rs +++ b/src/cursor/worker.rs @@ -13,6 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition}; use super::controller::{CursorController, CursorControllerInner}; struct CursorWorker { + workspace_id: String, op: mpsc::UnboundedReceiver<CursorPosition>, map: Arc<dashmap::DashMap<Uuid, User>>, stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>, @@ -24,6 +25,7 @@ struct CursorWorker { } impl CursorWorker { + #[tracing::instrument(skip(self, tx))] fn handle_recv(&mut self, tx: oneshot::Sender<Option<Cursor>>) { tx.send( self.store.pop_front().and_then(|event| { @@ -71,6 +73,7 @@ impl CursorController { let weak = Arc::downgrade(&controller); let worker = CursorWorker { + workspace_id: workspace_id.to_string(), op: op_rx, map: user_map, stream: stream_rx, @@ -86,16 +89,17 @@ impl CursorController { CursorController(controller) } + #[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id))] async fn work( mut worker: CursorWorker, tx: mpsc::Sender<CursorPosition>, mut rx: Streaming<CursorEvent>, ) { + tracing::debug!("starting cursor worker"); loop { - tracing::debug!("cursor worker polling"); if worker.controller.upgrade().is_none() { - break; - }; // clean exit: all controllers dropped + break tracing::debug!("cursor worker clean exit"); + }; tokio::select! { biased; @@ -110,7 +114,7 @@ impl CursorController { // server sents us a cursor Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() { - None => break, // clean exit, just weird that we got it here + None => break tracing::debug!("cursor worker clean (late) exit"), // clean exit, just weird that we got it here Some(controller) => { tracing::debug!("received cursor from server"); worker.store.push_back(cur); @@ -127,8 +131,9 @@ impl CursorController { // client wants to get next cursor event Some(tx) = worker.stream.recv() => worker.handle_recv(tx), - else => break, + else => break tracing::debug!("cursor worker clean-ish exit"), } } + tracing::debug!("stopping cursor worker"); } } diff --git a/src/workspace.rs b/src/workspace.rs index 7de40dd..80d2af0 100644 --- a/src/workspace.rs +++ b/src/workspace.rs @@ -84,6 +84,7 @@ impl AsyncReceiver<Event> for Workspace { } impl Workspace { + #[tracing::instrument(skip(name, user, token, claims), fields(ws = name))] pub(crate) async fn connect( name: String, user: Arc<User>, @@ -165,6 +166,7 @@ impl Workspace { } /// Attach to a buffer and return a handle to it. + #[tracing::instrument(skip(self))] pub async fn attach_buffer(&self, path: &str) -> ConnectionResult<buffer::Controller> { let mut worskspace_client = self.0.services.ws(); let request = tonic::Request::new(BufferNode { @@ -326,7 +328,7 @@ impl Workspace { .0 .filetree .iter() - .filter(|f| filter.map_or(true, |flt| f.starts_with(flt))) + .filter(|f| filter.is_none_or(|flt| f.starts_with(flt))) .map(|f| f.clone()) .collect::<Vec<String>>(); tree.sort(); @@ -342,7 +344,8 @@ struct WorkspaceWorker { } impl WorkspaceWorker { - pub(crate) async fn work(mut self, name: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) { + #[tracing::instrument(skip(self, stream, weak))] + pub(crate) async fn work(mut self, ws: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) { tracing::debug!("workspace worker starting"); loop { tokio::select! { @@ -352,13 +355,16 @@ impl WorkspaceWorker { }, res = stream.message() => match res { - Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), - Ok(None) => break tracing::info!("leaving workspace {}", name), + Err(e) => break tracing::error!("workspace '{ws}' stream closed: {e}"), + Ok(None) => break tracing::info!("leaving workspace {ws}"), Ok(Some(WorkspaceEvent { event: None })) => { - tracing::warn!("workspace {} received empty event", name) + tracing::warn!("workspace {ws} received empty event") } Ok(Some(WorkspaceEvent { event: Some(ev) })) => { - let Some(inner) = weak.upgrade() else { break }; + let Some(inner) = weak.upgrade() else { + break tracing::debug!("workspace worker clean exit"); + }; + tracing::debug!("received workspace event: {ev:?}"); let update = crate::api::Event::from(&ev); match ev { // user @@ -391,7 +397,7 @@ impl WorkspaceWorker { if let Some(ws) = weak.upgrade() { cb.call(Workspace(ws)); } else { - break tracing::debug!("workspace worker clean exit"); + break tracing::debug!("workspace worker clean (late) exit"); } } }