wip(python): moving the rest of the glue to promises (empty promises)

This commit is contained in:
cschen 2024-08-21 15:02:44 +02:00
parent 4a575c587f
commit a4eb58cd4b
5 changed files with 107 additions and 64 deletions

48
dist/py/codemp.pyi vendored
View file

@ -3,21 +3,24 @@ from typing import Tuple, Optional, Callable
class Driver: class Driver:
""" """
this is akin to a big red button with a white "STOP" on top of it. this is akin to a big red button with a white "STOP" on top of it.
it is used to stop the runtime it is used to stop the runtime.
""" """
def stop(self) -> None: ... def stop(self) -> None: ...
def init(logger_cb: Callable, debug: bool) -> Driver: ... def init(logger_cb: Callable, debug: bool) -> Driver: ...
class RustPromise[T]: class Promise[T]:
""" """
This is a class akin to a future, which wraps a join handle from a spawned This is a class akin to a future, which wraps a join handle from a spawned
task on the rust side. you may call .pyawait() on this promise to block task on the rust side. you may call .pyawait() on this promise to block
until we have a result, or return immediately if we already have one. until we have a result, or return immediately if we already have one.
This only goes one way rust -> python. This only goes one way rust -> python.
It can either be used directly or you can wrap it inside a future python side.
""" """
def pyawait(self) -> T: ... def wait(self) -> T: ...
def is_done(self) -> bool: ...
class TextChange: class TextChange:
""" """
@ -39,11 +42,15 @@ class BufferController:
Handle to the controller for a specific buffer, which manages the back and forth Handle to the controller for a specific buffer, which manages the back and forth
of operations to and from other peers. of operations to and from other peers.
""" """
async def content(self) -> str: ... def content(self) -> Promise[str]: ...
async def send(self, start: int, end: int, txt: str) -> None: ... def send(self,
async def try_recv(self) -> Optional[TextChange]: ... start: int,
async def recv(self) -> TextChange: ... end: int,
async def poll(self) -> None: ... txt: str) -> Promise[None]: ...
def try_recv(self) -> Optional[TextChange]: ...
def recv(self) -> Promise[TextChange]: ...
def poll(self) -> Promise[None]: ...
def stop(self) -> bool: ...
@ -62,10 +69,13 @@ class CursorController:
Handle to the controller for a workspace, which manages the back and forth of Handle to the controller for a workspace, which manages the back and forth of
cursor movements to and from other peers cursor movements to and from other peers
""" """
async def send(self, path: str, start: Tuple[int, int], end: Tuple[int, int]) -> None: ... def send(self,
async def try_recv(self) -> Optional[Cursor]: ... path: str,
async def recv(self) -> Cursor: ... start: Tuple[int, int],
async def poll(self) -> None: ... end: Tuple[int, int]) -> Promise[None]: ...
def try_recv(self) -> Optional[Cursor]: ...
def recv(self) -> Promise[Cursor]: ...
def poll(self) -> Promise[None]: ...
def stop(self) -> bool: ... def stop(self) -> bool: ...
@ -74,13 +84,13 @@ class Workspace:
Handle to a workspace inside codemp. It manages buffers. Handle to a workspace inside codemp. It manages buffers.
A cursor is tied to the single workspace. A cursor is tied to the single workspace.
""" """
async def create(self, path: str) -> None: ... def create(self, path: str) -> Promise[None]: ...
async def attach(self, path: str) -> BufferController: ... def attach(self, path: str) -> Promise[BufferController]: ...
def detach(self, path: str) -> bool: ... def detach(self, path: str) -> bool: ...
async def fetch_buffers(self) -> None: ... def fetch_buffers(self) -> Promise[None]: ...
async def fetch_users(self) -> None: ... def fetch_users(self) -> Promise[None]: ...
async def list_buffer_users(self, path: str) -> list[str]: ... def list_buffer_users(self, path: str) -> Promise[list[str]]: ...
async def delete(self, path: str) -> None: ... def delete(self, path: str) -> Promise[None]: ...
def id(self) -> str: ... def id(self) -> str: ...
def cursor(self) -> CursorController: ... def cursor(self) -> CursorController: ...
def buffer_by_name(self, path: str) -> Optional[BufferController]: ... def buffer_by_name(self, path: str) -> Optional[BufferController]: ...
@ -94,7 +104,7 @@ class Client:
to a server and joining/creating new workspaces to a server and joining/creating new workspaces
""" """
def __new__(cls, host: str, username: str, password: str) -> None: ... def __new__(cls, host: str, username: str, password: str) -> None: ...
def join_workspace(self, workspace: str) -> RustPromise: ... def join_workspace(self, workspace: str) -> Promise[Workspace]: ...
def leave_workspace(self, workspace: str) -> bool: ... def leave_workspace(self, workspace: str) -> bool: ...
def get_workspace(self, id: str) -> Workspace: ... def get_workspace(self, id: str) -> Workspace: ...
def active_workspaces(self) -> list[str]: ... def active_workspaces(self) -> list[str]: ...

View file

@ -11,15 +11,27 @@ impl Client {
tokio().block_on(Client::new(host, username, password)) tokio().block_on(Client::new(host, username, password))
} }
// #[pyo3(name = "join_workspace")]
// async fn pyjoin_workspace(&self, workspace: String) -> JoinHandle<crate::Result<Workspace>> {
// tracing::info!("attempting to join the workspace {}", workspace);
// let this = self.clone();
// async {
// tokio()
// .spawn(async move { this.join_workspace(workspace).await })
// .await
// }
// }
#[pyo3(name = "join_workspace")] #[pyo3(name = "join_workspace")]
fn pyjoin_workspace(&self, workspace: String) -> PyResult<super::RustPromise> { fn pyjoin_workspace(&self, workspace: String) -> PyResult<super::Promise> {
tracing::info!("attempting to join the workspace {}", workspace); tracing::info!("attempting to join the workspace {}", workspace);
let this = self.clone(); let this = self.clone();
crate::a_sync!(this.join_workspace(workspace).await) crate::a_sync!(this.join_workspace(workspace).await)
// let rc = self.clone(); // let this = self.clone();
// Ok(super::RustPromise(Some(tokio().spawn(async move { // Ok(super::Promise(Some(tokio().spawn(async move {
// Ok(rc // Ok(this
// .join_workspace(workspace) // .join_workspace(workspace)
// .await // .await
// .map(|f| Python::with_gil(|py| f.into_py(py)))?) // .map(|f| Python::with_gil(|py| f.into_py(py)))?)

View file

@ -5,39 +5,41 @@ use crate::buffer::Controller as BufferController;
use crate::cursor::Controller as CursorController; use crate::cursor::Controller as CursorController;
use pyo3::prelude::*; use pyo3::prelude::*;
use crate::spawn_future; use super::Promise;
use crate::a_sync;
// need to do manually since Controller is a trait implementation // need to do manually since Controller is a trait implementation
#[pymethods] #[pymethods]
impl CursorController { impl CursorController {
#[pyo3(name = "send")] #[pyo3(name = "send")]
async fn pysend(&self, path: String, start: (i32, i32), end: (i32, i32)) -> crate::Result<()> { fn pysend(&self, path: String, start: (i32, i32), end: (i32, i32)) -> PyResult<Promise> {
let pos = Cursor { let pos = Cursor {
start, start,
end, end,
buffer: path, buffer: path,
user: None, user: None,
}; };
let rc = self.clone(); let this = self.clone();
spawn_future!(rc.send(pos)).await.unwrap() a_sync!(this.send(pos).await)
} }
#[pyo3(name = "try_recv")] #[pyo3(name = "try_recv")]
async fn pytry_recv(&self) -> crate::Result<Option<Cursor>> { fn pytry_recv(&self) -> PyResult<Promise> {
let rc = self.clone(); let this = self.clone();
spawn_future!(rc.try_recv()).await.unwrap() a_sync!(this.try_recv().await)
} }
#[pyo3(name = "recv")] #[pyo3(name = "recv")]
async fn pyrecv(&self) -> crate::Result<Cursor> { fn pyrecv(&self) -> crate::Result<Option<Cursor>> {
let rc = self.clone(); Ok(super::tokio().block_on(self.try_recv())?)
spawn_future!(rc.recv()).await.unwrap() // let this = self.clone();
// a_sync!(this.recv().await)
} }
#[pyo3(name = "poll")] #[pyo3(name = "poll")]
async fn pypoll(&self) -> crate::Result<()> { fn pypoll(&self) -> PyResult<Promise> {
let rc = self.clone(); let this = self.clone();
spawn_future!(rc.poll()).await.unwrap() a_sync!(this.poll().await)
} }
#[pyo3(name = "stop")] #[pyo3(name = "stop")]
@ -50,39 +52,45 @@ impl CursorController {
#[pymethods] #[pymethods]
impl BufferController { impl BufferController {
#[pyo3(name = "content")] #[pyo3(name = "content")]
async fn pycontent(&self) -> crate::Result<String> { async fn pycontent(&self) -> PyResult<Promise> {
let rc = self.clone(); let this = self.clone();
spawn_future!(rc.content()).await.unwrap() a_sync!(this.content().await)
} }
#[pyo3(name = "send")] #[pyo3(name = "send")]
async fn pysend(&self, start: u32, end: u32, txt: String) -> crate::Result<()> { async fn pysend(&self, start: u32, end: u32, txt: String) -> PyResult<Promise> {
let op = TextChange { let op = TextChange {
start, start,
end, end,
content: txt, content: txt,
hash: None, hash: None,
}; };
let rc = self.clone(); let this = self.clone();
spawn_future!(rc.send(op)).await.unwrap() a_sync!(this.send(op).await)
} }
#[pyo3(name = "try_recv")] #[pyo3(name = "try_recv")]
async fn pytry_recv(&self) -> crate::Result<Option<TextChange>> { fn pytry_recv(&self) -> crate::Result<Option<TextChange>> {
let rc = self.clone(); Ok(super::tokio().block_on(self.try_recv())?)
spawn_future!(rc.try_recv()).await.unwrap() // let this = self.clone();
// a_sync!(this.try_recv().await)
} }
#[pyo3(name = "recv")] #[pyo3(name = "recv")]
async fn pyrecv(&self) -> crate::Result<TextChange> { async fn pyrecv(&self) -> PyResult<Promise> {
let rc = self.clone(); let this = self.clone();
spawn_future!(rc.recv()).await.unwrap() a_sync!(this.recv().await)
} }
#[pyo3(name = "poll")] #[pyo3(name = "poll")]
async fn pypoll(&self) -> crate::Result<()> { async fn pypoll(&self) -> PyResult<Promise> {
let rc = self.clone(); let this = self.clone();
spawn_future!(rc.poll()).await.unwrap() a_sync!(this.poll().await)
}
#[pyo3(name = "stop")]
fn pystop(&self) -> bool {
self.stop()
} }
} }

View file

@ -2,12 +2,18 @@ pub mod client;
pub mod controllers; pub mod controllers;
pub mod workspace; pub mod workspace;
use std::{
future::{poll_fn, Future},
task::Poll,
};
use crate::{ use crate::{
api::{Cursor, TextChange}, api::{Cursor, TextChange},
buffer::Controller as BufferController, buffer::Controller as BufferController,
cursor::Controller as CursorController, cursor::Controller as CursorController,
Client, Workspace, Client, Workspace,
}; };
use pyo3::prelude::*; use pyo3::prelude::*;
use pyo3::{ use pyo3::{
exceptions::{PyConnectionError, PyRuntimeError, PySystemError}, exceptions::{PyConnectionError, PyRuntimeError, PySystemError},
@ -41,7 +47,7 @@ pub fn tokio() -> &'static tokio::runtime::Runtime {
// { // {
// type Output = F::Output; // type Output = F::Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { // fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// let waker = cx.waker(); // let waker = cx.waker();
// let fut = unsafe { self.map_unchecked_mut(|e| &mut e.0) }; // let fut = unsafe { self.map_unchecked_mut(|e| &mut e.0) };
// Python::with_gil(|py| py.allow_threads(|| fut.poll(&mut Context::from_waker(waker)))) // Python::with_gil(|py| py.allow_threads(|| fut.poll(&mut Context::from_waker(waker))))
@ -62,7 +68,7 @@ pub fn tokio() -> &'static tokio::runtime::Runtime {
#[macro_export] #[macro_export]
macro_rules! a_sync { macro_rules! a_sync {
($x:expr) => {{ ($x:expr) => {{
Ok($crate::ffi::python::RustPromise(Some( Ok($crate::ffi::python::Promise(Some(
$crate::ffi::python::tokio() $crate::ffi::python::tokio()
.spawn(async move { Ok($x.map(|f| Python::with_gil(|py| f.into_py(py)))?) }), .spawn(async move { Ok($x.map(|f| Python::with_gil(|py| f.into_py(py)))?) }),
))) )))
@ -148,11 +154,11 @@ fn init(logging_cb: Py<PyFunction>, debug: bool) -> PyResult<PyObject> {
} }
#[pyclass] #[pyclass]
pub struct RustPromise(Option<tokio::task::JoinHandle<PyResult<PyObject>>>); pub struct Promise(Option<tokio::task::JoinHandle<PyResult<PyObject>>>);
#[pymethods] #[pymethods]
impl RustPromise { impl Promise {
#[pyo3(name = "pyawait")] #[pyo3(name = "wait")]
fn _await(&mut self) -> PyResult<PyObject> { fn _await(&mut self) -> PyResult<PyObject> {
match self.0.take() { match self.0.take() {
None => Err(PySystemError::new_err( None => Err(PySystemError::new_err(
@ -166,6 +172,13 @@ impl RustPromise {
}, },
} }
} }
fn is_done(&self) -> bool {
if let Some(handle) = self.0 {
return handle.is_finished();
}
false
}
} }
impl From<crate::Error> for PyErr { impl From<crate::Error> for PyErr {

View file

@ -3,7 +3,7 @@ use crate::cursor::Controller as CursorController;
use crate::workspace::Workspace; use crate::workspace::Workspace;
use pyo3::prelude::*; use pyo3::prelude::*;
use super::RustPromise; use super::Promise;
use crate::a_sync; use crate::a_sync;
// use super::Promise; // use super::Promise;
@ -11,13 +11,13 @@ use crate::a_sync;
impl Workspace { impl Workspace {
// join a workspace // join a workspace
#[pyo3(name = "create")] #[pyo3(name = "create")]
fn pycreate(&self, path: String) -> PyResult<RustPromise> { fn pycreate(&self, path: String) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
a_sync!(this.create(path.as_str()).await) a_sync!(this.create(path.as_str()).await)
} }
#[pyo3(name = "attach")] #[pyo3(name = "attach")]
fn pyattach(&self, path: String) -> PyResult<RustPromise> { fn pyattach(&self, path: String) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
a_sync!(this.attach(path.as_str()).await) a_sync!(this.attach(path.as_str()).await)
} }
@ -32,32 +32,32 @@ impl Workspace {
} }
#[pyo3(name = "event")] #[pyo3(name = "event")]
fn pyevent(&self) -> PyResult<RustPromise> { fn pyevent(&self) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
a_sync!(this.event().await) a_sync!(this.event().await)
} }
#[pyo3(name = "fetch_buffers")] #[pyo3(name = "fetch_buffers")]
fn pyfetch_buffers(&self) -> PyResult<RustPromise> { fn pyfetch_buffers(&self) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
a_sync!(this.fetch_buffers().await) a_sync!(this.fetch_buffers().await)
} }
#[pyo3(name = "fetch_users")] #[pyo3(name = "fetch_users")]
fn pyfetch_users(&self) -> PyResult<RustPromise> { fn pyfetch_users(&self) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
a_sync!(this.fetch_users().await) a_sync!(this.fetch_users().await)
} }
#[pyo3(name = "list_buffer_users")] #[pyo3(name = "list_buffer_users")]
fn pylist_buffer_users(&self, path: String) -> PyResult<RustPromise> { fn pylist_buffer_users(&self, path: String) -> PyResult<Promise> {
// crate::Result<Vec<crate::api::User>> { // crate::Result<Vec<crate::api::User>> {
let this = self.clone(); let this = self.clone();
a_sync!(this.list_buffer_users(path.as_str()).await) a_sync!(this.list_buffer_users(path.as_str()).await)
} }
#[pyo3(name = "delete")] #[pyo3(name = "delete")]
fn pydelete(&self, path: String) -> PyResult<RustPromise> { fn pydelete(&self, path: String) -> PyResult<Promise> {
let this = self.clone(); let this = self.clone();
a_sync!(this.delete(path.as_str()).await) a_sync!(this.delete(path.as_str()).await)
} }