From b09c7a2206a0f44e394a748f19613644f20e2bac Mon Sep 17 00:00:00 2001 From: cschen Date: Tue, 20 Aug 2024 11:22:45 +0200 Subject: [PATCH] ON_HOLD: pyo3 experimental async is still too experimental, pivoting to blocking behaviour for now. --- src/ffi/python/client.rs | 25 ++++++++++++----- src/ffi/python/controllers.rs | 34 ++++++++++++++--------- src/ffi/python/mod.rs | 51 ++++++++++++++++++++++++----------- src/ffi/python/workspace.rs | 25 ++++++++++++----- 4 files changed, 94 insertions(+), 41 deletions(-) diff --git a/src/ffi/python/client.rs b/src/ffi/python/client.rs index b4a8a8e..0468785 100644 --- a/src/ffi/python/client.rs +++ b/src/ffi/python/client.rs @@ -2,21 +2,34 @@ use crate::workspace::Workspace; use crate::Client; use pyo3::prelude::*; -// #[pyfunction] -// pub fn codemp_init<'a>(py: Python<'a>) -> PyResult> { -// Ok(Py::new(py, Client::default())?) -// } +use super::tokio; #[pymethods] impl Client { #[new] fn __new__(host: String, username: String, password: String) -> crate::Result { - super::tokio().block_on(async move { Client::new(host, username, password).await }) + tokio().block_on(Client::new(host, username, password)) + } + + async fn dioboia(&self) { + tokio().spawn(async { tracing::info!("dioboia? si dioboia!") }); } #[pyo3(name = "join_workspace")] async fn pyjoin_workspace(&self, workspace: String) -> crate::Result { - self.join_workspace(workspace).await + // self.join_workspace(workspace).await + let rc = self.clone(); + crate::spawn_future!(rc.join_workspace(workspace)) + .await + .unwrap() + // This expands to if spawn_future_allow_threads! is used + // tokio() + // .spawn(super::AllowThreads(Box::pin(async move { + // rc.join_workspace(workspace).await + // }))) + // or if only spawn_future! + // tokio() + // .spawn(async move { rc.join_workspace(workspace).await }) } #[pyo3(name = "leave_workspace")] diff --git a/src/ffi/python/controllers.rs b/src/ffi/python/controllers.rs index 666fbd2..6dae779 100644 --- a/src/ffi/python/controllers.rs +++ b/src/ffi/python/controllers.rs @@ -5,6 +5,8 @@ use crate::buffer::Controller as BufferController; use crate::cursor::Controller as CursorController; use pyo3::prelude::*; +use crate::spawn_future; + // need to do manually since Controller is a trait implementation #[pymethods] impl CursorController { @@ -16,22 +18,26 @@ impl CursorController { buffer: path, user: None, }; - super::AllowThreads(self.send(pos)).await + let rc = self.clone(); + spawn_future!(rc.send(pos)).await.unwrap() } #[pyo3(name = "try_recv")] async fn pytry_recv(&self) -> crate::Result> { - super::AllowThreads(self.try_recv()).await + let rc = self.clone(); + spawn_future!(rc.try_recv()).await.unwrap() } #[pyo3(name = "recv")] async fn pyrecv(&self) -> crate::Result { - super::AllowThreads(self.recv()).await + let rc = self.clone(); + spawn_future!(rc.recv()).await.unwrap() } #[pyo3(name = "poll")] async fn pypoll(&self) -> crate::Result<()> { - super::AllowThreads(self.poll()).await + let rc = self.clone(); + spawn_future!(rc.poll()).await.unwrap() } #[pyo3(name = "stop")] @@ -45,7 +51,8 @@ impl CursorController { impl BufferController { #[pyo3(name = "content")] async fn pycontent(&self) -> crate::Result { - super::AllowThreads(Box::pin(self.content())).await + let rc = self.clone(); + spawn_future!(rc.content()).await.unwrap() } #[pyo3(name = "send")] @@ -56,22 +63,26 @@ impl BufferController { content: txt, hash: None, }; - super::AllowThreads(self.send(op)).await + let rc = self.clone(); + spawn_future!(rc.send(op)).await.unwrap() } #[pyo3(name = "try_recv")] async fn pytry_recv(&self) -> crate::Result> { - super::AllowThreads(self.try_recv()).await + let rc = self.clone(); + spawn_future!(rc.try_recv()).await.unwrap() } #[pyo3(name = "recv")] async fn pyrecv(&self) -> crate::Result { - super::AllowThreads(self.recv()).await + let rc = self.clone(); + spawn_future!(rc.recv()).await.unwrap() } #[pyo3(name = "poll")] async fn pypoll(&self) -> crate::Result<()> { - super::AllowThreads(self.poll()).await + let rc = self.clone(); + spawn_future!(rc.poll()).await.unwrap() } } @@ -97,9 +108,6 @@ impl Cursor { #[getter(user)] fn pyuser(&self) -> Option { - match self.user { - Some(user) => Some(user.to_string()), - None => None, - } + self.user.map(|user| user.to_string()) } } diff --git a/src/ffi/python/mod.rs b/src/ffi/python/mod.rs index ff454fd..f89af03 100644 --- a/src/ffi/python/mod.rs +++ b/src/ffi/python/mod.rs @@ -2,10 +2,10 @@ pub mod client; pub mod controllers; pub mod workspace; -use std::sync::Arc; use std::{ future::Future, - pin::{pin, Pin}, + pin::Pin, + sync::OnceLock, task::{Context, Poll}, }; @@ -17,10 +17,9 @@ use crate::{ }; use pyo3::exceptions::{PyConnectionError, PyRuntimeError, PySystemError}; use pyo3::prelude::*; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::watch; fn tokio() -> &'static tokio::runtime::Runtime { - use std::sync::OnceLock; static RT: OnceLock = OnceLock::new(); RT.get_or_init(|| { tokio::runtime::Builder::new_current_thread() @@ -43,14 +42,32 @@ where { type Output = F::Output; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let waker = cx.waker(); - Python::with_gil(|gil| { - gil.allow_threads(|| pin!(&mut self.0).poll(&mut Context::from_waker(waker))) - }) + let fut = unsafe { self.map_unchecked_mut(|e| &mut e.0) }; + Python::with_gil(|py| py.allow_threads(|| fut.poll(&mut Context::from_waker(waker)))) } } +#[macro_export] +macro_rules! spawn_future_allow_threads { + ($fut:expr) => { + $crate::ffi::python::tokio().spawn($crate::ffi::python::AllowThreads(Box::pin( + async move { + tracing::info!("running future from rust."); + $fut.await + }, + ))) + }; +} + +#[macro_export] +macro_rules! spawn_future { + ($fut:expr) => { + $crate::ffi::python::tokio().spawn(async move { $fut.await }) + }; +} + impl From for PyErr { fn from(value: crate::Error) -> Self { match value { @@ -75,11 +92,11 @@ impl IntoPy for crate::api::User { } #[derive(Debug, Clone)] -struct LoggerProducer(mpsc::Sender); +struct LoggerProducer(watch::Sender); impl std::io::Write for LoggerProducer { fn write(&mut self, buf: &[u8]) -> std::io::Result { - let _ = self.0.try_send(String::from_utf8_lossy(buf).to_string()); // ignore: logger disconnected or with full buffer + let _ = self.0.send(String::from_utf8_lossy(buf).to_string()); // ignore: logger disconnected or with full buffer Ok(buf.len()) } @@ -89,13 +106,13 @@ impl std::io::Write for LoggerProducer { } #[pyclass] -struct PyLogger(Arc>>); +struct PyLogger(watch::Receiver); #[pymethods] impl PyLogger { #[new] fn init_logger(debug: bool) -> PyResult { - let (tx, rx) = mpsc::channel(256); + let (tx, mut rx) = watch::channel("logger initialised".to_string()); let level = if debug { tracing::Level::DEBUG } else { @@ -120,13 +137,17 @@ impl PyLogger { .with_writer(std::sync::Mutex::new(LoggerProducer(tx))) .try_init() { - Ok(_) => Ok(PyLogger(Arc::new(Mutex::new(rx)))), + Ok(_) => Ok(PyLogger(rx)), Err(_) => Err(PySystemError::new_err("A logger already exists")), } } - async fn listen(&self) -> Option { - AllowThreads(Box::pin(self.0.lock().await.recv())).await + async fn listen(&mut self) -> Option { + if self.0.changed().await.is_ok() { + return Some(self.0.borrow().clone()); + } else { + return None; + } } } diff --git a/src/ffi/python/workspace.rs b/src/ffi/python/workspace.rs index 5804c6c..ebb5d03 100644 --- a/src/ffi/python/workspace.rs +++ b/src/ffi/python/workspace.rs @@ -3,17 +3,21 @@ use crate::cursor::Controller as CursorController; use crate::workspace::Workspace; use pyo3::prelude::*; +use crate::spawn_future; + #[pymethods] impl Workspace { // join a workspace #[pyo3(name = "create")] async fn pycreate(&self, path: String) -> crate::Result<()> { - self.create(path.as_str()).await + let rc = self.clone(); + spawn_future!(rc.create(path.as_str())).await.unwrap() } #[pyo3(name = "attach")] async fn pyattach(&self, path: String) -> crate::Result { - self.attach(path.as_str()).await + let rc = self.clone(); + spawn_future!(rc.attach(path.as_str())).await.unwrap() } #[pyo3(name = "detach")] @@ -27,27 +31,34 @@ impl Workspace { #[pyo3(name = "event")] async fn pyevent(&self) -> crate::Result { - self.event().await + let rc = self.clone(); + spawn_future!(rc.event()).await.unwrap() } #[pyo3(name = "fetch_buffers")] async fn pyfetch_buffers(&self) -> crate::Result<()> { - self.fetch_buffers().await + let rc = self.clone(); + spawn_future!(rc.fetch_buffers()).await.unwrap() } #[pyo3(name = "fetch_users")] async fn pyfetch_users(&self) -> crate::Result<()> { - self.fetch_users().await + let rc = self.clone(); + spawn_future!(rc.fetch_users()).await.unwrap() } #[pyo3(name = "list_buffer_users")] async fn pylist_buffer_users(&self, path: String) -> crate::Result> { - self.list_buffer_users(path.as_str()).await + let rc = self.clone(); + spawn_future!(rc.list_buffer_users(path.as_str())) + .await + .unwrap() } #[pyo3(name = "delete")] async fn pydelete(&self, path: String) -> crate::Result<()> { - self.delete(path.as_str()).await + let rc = self.clone(); + spawn_future!(rc.delete(path.as_str())).await.unwrap() } #[pyo3(name = "id")]