diff --git a/src/lib.rs b/src/lib.rs index c5191ff..4995d17 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,3 +3,6 @@ pub mod tricks; #[cfg(feature = "rc")] pub mod rc; + +#[cfg(feature = "monitor")] +pub mod monitor; diff --git a/src/monitor.rs b/src/monitor.rs new file mode 100644 index 0000000..de66a7b --- /dev/null +++ b/src/monitor.rs @@ -0,0 +1,77 @@ +use std::{net::{TcpListener, TcpStream}, sync::{mpsc, Mutex}, io::{self, Write}}; + +use tracing::{info, error}; + +pub fn listen_logs() { + info!("listening for logs from injected payload ..."); + if let Ok(listener) = TcpListener::bind("127.0.0.1:13337") { + if let Ok((mut stream, addr)) = listener.accept() { + info!("incoming data ({})", addr); + while let Ok(n) = std::io::copy(&mut stream, &mut std::io::stdout()) { + if n <= 0 { break; } + } + info!("connection closed ({})", addr); + } + } +} + + +// TODO split this into its building blocks, rather than providing a +// complete and non-customizable solution +pub fn prepare_log_collector(addr: String) { + let (tx, rx) = mpsc::channel(); + tracing_subscriber::fmt() + .with_writer(Mutex::new(LogSink(tx))) // TODO can we get rid of the mutex by cloning tx? + .init(); + std::thread::spawn(move || log_dispatcher_worker(&addr, rx, std::time::Duration::from_secs(60))); +} + +pub fn log_dispatcher_worker(addr: &str, rx: mpsc::Receiver, reconnect_timeout: std::time::Duration) { + loop { + match TcpStream::connect(addr) { + Ok(mut stream) => { + loop { + match rx.recv() { + Ok(s) => { + match stream.write_all(s.as_bytes()) { + Ok(()) => {}, + Err(e) => { + error!("error sending log message to collector: {}", e); + break; + } + } + }, + Err(e) => { + error!("error consuming tracing channel: {}", e); + break; + } + } + } + }, + Err(e) => error!("could not connect to log collector: {}", e), + } + // don't abuse resources, sleep 30s and try again + std::thread::sleep(reconnect_timeout); + } +} + +struct LogSink(mpsc::Sender); + +impl io::Write for LogSink { + fn write(&mut self, buf: &[u8]) -> io::Result { + match std::str::from_utf8(buf) { + Ok(txt) => { + match self.0.send(txt.into()) { + Ok(()) => Ok(buf.len()), + Err(e) => Err(io::Error::new(io::ErrorKind::BrokenPipe, e.to_string())), + } + }, + Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e.to_string())), + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) // nothing to do, channel "flushes" itself + } +} + diff --git a/src/vector/monitor.rs b/src/vector/monitor.rs deleted file mode 100644 index 46f132a..0000000 --- a/src/vector/monitor.rs +++ /dev/null @@ -1,16 +0,0 @@ -use std::net::TcpListener; - -use tracing::info; - -pub fn listen_logs() { - info!("listening for logs from injected payload ..."); - if let Ok(listener) = TcpListener::bind("127.0.0.1:13337") { - if let Ok((mut stream, addr)) = listener.accept() { - info!("incoming data ({})", addr); - while let Ok(n) = std::io::copy(&mut stream, &mut std::io::stdout()) { - if n <= 0 { break; } - } - info!("connection closed ({})", addr); - } - } -}