chore(python): final cleanup before merge into dev

This commit is contained in:
cschen 2024-08-21 18:15:19 +02:00
parent dc7ae20b7d
commit 76f8058186
2 changed files with 35 additions and 17 deletions

View file

@ -15,22 +15,39 @@ use pyo3::{
types::PyFunction,
};
use std::sync::OnceLock;
use tokio::sync::{mpsc, oneshot};
// global reference to a current_thread tokio runtime
pub fn tokio() -> &'static tokio::runtime::Runtime {
use std::sync::OnceLock;
static RT: OnceLock<tokio::runtime::Runtime> = OnceLock::new();
RT.get_or_init(|| {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.on_thread_start(|| tracing::info!("tokio thread started."))
.on_thread_stop(|| tracing::info!("tokio thread stopped."))
.build()
.unwrap()
})
}
// #[pyfunction]
// fn register_event_loop(event_loop: PyObject) {
// static EVENT_LOOP: OnceLock<PyObject> = OnceLock::new();
// EVENT_LOOP.
// }
// #[pyfunction]
// fn setup_async(
// event_loop: PyObject,
// call_soon_thread_safe: PyObject, // asyncio.EventLoop.call_soon_threadsafe
// call_coroutine_thread_safe: PyObject, // asyncio.call_coroutine_threadsafe
// create_future: PyObject, // asyncio.EventLoop.create_future
// ) {
// let _ = EVENT_LOOP.get_or_init(|| event_loop);
// let _ = CALL_SOON.get_or_init(|| call_soon_thread_safe);
// let _ = CREATE_TASK.get_or_init(|| call_coroutine_thread_safe);
// let _ = CREATE_FUTURE.get_or_init(|| create_future);
// }
#[pyclass]
pub struct Promise(Option<tokio::task::JoinHandle<PyResult<PyObject>>>);
@ -113,7 +130,7 @@ impl Driver {
}
}
#[pyfunction]
fn init(logging_cb: Py<PyFunction>, debug: bool) -> PyResult<PyObject> {
fn init(py: Python, logging_cb: Py<PyFunction>, debug: bool) -> PyResult<Driver> {
let (tx, mut rx) = mpsc::unbounded_channel();
let level = if debug {
tracing::Level::DEBUG
@ -144,21 +161,23 @@ fn init(logging_cb: Py<PyFunction>, debug: bool) -> PyResult<PyObject> {
match log_subscribing {
Ok(_) => {
// the runtime is driven by the logger awaiting messages from codemp and echoing them back to
// python logger.
std::thread::spawn(move || {
tokio().block_on(async move {
loop {
tokio::select! {
biased;
Some(msg) = rx.recv() => {
let _ = Python::with_gil(|py| logging_cb.call1(py, (msg,)));
},
_ = &mut rt_stop_rx => { todo!() },
// a provided logger.
py.allow_threads(move || {
std::thread::spawn(move || {
tokio().block_on(async move {
loop {
tokio::select! {
biased;
Some(msg) = rx.recv() => {
let _ = Python::with_gil(|py| logging_cb.call1(py, (msg,)));
},
_ = &mut rt_stop_rx => { break }, // a bit brutal but will do for now.
}
}
}
})
})
});
Ok(Python::with_gil(|py| Driver(Some(rt_stop_tx)).into_py(py)))
Ok(Driver(Some(rt_stop_tx)))
}
Err(_) => Err(PyRuntimeError::new_err("codemp was already initialised.")),
}

View file

@ -5,7 +5,6 @@ use pyo3::prelude::*;
use super::Promise;
use crate::a_sync_allow_threads;
// use super::Promise;
#[pymethods]
impl Workspace {