mirror of
https://github.com/hexedtech/codemp.git
synced 2024-11-26 00:44:48 +01:00
feat: split services in subdir and separate files
Having them all in main.rs was becoming unmanageable
This commit is contained in:
parent
8316439a3e
commit
f854b902a1
5 changed files with 303 additions and 239 deletions
|
@ -1,224 +1,16 @@
|
||||||
use std::collections::VecDeque;
|
pub mod actor;
|
||||||
use std::{pin::Pin, sync::Arc};
|
pub mod service;
|
||||||
|
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
use std::sync::Arc;
|
||||||
use tracing::{info, debug, warn, error};
|
|
||||||
|
|
||||||
use operational_transform::OperationSeq;
|
use tracing::{debug, error, info, warn};
|
||||||
use state::{AlterState, StateManager};
|
|
||||||
use tonic::{transport::Server, Request, Response, Status};
|
|
||||||
|
|
||||||
pub mod proto {
|
use tonic::transport::Server;
|
||||||
tonic::include_proto!("workspace");
|
|
||||||
tonic::include_proto!("buffer");
|
|
||||||
}
|
|
||||||
|
|
||||||
use tokio::sync::{mpsc, broadcast};
|
use crate::{
|
||||||
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
|
actor::state::StateManager,
|
||||||
|
service::{buffer::BufferService, session::SessionService, workspace::WorkspaceService},
|
||||||
use proto::buffer_server::{Buffer, BufferServer};
|
};
|
||||||
use proto::workspace_server::{Workspace, WorkspaceServer};
|
|
||||||
use proto::{Operation, SessionRequest, SessionResponse};
|
|
||||||
|
|
||||||
use tonic::Streaming;
|
|
||||||
use workspace::BufferView;
|
|
||||||
//use futures::{Stream, StreamExt};
|
|
||||||
|
|
||||||
use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck x2!
|
|
||||||
|
|
||||||
pub mod state;
|
|
||||||
pub mod workspace;
|
|
||||||
|
|
||||||
type OperationStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
|
|
||||||
|
|
||||||
pub struct BufferService {
|
|
||||||
state: Arc<StateManager>,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn op_seq(o: &Operation) -> OperationSeq { todo!() }
|
|
||||||
fn op_net(o: &OperationSeq) -> Operation { todo!() }
|
|
||||||
|
|
||||||
// async fn buffer_worker(tx: mpsc::Sender<Result<Operation, Status>>, mut rx:Streaming<Operation>, mut rx_core: mpsc::Receiver<Operation>) {
|
|
||||||
async fn buffer_worker(bv: BufferView, mut client_rx: Streaming<Operation>, tx_client:mpsc::Sender<Result<Operation, Status>>, mut rx_core:broadcast::Receiver<(String, OperationSeq)>) {
|
|
||||||
let mut queue : VecDeque<Operation> = VecDeque::new();
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
client_op = client_rx.next() => {
|
|
||||||
if let Some(result) = client_op {
|
|
||||||
match result {
|
|
||||||
Ok(op) => {
|
|
||||||
bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt!
|
|
||||||
queue.push_back(op);
|
|
||||||
},
|
|
||||||
Err(status) => {
|
|
||||||
error!("error receiving op from client: {:?}", status);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
server_op = rx_core.recv() => {
|
|
||||||
if let Ok(oop) = server_op {
|
|
||||||
let mut send_op = true;
|
|
||||||
for (i, _op) in queue.iter().enumerate() {
|
|
||||||
if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op))
|
|
||||||
queue.remove(i);
|
|
||||||
send_op = false;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// serv_op.transform(op); // TODO transform OpSeq !
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if send_op {
|
|
||||||
tx_client.send(Ok(op_net(&oop.1))).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
|
||||||
impl Buffer for BufferService {
|
|
||||||
// type ServerStreamingEchoStream = ResponseStream;
|
|
||||||
type AttachStream = OperationStream;
|
|
||||||
|
|
||||||
async fn attach(
|
|
||||||
&self,
|
|
||||||
req: Request<Streaming<Operation>>,
|
|
||||||
) -> Result<tonic::Response<OperationStream>, Status> {
|
|
||||||
let session_id : String;
|
|
||||||
if let Some(sid) = req.metadata().get("session_id") {
|
|
||||||
session_id = sid.to_str().unwrap().to_string();
|
|
||||||
} else {
|
|
||||||
return Err(Status::failed_precondition("Missing metadata key 'session_id'"));
|
|
||||||
}
|
|
||||||
|
|
||||||
let path : String;
|
|
||||||
if let Some(p) = req.metadata().get("path") {
|
|
||||||
path = p.to_str().unwrap().to_string();
|
|
||||||
} else {
|
|
||||||
return Err(Status::failed_precondition("Missing metadata key 'path'"));
|
|
||||||
}
|
|
||||||
// TODO make these above nicer? more concise? idk
|
|
||||||
|
|
||||||
if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) {
|
|
||||||
let in_stream = req.into_inner();
|
|
||||||
let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128);
|
|
||||||
|
|
||||||
let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone();
|
|
||||||
let w = workspace.clone();
|
|
||||||
tokio::spawn(async move { buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; });
|
|
||||||
|
|
||||||
// echo just write the same data that was received
|
|
||||||
let out_stream = ReceiverStream::new(rx);
|
|
||||||
|
|
||||||
return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream));
|
|
||||||
} else {
|
|
||||||
return Err(Status::not_found(format!("Norkspace with session_id {}", session_id)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct WorkspaceService {
|
|
||||||
state: Arc<StateManager>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
|
||||||
impl Workspace for WorkspaceService {
|
|
||||||
async fn create(
|
|
||||||
&self,
|
|
||||||
request: Request<SessionRequest>,
|
|
||||||
) -> Result<Response<SessionResponse>, Status> {
|
|
||||||
debug!("create request: {:?}", request);
|
|
||||||
let r = request.into_inner();
|
|
||||||
|
|
||||||
let w = WorkspaceInstance::new(r.session_key.clone());
|
|
||||||
|
|
||||||
let reply = proto::SessionResponse {
|
|
||||||
session_key: r.session_key.clone(),
|
|
||||||
accepted: true,
|
|
||||||
content: None, // Some(w.content.clone()),
|
|
||||||
hash: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
// self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap();
|
|
||||||
|
|
||||||
Ok(Response::new(reply))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn sync(
|
|
||||||
&self,
|
|
||||||
request: Request<SessionRequest>,
|
|
||||||
) -> Result<Response<SessionResponse>, Status> {
|
|
||||||
debug!("sync request: {:?}", request);
|
|
||||||
let r = request.into_inner();
|
|
||||||
|
|
||||||
if let Some(w) = self.state.workspaces.borrow().get(&r.session_key) {
|
|
||||||
if let Some(buf) = w.buffers.borrow().get(&r.session_key) {
|
|
||||||
let reply = proto::SessionResponse {
|
|
||||||
session_key: r.session_key,
|
|
||||||
accepted: true,
|
|
||||||
content: Some(buf.content.borrow().clone()),
|
|
||||||
hash: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Response::new(reply))
|
|
||||||
} else {
|
|
||||||
Err(Status::out_of_range("fuck you".to_string()))
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
Err(Status::out_of_range("fuck you".to_string()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO make it do something
|
|
||||||
async fn join(
|
|
||||||
&self,
|
|
||||||
request: Request<SessionRequest>,
|
|
||||||
) -> Result<Response<SessionResponse>, Status> {
|
|
||||||
debug!("join request: {:?}", request);
|
|
||||||
|
|
||||||
let reply = proto::SessionResponse {
|
|
||||||
session_key: request.into_inner().session_key,
|
|
||||||
accepted: true,
|
|
||||||
content: None,
|
|
||||||
hash: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Response::new(reply))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn leave(
|
|
||||||
&self,
|
|
||||||
request: Request<SessionRequest>,
|
|
||||||
) -> Result<Response<SessionResponse>, Status> {
|
|
||||||
debug!("leave request: {:?}", request);
|
|
||||||
let r = request.into_inner();
|
|
||||||
let mut removed = false;
|
|
||||||
|
|
||||||
if self.state.workspaces.borrow().get(&r.session_key).is_some() {
|
|
||||||
self.state.op_tx
|
|
||||||
.send(AlterState::REMOVE {
|
|
||||||
key: r.session_key.clone(),
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
removed = true; // TODO this is a lie! Verify it
|
|
||||||
}
|
|
||||||
|
|
||||||
let reply = proto::SessionResponse {
|
|
||||||
session_key: r.session_key,
|
|
||||||
accepted: removed,
|
|
||||||
content: None,
|
|
||||||
hash: None,
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(Response::new(reply))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
@ -228,33 +20,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
|
||||||
let state = Arc::new(StateManager::new());
|
let state = Arc::new(StateManager::new());
|
||||||
|
|
||||||
let greeter = WorkspaceService { state: state.clone() };
|
|
||||||
let processor = BufferService { state: state.clone() };
|
|
||||||
|
|
||||||
info!("Starting server");
|
info!("Starting server");
|
||||||
|
|
||||||
Server::builder()
|
Server::builder()
|
||||||
.add_service(WorkspaceServer::new(greeter))
|
.add_service(WorkspaceService::server(state.clone()))
|
||||||
.add_service(BufferServer::new(processor))
|
.add_service(BufferService::server(state.clone()))
|
||||||
.serve(addr)
|
.serve(addr)
|
||||||
.await?;
|
.await?;
|
||||||
/*
|
|
||||||
|
|
||||||
fn main() {
|
|
||||||
// install global collector configured based on RUST_LOG env var.
|
|
||||||
tracing_subscriber::fmt::init();
|
|
||||||
|
|
||||||
let number_of_yaks = 3;
|
|
||||||
// this creates a new event, outside of any spans.
|
|
||||||
info!(number_of_yaks, "preparing to shave yaks");
|
|
||||||
|
|
||||||
let number_shaved = yak_shave::shave_all(number_of_yaks);
|
|
||||||
info!(
|
|
||||||
all_yaks_shaved = number_shaved == number_of_yaks,
|
|
||||||
"yak shaving completed."
|
|
||||||
);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
153
src/server/service/buffer.rs
Normal file
153
src/server/service/buffer.rs
Normal file
|
@ -0,0 +1,153 @@
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::{pin::Pin, sync::Arc};
|
||||||
|
|
||||||
|
use tokio_stream::wrappers::ReceiverStream;
|
||||||
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
|
use operational_transform::OperationSeq;
|
||||||
|
use tonic::{transport::Server, Request, Response, Status};
|
||||||
|
|
||||||
|
pub mod proto {
|
||||||
|
tonic::include_proto!("session");
|
||||||
|
tonic::include_proto!("workspace");
|
||||||
|
tonic::include_proto!("buffer");
|
||||||
|
}
|
||||||
|
|
||||||
|
use tokio::sync::{broadcast, mpsc};
|
||||||
|
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
|
||||||
|
|
||||||
|
use proto::buffer_server::{Buffer, BufferServer};
|
||||||
|
use proto::session_server::{Session, SessionServer};
|
||||||
|
use proto::workspace_server::{Workspace, WorkspaceServer};
|
||||||
|
use proto::{BufferList, Event, Operation, SessionRequest, SessionResponse, WorkspaceRequest};
|
||||||
|
|
||||||
|
use tonic::Streaming;
|
||||||
|
//use futures::{Stream, StreamExt};
|
||||||
|
|
||||||
|
use crate::actor::{buffer::BufferView, state::{AlterState, StateManager}, workspace::Workspace as WorkspaceInstance};
|
||||||
|
|
||||||
|
use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2!
|
||||||
|
|
||||||
|
type OperationStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
|
||||||
|
|
||||||
|
pub struct BufferService {
|
||||||
|
state: Arc<StateManager>,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn op_seq(o: &Operation) -> OperationSeq {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
fn op_net(o: &OperationSeq) -> Operation {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
// async fn buffer_worker(tx: mpsc::Sender<Result<Operation, Status>>, mut rx:Streaming<Operation>, mut rx_core: mpsc::Receiver<Operation>) {
|
||||||
|
async fn buffer_worker(
|
||||||
|
bv: BufferView,
|
||||||
|
mut client_rx: Streaming<Operation>,
|
||||||
|
tx_client: mpsc::Sender<Result<Operation, Status>>,
|
||||||
|
mut rx_core: broadcast::Receiver<(String, OperationSeq)>,
|
||||||
|
) {
|
||||||
|
let mut queue: VecDeque<Operation> = VecDeque::new();
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
client_op = client_rx.next() => {
|
||||||
|
if let Some(result) = client_op {
|
||||||
|
match result {
|
||||||
|
Ok(op) => {
|
||||||
|
bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt!
|
||||||
|
queue.push_back(op);
|
||||||
|
},
|
||||||
|
Err(status) => {
|
||||||
|
error!("error receiving op from client: {:?}", status);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
server_op = rx_core.recv() => {
|
||||||
|
if let Ok(oop) = server_op {
|
||||||
|
let mut send_op = true;
|
||||||
|
for (i, _op) in queue.iter().enumerate() {
|
||||||
|
if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op))
|
||||||
|
queue.remove(i);
|
||||||
|
send_op = false;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
// serv_op.transform(op); // TODO transform OpSeq !
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if send_op {
|
||||||
|
tx_client.send(Ok(op_net(&oop.1))).await.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl Buffer for BufferService {
|
||||||
|
// type ServerStreamingEchoStream = ResponseStream;
|
||||||
|
type AttachStream = OperationStream;
|
||||||
|
|
||||||
|
async fn attach(
|
||||||
|
&self,
|
||||||
|
req: Request<Streaming<Operation>>,
|
||||||
|
) -> Result<tonic::Response<OperationStream>, Status> {
|
||||||
|
let session_id: String;
|
||||||
|
if let Some(sid) = req.metadata().get("session_id") {
|
||||||
|
session_id = sid.to_str().unwrap().to_string();
|
||||||
|
} else {
|
||||||
|
return Err(Status::failed_precondition(
|
||||||
|
"Missing metadata key 'session_id'",
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let path: String;
|
||||||
|
if let Some(p) = req.metadata().get("path") {
|
||||||
|
path = p.to_str().unwrap().to_string();
|
||||||
|
} else {
|
||||||
|
return Err(Status::failed_precondition("Missing metadata key 'path'"));
|
||||||
|
}
|
||||||
|
// TODO make these above nicer? more concise? idk
|
||||||
|
|
||||||
|
if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) {
|
||||||
|
let in_stream = req.into_inner();
|
||||||
|
let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128);
|
||||||
|
|
||||||
|
let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone();
|
||||||
|
let w = workspace.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await;
|
||||||
|
});
|
||||||
|
|
||||||
|
// echo just write the same data that was received
|
||||||
|
let out_stream = ReceiverStream::new(rx);
|
||||||
|
|
||||||
|
return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream));
|
||||||
|
} else {
|
||||||
|
return Err(Status::not_found(format!(
|
||||||
|
"Norkspace with session_id {}",
|
||||||
|
session_id
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn push(&self, req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn pull(&self, req:Request<BufferPayload>) -> Result<Response<BufferPayload>, Status> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufferService {
|
||||||
|
// TODO is this smart? Should I let main() instantiate servers?
|
||||||
|
pub fn server(state: Arc<StateManager>) -> BufferServer<BufferService> {
|
||||||
|
BufferServer::new(BufferService { state })
|
||||||
|
}
|
||||||
|
}
|
3
src/server/service/mod.rs
Normal file
3
src/server/service/mod.rs
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
pub mod buffer;
|
||||||
|
pub mod session;
|
||||||
|
pub mod workspace;
|
88
src/server/service/session.rs
Normal file
88
src/server/service/session.rs
Normal file
|
@ -0,0 +1,88 @@
|
||||||
|
pub mod proto {
|
||||||
|
tonic::include_proto!("session");
|
||||||
|
}
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
|
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
|
||||||
|
|
||||||
|
use tonic::{transport::Server, Request, Response, Status};
|
||||||
|
|
||||||
|
use proto::session_server::{Session, SessionServer};
|
||||||
|
use proto::{SessionRequest, SessionResponse};
|
||||||
|
|
||||||
|
use crate::actor::{
|
||||||
|
buffer::BufferView,
|
||||||
|
state::{AlterState, StateManager},
|
||||||
|
workspace::Workspace as WorkspaceInstance, // TODO fuck x2!
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct SessionService {
|
||||||
|
state: Arc<StateManager>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl Session for SessionService {
|
||||||
|
async fn create(
|
||||||
|
&self,
|
||||||
|
request: Request<SessionRequest>,
|
||||||
|
) -> Result<Response<SessionResponse>, Status> {
|
||||||
|
debug!("create request: {:?}", request);
|
||||||
|
let r = request.into_inner();
|
||||||
|
|
||||||
|
let w = WorkspaceInstance::new(r.session_key.clone());
|
||||||
|
|
||||||
|
let reply = proto::SessionResponse {
|
||||||
|
session_key: r.session_key.clone(),
|
||||||
|
accepted: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
// self.tx.send(AlterState::ADD{key: r.session_key.clone(), w}).await.unwrap();
|
||||||
|
|
||||||
|
Ok(Response::new(reply))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn join(
|
||||||
|
&self,
|
||||||
|
request: Request<SessionRequest>,
|
||||||
|
) -> Result<Response<SessionResponse>, Status> {
|
||||||
|
debug!("join request: {:?}", request);
|
||||||
|
|
||||||
|
let reply = proto::SessionResponse {
|
||||||
|
session_key: request.into_inner().session_key,
|
||||||
|
accepted: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Response::new(reply))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn leave(
|
||||||
|
&self,
|
||||||
|
request: Request<SessionRequest>,
|
||||||
|
) -> Result<Response<SessionResponse>, Status> {
|
||||||
|
debug!("leave request: {:?}", request);
|
||||||
|
let r = request.into_inner();
|
||||||
|
let mut removed = false;
|
||||||
|
|
||||||
|
if self.state.workspaces.borrow().get(&r.session_key).is_some() {
|
||||||
|
self.state
|
||||||
|
.op_tx
|
||||||
|
.send(AlterState::REMOVE {
|
||||||
|
key: r.session_key.clone(),
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
removed = true; // TODO this is a lie! Verify it
|
||||||
|
}
|
||||||
|
|
||||||
|
let reply = proto::SessionResponse {
|
||||||
|
session_key: r.session_key,
|
||||||
|
accepted: removed,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(Response::new(reply))
|
||||||
|
}
|
||||||
|
}
|
48
src/server/service/workspace.rs
Normal file
48
src/server/service/workspace.rs
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
use std::{pin::Pin, sync::Arc};
|
||||||
|
|
||||||
|
// use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
|
pub mod proto {
|
||||||
|
tonic::include_proto!("workspace");
|
||||||
|
}
|
||||||
|
|
||||||
|
use tokio_stream::Stream; // TODO example used this?
|
||||||
|
|
||||||
|
use proto::workspace_server::{Workspace, WorkspaceServer};
|
||||||
|
use proto::{BufferList, Event, WorkspaceRequest};
|
||||||
|
|
||||||
|
use crate::actor::state::StateManager;
|
||||||
|
|
||||||
|
type EventStream = Pin<Box<dyn Stream<Item = Result<Event, Status>> + Send>>;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct WorkspaceService {
|
||||||
|
state: Arc<StateManager>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl Workspace for WorkspaceService {
|
||||||
|
type SubscribeStream = EventStream;
|
||||||
|
|
||||||
|
async fn subscribe(
|
||||||
|
&self,
|
||||||
|
req: Request<WorkspaceRequest>,
|
||||||
|
) -> Result<tonic::Response<EventStream>, Status> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn buffers(
|
||||||
|
&self,
|
||||||
|
req: Request<WorkspaceRequest>,
|
||||||
|
) -> Result<Response<BufferList>, Status> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl WorkspaceService {
|
||||||
|
pub fn server(state: Arc<StateManager>) -> WorkspaceServer<WorkspaceService> {
|
||||||
|
WorkspaceServer::new(WorkspaceService { state })
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue