feat: introduce tracing::instrument and more debug

This commit is contained in:
əlemi 2025-02-15 13:38:50 +01:00
parent 51fe6e4e82
commit a4c2583b6f
Signed by: alemi
GPG key ID: A4895B84D311642C
5 changed files with 66 additions and 18 deletions

View file

@ -8,7 +8,7 @@
/// `host`, `port` and `tls` affect all connections to all gRPC services; the /// `host`, `port` and `tls` affect all connections to all gRPC services; the
/// resulting endpoint is composed like this: /// resulting endpoint is composed like this:
/// http{tls?'s':''}://{host}:{port} /// http{tls?'s':''}://{host}:{port}
#[derive(Clone, Debug, Default)] #[derive(Clone, Default)]
#[cfg_attr(feature = "js", napi_derive::napi(object))] #[cfg_attr(feature = "js", napi_derive::napi(object))]
#[cfg_attr(feature = "py", pyo3::pyclass(get_all, set_all))] #[cfg_attr(feature = "py", pyo3::pyclass(get_all, set_all))]
#[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))] #[cfg_attr(feature = "serialize", derive(serde::Serialize, serde::Deserialize))]
@ -16,7 +16,7 @@ pub struct Config {
/// User identifier used to register, possibly your email. /// User identifier used to register, possibly your email.
pub username: String, pub username: String,
/// User password chosen upon registration. /// User password chosen upon registration.
pub password: String, pub password: String, // must not leak this!
/// Address of server to connect to, default api.code.mp. /// Address of server to connect to, default api.code.mp.
pub host: Option<String>, pub host: Option<String>,
/// Port to connect to, default 50053. /// Port to connect to, default 50053.
@ -61,3 +61,25 @@ impl Config {
) )
} }
} }
// manual impl: we want to obfuscate the password field!!
// TODO: can we just tag password to be obfuscated in debug print?
// reimplementing the whole Debug thing is pretty lame
impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if f.alternate() {
write!(f,
r#"""Config {{
username: {},
password: ********,
host: {:#?},
port: {:#?},
tls: {:#?}
}}"""#,
self.username, self.host, self.port, self.tls
)
} else {
write!(f, "Config {{ username: {}, password: ********, host: {:?}, port: {:?}, tls: {:?} }}", self.username, self.host, self.port, self.tls)
}
}
}

View file

@ -19,6 +19,7 @@ use super::controller::{BufferController, BufferControllerInner};
struct BufferWorker { struct BufferWorker {
agent_id: u32, agent_id: u32,
path: String, path: String,
workspace_id: String,
latest_version: watch::Sender<diamond_types::LocalVersion>, latest_version: watch::Sender<diamond_types::LocalVersion>,
local_version: watch::Sender<diamond_types::LocalVersion>, local_version: watch::Sender<diamond_types::LocalVersion>,
ack_rx: mpsc::UnboundedReceiver<LocalVersion>, ack_rx: mpsc::UnboundedReceiver<LocalVersion>,
@ -75,6 +76,7 @@ impl BufferController {
let worker = BufferWorker { let worker = BufferWorker {
agent_id, agent_id,
path: path.to_string(), path: path.to_string(),
workspace_id: workspace_id.to_string(),
latest_version: latest_version_tx, latest_version: latest_version_tx,
local_version: my_version_tx, local_version: my_version_tx,
ack_rx, ack_rx,
@ -95,15 +97,16 @@ impl BufferController {
BufferController(controller) BufferController(controller)
} }
#[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id, path = worker.path))]
async fn work( async fn work(
mut worker: BufferWorker, mut worker: BufferWorker,
tx: mpsc::Sender<Operation>, tx: mpsc::Sender<Operation>,
mut rx: Streaming<BufferEvent>, mut rx: Streaming<BufferEvent>,
) { ) {
tracing::debug!("controller worker started"); tracing::debug!("buffer worker started");
loop { loop {
if worker.controller.upgrade().is_none() { if worker.controller.upgrade().is_none() {
break; break tracing::debug!("buffer worker clean exit");
}; };
// block until one of these is ready // block until one of these is ready
@ -114,6 +117,7 @@ impl BufferController {
res = worker.ack_rx.recv() => match res { res = worker.ack_rx.recv() => match res {
None => break tracing::error!("ack channel closed"), None => break tracing::error!("ack channel closed"),
Some(v) => { Some(v) => {
tracing::debug!("client acked change");
worker.branch.merge(&worker.oplog, &v); worker.branch.merge(&worker.oplog, &v);
worker.local_version.send(worker.branch.local_version()) worker.local_version.send(worker.branch.local_version())
.unwrap_or_warn("could not ack local version"); .unwrap_or_warn("could not ack local version");
@ -160,11 +164,12 @@ impl BufferController {
} }
} }
tracing::debug!("controller worker stopped"); tracing::debug!("buffer worker stopped");
} }
} }
impl BufferWorker { impl BufferWorker {
#[tracing::instrument(skip(self, tx))]
async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender<Operation>) { async fn handle_editor_change(&mut self, change: TextChange, tx: &mpsc::Sender<Operation>) {
let last_ver = self.oplog.local_version(); let last_ver = self.oplog.local_version();
// clip to buffer extents // clip to buffer extents
@ -205,11 +210,16 @@ impl BufferWorker {
} }
} }
#[tracing::instrument(skip(self))]
async fn handle_server_change(&mut self, change: BufferEvent) -> bool { async fn handle_server_change(&mut self, change: BufferEvent) -> bool {
match self.controller.upgrade() { match self.controller.upgrade() {
None => true, // clean exit actually, just weird we caught it here None => { // clean exit actually, just weird we caught it here
tracing::debug!("clean exit while handling server change");
true
},
Some(controller) => match self.oplog.decode_and_add(&change.op.data) { Some(controller) => match self.oplog.decode_and_add(&change.op.data) {
Ok(local_version) => { Ok(local_version) => {
tracing::debug!("updating local version: {local_version:?}");
self.latest_version self.latest_version
.send(local_version) .send(local_version)
.unwrap_or_warn("failed to update latest version!"); .unwrap_or_warn("failed to update latest version!");
@ -229,6 +239,7 @@ impl BufferWorker {
} }
} }
#[tracing::instrument(skip(self, tx))]
async fn handle_delta_request(&mut self, tx: oneshot::Sender<Option<BufferUpdate>>) { async fn handle_delta_request(&mut self, tx: oneshot::Sender<Option<BufferUpdate>>) {
let last_ver = self.branch.local_version(); let last_ver = self.branch.local_version();
if let Some((lv, Some(dtop))) = self if let Some((lv, Some(dtop))) = self
@ -285,9 +296,11 @@ impl BufferWorker {
}, },
}, },
}; };
tracing::debug!("sending update {tc:?}");
tx.send(Some(tc)) tx.send(Some(tc))
.unwrap_or_warn("could not update ops channel -- is controller dead?"); .unwrap_or_warn("could not update ops channel -- is controller dead?");
} else { } else {
tracing::debug!("no enqueued changes");
tx.send(None) tx.send(None)
.unwrap_or_warn("could not update ops channel -- is controller dead?"); .unwrap_or_warn("could not update ops channel -- is controller dead?");
} }

View file

@ -47,6 +47,7 @@ struct ClientInner {
impl Client { impl Client {
/// Connect to the server, authenticate and instantiate a new [`Client`]. /// Connect to the server, authenticate and instantiate a new [`Client`].
#[tracing::instrument]
pub async fn connect(config: crate::api::Config) -> ConnectionResult<Self> { pub async fn connect(config: crate::api::Config) -> ConnectionResult<Self> {
// TODO move these two into network.rs // TODO move these two into network.rs
let channel = Endpoint::from_shared(config.endpoint())?.connect().await?; let channel = Endpoint::from_shared(config.endpoint())?.connect().await?;
@ -157,6 +158,7 @@ impl Client {
} }
/// Join and return a [`Workspace`]. /// Join and return a [`Workspace`].
#[tracing::instrument(skip(self, workspace), fields(ws = workspace.as_ref()))]
pub async fn attach_workspace( pub async fn attach_workspace(
&self, &self,
workspace: impl AsRef<str>, workspace: impl AsRef<str>,

View file

@ -13,6 +13,7 @@ use codemp_proto::cursor::{CursorEvent, CursorPosition};
use super::controller::{CursorController, CursorControllerInner}; use super::controller::{CursorController, CursorControllerInner};
struct CursorWorker { struct CursorWorker {
workspace_id: String,
op: mpsc::UnboundedReceiver<CursorPosition>, op: mpsc::UnboundedReceiver<CursorPosition>,
map: Arc<dashmap::DashMap<Uuid, User>>, map: Arc<dashmap::DashMap<Uuid, User>>,
stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>, stream: mpsc::Receiver<oneshot::Sender<Option<Cursor>>>,
@ -24,6 +25,7 @@ struct CursorWorker {
} }
impl CursorWorker { impl CursorWorker {
#[tracing::instrument(skip(self, tx))]
fn handle_recv(&mut self, tx: oneshot::Sender<Option<Cursor>>) { fn handle_recv(&mut self, tx: oneshot::Sender<Option<Cursor>>) {
tx.send( tx.send(
self.store.pop_front().and_then(|event| { self.store.pop_front().and_then(|event| {
@ -71,6 +73,7 @@ impl CursorController {
let weak = Arc::downgrade(&controller); let weak = Arc::downgrade(&controller);
let worker = CursorWorker { let worker = CursorWorker {
workspace_id: workspace_id.to_string(),
op: op_rx, op: op_rx,
map: user_map, map: user_map,
stream: stream_rx, stream: stream_rx,
@ -86,16 +89,17 @@ impl CursorController {
CursorController(controller) CursorController(controller)
} }
#[tracing::instrument(skip(worker, tx, rx), fields(ws = worker.workspace_id))]
async fn work( async fn work(
mut worker: CursorWorker, mut worker: CursorWorker,
tx: mpsc::Sender<CursorPosition>, tx: mpsc::Sender<CursorPosition>,
mut rx: Streaming<CursorEvent>, mut rx: Streaming<CursorEvent>,
) { ) {
tracing::debug!("starting cursor worker");
loop { loop {
tracing::debug!("cursor worker polling");
if worker.controller.upgrade().is_none() { if worker.controller.upgrade().is_none() {
break; break tracing::debug!("cursor worker clean exit");
}; // clean exit: all controllers dropped };
tokio::select! { tokio::select! {
biased; biased;
@ -110,7 +114,7 @@ impl CursorController {
// server sents us a cursor // server sents us a cursor
Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() { Ok(Some(cur)) = rx.message() => match worker.controller.upgrade() {
None => break, // clean exit, just weird that we got it here None => break tracing::debug!("cursor worker clean (late) exit"), // clean exit, just weird that we got it here
Some(controller) => { Some(controller) => {
tracing::debug!("received cursor from server"); tracing::debug!("received cursor from server");
worker.store.push_back(cur); worker.store.push_back(cur);
@ -127,8 +131,9 @@ impl CursorController {
// client wants to get next cursor event // client wants to get next cursor event
Some(tx) = worker.stream.recv() => worker.handle_recv(tx), Some(tx) = worker.stream.recv() => worker.handle_recv(tx),
else => break, else => break tracing::debug!("cursor worker clean-ish exit"),
} }
} }
tracing::debug!("stopping cursor worker");
} }
} }

View file

@ -84,6 +84,7 @@ impl AsyncReceiver<Event> for Workspace {
} }
impl Workspace { impl Workspace {
#[tracing::instrument(skip(name, user, token, claims), fields(ws = name))]
pub(crate) async fn connect( pub(crate) async fn connect(
name: String, name: String,
user: Arc<User>, user: Arc<User>,
@ -165,6 +166,7 @@ impl Workspace {
} }
/// Attach to a buffer and return a handle to it. /// Attach to a buffer and return a handle to it.
#[tracing::instrument(skip(self))]
pub async fn attach_buffer(&self, path: &str) -> ConnectionResult<buffer::Controller> { pub async fn attach_buffer(&self, path: &str) -> ConnectionResult<buffer::Controller> {
let mut worskspace_client = self.0.services.ws(); let mut worskspace_client = self.0.services.ws();
let request = tonic::Request::new(BufferNode { let request = tonic::Request::new(BufferNode {
@ -326,7 +328,7 @@ impl Workspace {
.0 .0
.filetree .filetree
.iter() .iter()
.filter(|f| filter.map_or(true, |flt| f.starts_with(flt))) .filter(|f| filter.is_none_or(|flt| f.starts_with(flt)))
.map(|f| f.clone()) .map(|f| f.clone())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
tree.sort(); tree.sort();
@ -342,7 +344,8 @@ struct WorkspaceWorker {
} }
impl WorkspaceWorker { impl WorkspaceWorker {
pub(crate) async fn work(mut self, name: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) { #[tracing::instrument(skip(self, stream, weak))]
pub(crate) async fn work(mut self, ws: String, mut stream: Streaming<WorkspaceEvent>, weak: Weak<WorkspaceInner>) {
tracing::debug!("workspace worker starting"); tracing::debug!("workspace worker starting");
loop { loop {
tokio::select! { tokio::select! {
@ -352,13 +355,16 @@ impl WorkspaceWorker {
}, },
res = stream.message() => match res { res = stream.message() => match res {
Err(e) => break tracing::error!("workspace '{}' stream closed: {}", name, e), Err(e) => break tracing::error!("workspace '{ws}' stream closed: {e}"),
Ok(None) => break tracing::info!("leaving workspace {}", name), Ok(None) => break tracing::info!("leaving workspace {ws}"),
Ok(Some(WorkspaceEvent { event: None })) => { Ok(Some(WorkspaceEvent { event: None })) => {
tracing::warn!("workspace {} received empty event", name) tracing::warn!("workspace {ws} received empty event")
} }
Ok(Some(WorkspaceEvent { event: Some(ev) })) => { Ok(Some(WorkspaceEvent { event: Some(ev) })) => {
let Some(inner) = weak.upgrade() else { break }; let Some(inner) = weak.upgrade() else {
break tracing::debug!("workspace worker clean exit");
};
tracing::debug!("received workspace event: {ev:?}");
let update = crate::api::Event::from(&ev); let update = crate::api::Event::from(&ev);
match ev { match ev {
// user // user
@ -391,7 +397,7 @@ impl WorkspaceWorker {
if let Some(ws) = weak.upgrade() { if let Some(ws) = weak.upgrade() {
cb.call(Workspace(ws)); cb.call(Workspace(ws));
} else { } else {
break tracing::debug!("workspace worker clean exit"); break tracing::debug!("workspace worker clean (late) exit");
} }
} }
} }