From feff54bcdfb02ecf96d33e731c93d495d6f9f69b Mon Sep 17 00:00:00 2001 From: cschen Date: Tue, 20 Aug 2024 21:09:10 +0200 Subject: [PATCH] feat(python): switched to a "Promise" approach chore(python): removed, commented out some of the earlier attempts feat(python): reworked the logger and tokio runtime approach, now the logging callback drives the runtime. --- dist/py/codemp.pyi | 24 +++- src/ffi/python/client.rs | 27 ++-- src/ffi/python/mod.rs | 257 +++++++++++++++++++----------------- src/ffi/python/workspace.rs | 14 +- 4 files changed, 170 insertions(+), 152 deletions(-) diff --git a/dist/py/codemp.pyi b/dist/py/codemp.pyi index e36cf6d..71d54d9 100644 --- a/dist/py/codemp.pyi +++ b/dist/py/codemp.pyi @@ -1,14 +1,24 @@ -from typing import Tuple, Optional +from typing import Tuple, Optional, Callable -class PyLogger: +class Driver: """ - A python wrapper for the tracing subscriber so that we can - receive logging messages from the library. + this is akin to a big red button with a white "STOP" on top of it. + it is used to stop the runtime """ - def __init__(self, debug) -> None: ... - async def listen(self) -> Optional[str]: ... + def stop(self) -> None: ... +def init(logger_cb: Callable, debug: bool) -> Driver: ... + +class RustPromise[T]: + """ + This is a class akin to a future, which wraps a join handle from a spawned + task on the rust side. you may call .pyawait() on this promise to block + until we have a result, or return immediately if we already have one. + This only goes one way rust -> python. + """ + def pyawait(self) -> T: ... + class TextChange: """ Editor agnostic representation of a text change, it translate between internal @@ -84,7 +94,7 @@ class Client: to a server and joining/creating new workspaces """ def __new__(cls, host: str, username: str, password: str) -> None: ... - async def join_workspace(self, workspace: str) -> Workspace: ... + def join_workspace(self, workspace: str) -> RustPromise: ... def leave_workspace(self, workspace: str) -> bool: ... def get_workspace(self, id: str) -> Workspace: ... def active_workspaces(self) -> list[str]: ... diff --git a/src/ffi/python/client.rs b/src/ffi/python/client.rs index 0468785..b1e8f5b 100644 --- a/src/ffi/python/client.rs +++ b/src/ffi/python/client.rs @@ -11,25 +11,18 @@ impl Client { tokio().block_on(Client::new(host, username, password)) } - async fn dioboia(&self) { - tokio().spawn(async { tracing::info!("dioboia? si dioboia!") }); - } - #[pyo3(name = "join_workspace")] - async fn pyjoin_workspace(&self, workspace: String) -> crate::Result { - // self.join_workspace(workspace).await + fn pyjoin_workspace(&self, workspace: String) -> PyResult { + tracing::info!("attempting to join the workspace {workspace}"); + + // crate::a_sync! { self => self.join_workspace(workspace).await } let rc = self.clone(); - crate::spawn_future!(rc.join_workspace(workspace)) - .await - .unwrap() - // This expands to if spawn_future_allow_threads! is used - // tokio() - // .spawn(super::AllowThreads(Box::pin(async move { - // rc.join_workspace(workspace).await - // }))) - // or if only spawn_future! - // tokio() - // .spawn(async move { rc.join_workspace(workspace).await }) + Ok(super::RustPromise(Some(tokio().spawn(async move { + Ok(rc + .join_workspace(workspace) + .await + .map(|f| Python::with_gil(|py| f.into_py(py)))?) + })))) } #[pyo3(name = "leave_workspace")] diff --git a/src/ffi/python/mod.rs b/src/ffi/python/mod.rs index c52a126..6bc80bc 100644 --- a/src/ffi/python/mod.rs +++ b/src/ffi/python/mod.rs @@ -2,13 +2,6 @@ pub mod client; pub mod controllers; pub mod workspace; -use std::{ - future::Future, - pin::Pin, - sync::OnceLock, - task::{Context, Poll}, -}; - use crate::{ api::{Cursor, TextChange}, buffer::Controller as BufferController, @@ -18,12 +11,14 @@ use crate::{ use pyo3::prelude::*; use pyo3::{ exceptions::{PyConnectionError, PyRuntimeError, PySystemError}, - ffi::PyFunctionObject, types::PyFunction, }; -use tokio::sync::watch; +use tokio::sync::{mpsc, oneshot}; + +// global reference to a current_thread tokio runtime pub fn tokio() -> &'static tokio::runtime::Runtime { + use std::sync::OnceLock; static RT: OnceLock = OnceLock::new(); RT.get_or_init(|| { tokio::runtime::Builder::new_current_thread() @@ -35,51 +30,151 @@ pub fn tokio() -> &'static tokio::runtime::Runtime { }) } -// workaround to allow the GIL to be released across awaits, waiting on -// https://github.com/PyO3/pyo3/pull/3610 -struct AllowThreads(F); +// // workaround to allow the GIL to be released across awaits, waiting on +// // https://github.com/PyO3/pyo3/pull/3610 +// struct AllowThreads(F); -impl Future for AllowThreads -where - F: Future + Unpin + Send, - F::Output: Send, -{ - type Output = F::Output; +// impl Future for AllowThreads +// where +// F: Future + Unpin + Send, +// F::Output: Send, +// { +// type Output = F::Output; - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let waker = cx.waker(); - let fut = unsafe { self.map_unchecked_mut(|e| &mut e.0) }; - Python::with_gil(|py| py.allow_threads(|| fut.poll(&mut Context::from_waker(waker)))) - } -} +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { +// let waker = cx.waker(); +// let fut = unsafe { self.map_unchecked_mut(|e| &mut e.0) }; +// Python::with_gil(|py| py.allow_threads(|| fut.poll(&mut Context::from_waker(waker)))) +// } +// } +// #[macro_export] +// macro_rules! spawn_future_allow_threads { +// ($fut:expr) => { +// $crate::ffi::python::tokio().spawn($crate::ffi::python::AllowThreads(Box::pin( +// async move { +// tracing::info!("running future from rust."); +// $fut.await +// }, +// ))) +// }; +// } #[macro_export] macro_rules! a_sync { ($($clone:ident)* => $x:expr) => { { $(let $clone = $clone.clone();)* - Ok(Promise(Some($crate::ffi::python::tokio().spawn(async move { $x })))) + Ok($crate::ffi::python::RustPromise(Some($crate::ffi::python::tokio().spawn(async move { + Ok($x.map(|f| Python::with_gil(|py| f.into_py(py)))?) + })))) } }; } -#[macro_export] -macro_rules! spawn_future_allow_threads { - ($fut:expr) => { - $crate::ffi::python::tokio().spawn($crate::ffi::python::AllowThreads(Box::pin( - async move { - tracing::info!("running future from rust."); - $fut.await - }, - ))) - }; +// #[macro_export] +// macro_rules! spawn_future { +// ($fut:expr) => { +// $crate::ffi::python::tokio().spawn(async move { $fut.await }) +// }; +// } + +#[derive(Debug, Clone)] +struct LoggerProducer(mpsc::UnboundedSender); + +impl std::io::Write for LoggerProducer { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let _ = self.0.send(String::from_utf8_lossy(buf).to_string()); // ignore: logger disconnected or with full buffer + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } } -#[macro_export] -macro_rules! spawn_future { - ($fut:expr) => { - $crate::ffi::python::tokio().spawn(async move { $fut.await }) +#[pyclass] +pub struct Driver(Option>); +#[pymethods] +impl Driver { + fn stop(&mut self) -> PyResult<()> { + match self.0.take() { + Some(tx) => { + let _ = tx.send(()); + Ok(()) + } + None => Err(PySystemError::new_err("Runtime was already stopped.")), + } + } +} +#[pyfunction] +fn init(logging_cb: Py, debug: bool) -> PyResult { + let (tx, mut rx) = mpsc::unbounded_channel(); + let level = if debug { + tracing::Level::DEBUG + } else { + tracing::Level::INFO }; + + let format = tracing_subscriber::fmt::format() + .without_time() + .with_level(true) + .with_target(true) + .with_thread_ids(false) + .with_thread_names(false) + .with_file(false) + .with_line_number(false) + .with_source_location(false) + .compact(); + + let log_subscribing = tracing_subscriber::fmt() + .with_ansi(false) + .event_format(format) + .with_max_level(level) + .with_writer(std::sync::Mutex::new(LoggerProducer(tx))) + .try_init(); + + let (rt_stop_tx, rt_stop_rx) = oneshot::channel::<()>(); + + match log_subscribing { + Ok(_) => { + // the runtime is driven by the logger awaiting messages from codemp and echoing them back to + // python logger. + std::thread::spawn(move || { + tokio().block_on(async move { + tokio::select! { + biased; + Some(msg) = rx.recv() => { + let _ = Python::with_gil(|py| logging_cb.call1(py, (msg,))); + }, + _ = rt_stop_rx => { todo!() }, + } + }) + }); + Ok(Python::with_gil(|py| Driver(Some(rt_stop_tx)).into_py(py))) + } + Err(_) => Err(PyRuntimeError::new_err("codemp was already initialised.")), + } +} + +#[pyclass] +pub struct RustPromise(Option>>); + +#[pymethods] +impl RustPromise { + #[pyo3(name = "pyawait")] + fn _await(&mut self) -> PyResult { + match self.0.take() { + None => Err(PySystemError::new_err( + "promise can't be awaited multiple times!", + )), + Some(x) => match tokio().block_on(x) { + Err(e) => Err(PySystemError::new_err(format!( + "error awaiting promise: {e}" + ))), + Ok(res) => res, + }, + } + } } impl From for PyErr { @@ -105,90 +200,10 @@ impl IntoPy for crate::api::User { } } -#[derive(Debug, Clone)] -struct LoggerProducer(watch::Sender); - -impl std::io::Write for LoggerProducer { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let _ = self.0.send(String::from_utf8_lossy(buf).to_string()); // ignore: logger disconnected or with full buffer - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - Ok(()) - } -} - -#[pyclass] -struct PyLogger(watch::Receiver); - -#[pymethods] -impl PyLogger { - #[new] - fn init_logger(debug: bool) -> PyResult { - let (tx, mut rx) = watch::channel("logger initialised".to_string()); - let level = if debug { - tracing::Level::DEBUG - } else { - tracing::Level::INFO - }; - - let format = tracing_subscriber::fmt::format() - .without_time() - .with_level(true) - .with_target(true) - .with_thread_ids(false) - .with_thread_names(false) - .with_file(false) - .with_line_number(false) - .with_source_location(false) - .compact(); - - match tracing_subscriber::fmt() - .with_ansi(false) - .event_format(format) - .with_max_level(level) - .with_writer(std::sync::Mutex::new(LoggerProducer(tx))) - .try_init() - { - Ok(_) => Ok(PyLogger(rx)), - Err(_) => Err(PySystemError::new_err("A logger already exists")), - } - } - - async fn listen(&mut self) -> Option { - if self.0.changed().await.is_ok() { - return Some(self.0.borrow().clone()); - } - - None - } -} - -#[pyclass] -pub struct Promise(Option>>); - -#[pymethods] -impl Promise { - #[pyo3(name = "await")] - fn a_wait(&mut self) -> PyResult { - match self.0.take() { - None => Err(PySystemError::new_err( - "promise can't be awaited multiple times!", - )), - Some(x) => match tokio().block_on(x) { - Err(e) => Err(PySystemError::new_err(format!( - "error awaiting promise: {e}" - ))), - Ok(res) => res, - }, - } - } -} - #[pymodule] fn codemp(m: &Bound<'_, PyModule>) -> PyResult<()> { - m.add_class::()?; + m.add_function(wrap_pyfunction!(init, m)?)?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 4257bd4..03f15df 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -5,7 +5,7 @@ use pyo3::prelude::*; use crate::spawn_future; -use super::Promise; +// use super::Promise; #[pymethods] impl Workspace { @@ -31,12 +31,12 @@ impl Workspace { } } - #[pyo3(name = "event")] - fn pyevent(&self) -> Promise { - crate::a_sync! { self => - self.event().await - } - } + // #[pyo3(name = "event")] + // fn pyevent(&self) -> Promise { + // crate::a_sync! { self => + // self.event().await + // } + // } #[pyo3(name = "fetch_buffers")] async fn pyfetch_buffers(&self) -> crate::Result<()> {