mirror of
https://github.com/hexedtech/codemp-nvim.git
synced 2024-11-22 07:24:52 +01:00
feat: added tracing, added buffer_worker
This commit is contained in:
parent
60e6f4640c
commit
2287793cd9
2 changed files with 130 additions and 69 deletions
|
@ -16,6 +16,8 @@ name = "codemp-client"
|
|||
path = "src/client/main.rs"
|
||||
|
||||
[dependencies]
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
tonic = "0.7"
|
||||
prost = "0.10"
|
||||
futures = "0.3"
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
use std::{collections::HashMap, pin::Pin, sync::Arc};
|
||||
use std::collections::VecDeque;
|
||||
use std::{pin::Pin, sync::Arc};
|
||||
|
||||
use state::AlterState;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tracing::{info, debug, warn, error};
|
||||
|
||||
use operational_transform::OperationSeq;
|
||||
use state::{AlterState, StateManager};
|
||||
use tonic::{transport::Server, Request, Response, Status};
|
||||
|
||||
pub mod proto {
|
||||
|
@ -8,24 +13,71 @@ pub mod proto {
|
|||
tonic::include_proto!("buffer");
|
||||
}
|
||||
|
||||
use tokio::sync::{mpsc, watch};
|
||||
use tokio_stream::{wrappers::ReceiverStream, Stream, StreamExt}; // TODO example used this?
|
||||
use tokio::sync::{mpsc, broadcast};
|
||||
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
|
||||
|
||||
use proto::buffer_server::{Buffer, BufferServer};
|
||||
use proto::workspace_server::{Workspace, WorkspaceServer};
|
||||
use proto::{Operation, SessionRequest, SessionResponse};
|
||||
|
||||
use tonic::Streaming;
|
||||
use workspace::BufferView;
|
||||
//use futures::{Stream, StreamExt};
|
||||
|
||||
use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck!
|
||||
use crate::workspace::Workspace as WorkspaceInstance; // TODO fuck x2!
|
||||
|
||||
pub mod state;
|
||||
pub mod workspace;
|
||||
|
||||
type OperationStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
|
||||
|
||||
pub struct BufferService {}
|
||||
pub struct BufferService {
|
||||
state: Arc<StateManager>,
|
||||
}
|
||||
|
||||
fn op_seq(o: &Operation) -> OperationSeq { todo!() }
|
||||
fn op_net(o: &OperationSeq) -> Operation { todo!() }
|
||||
|
||||
// async fn buffer_worker(tx: mpsc::Sender<Result<Operation, Status>>, mut rx:Streaming<Operation>, mut rx_core: mpsc::Receiver<Operation>) {
|
||||
async fn buffer_worker(bv: BufferView, mut client_rx: Streaming<Operation>, tx_client:mpsc::Sender<Result<Operation, Status>>, mut rx_core:broadcast::Receiver<(String, OperationSeq)>) {
|
||||
let mut queue : VecDeque<Operation> = VecDeque::new();
|
||||
loop {
|
||||
tokio::select! {
|
||||
client_op = client_rx.next() => {
|
||||
if let Some(result) = client_op {
|
||||
match result {
|
||||
Ok(op) => {
|
||||
bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt!
|
||||
queue.push_back(op);
|
||||
},
|
||||
Err(status) => {
|
||||
error!("error receiving op from client: {:?}", status);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
|
||||
server_op = rx_core.recv() => {
|
||||
if let Ok(oop) = server_op {
|
||||
let mut send_op = true;
|
||||
for (i, _op) in queue.iter().enumerate() {
|
||||
if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op))
|
||||
queue.remove(i);
|
||||
send_op = false;
|
||||
break;
|
||||
} else {
|
||||
// serv_op.transform(op); // TODO transform OpSeq !
|
||||
}
|
||||
}
|
||||
if send_op {
|
||||
tx_client.send(Ok(op_net(&oop.1))).await.unwrap();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl Buffer for BufferService {
|
||||
|
@ -36,61 +88,42 @@ impl Buffer for BufferService {
|
|||
&self,
|
||||
req: Request<Streaming<Operation>>,
|
||||
) -> Result<tonic::Response<OperationStream>, Status> {
|
||||
println!("EchoServer::bidirectional_streaming_echo");
|
||||
let session_id : String;
|
||||
if let Some(sid) = req.metadata().get("session_id") {
|
||||
session_id = sid.to_str().unwrap().to_string();
|
||||
} else {
|
||||
return Err(Status::failed_precondition("Missing metadata key 'session_id'"));
|
||||
}
|
||||
|
||||
let mut in_stream = req.into_inner();
|
||||
let (tx_og, rx) = mpsc::channel(128);
|
||||
let path : String;
|
||||
if let Some(p) = req.metadata().get("path") {
|
||||
path = p.to_str().unwrap().to_string();
|
||||
} else {
|
||||
return Err(Status::failed_precondition("Missing metadata key 'path'"));
|
||||
}
|
||||
// TODO make these above nicer? more concise? idk
|
||||
|
||||
// this spawn here is required if you want to handle connection error.
|
||||
// If we just map `in_stream` and write it back as `out_stream` the `out_stream`
|
||||
// will be drooped when connection error occurs and error will never be propagated
|
||||
// to mapped version of `in_stream`.
|
||||
let tx = tx_og.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(result) = in_stream.next().await {
|
||||
match result {
|
||||
Ok(v) => tx
|
||||
.send(Ok(Operation {
|
||||
action: 1,
|
||||
row: 0,
|
||||
column: 0,
|
||||
op_id: 0,
|
||||
text: None,
|
||||
}))
|
||||
.await
|
||||
.expect("working rx"),
|
||||
Err(err) => {
|
||||
// if let Some(io_err) = match_for_io_error(&err) {
|
||||
// if io_err.kind() == ErrorKind::BrokenPipe {
|
||||
// // here you can handle special case when client
|
||||
// // disconnected in unexpected way
|
||||
// eprintln!("\tclient disconnected: broken pipe");
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
eprintln!("Error receiving operation from client");
|
||||
if let Some(workspace) = self.state.workspaces.borrow().get(&session_id) {
|
||||
let in_stream = req.into_inner();
|
||||
let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128);
|
||||
|
||||
match tx.send(Err(err)).await {
|
||||
Ok(_) => (),
|
||||
Err(_err) => break, // response was droped
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
println!("\tstream ended");
|
||||
});
|
||||
let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone();
|
||||
let w = workspace.clone();
|
||||
tokio::spawn(async move { buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await; });
|
||||
|
||||
// echo just write the same data that was received
|
||||
let out_stream = ReceiverStream::new(rx);
|
||||
// echo just write the same data that was received
|
||||
let out_stream = ReceiverStream::new(rx);
|
||||
|
||||
Ok(Response::new(Box::pin(out_stream) as Self::AttachStream))
|
||||
return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream));
|
||||
} else {
|
||||
return Err(Status::not_found(format!("Norkspace with session_id {}", session_id)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct WorkspaceService {
|
||||
tx: mpsc::Sender<AlterState>,
|
||||
rx: watch::Receiver<HashMap<String, Arc<WorkspaceInstance>>>,
|
||||
state: Arc<StateManager>,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
|
@ -99,10 +132,10 @@ impl Workspace for WorkspaceService {
|
|||
&self,
|
||||
request: Request<SessionRequest>,
|
||||
) -> Result<Response<SessionResponse>, Status> {
|
||||
println!("Got a request: {:?}", request);
|
||||
debug!("create request: {:?}", request);
|
||||
let r = request.into_inner();
|
||||
|
||||
// let w = WorkspaceInstance::new(r.session_key.clone(), r.content.unwrap_or("".to_string()));
|
||||
let w = WorkspaceInstance::new(r.session_key.clone());
|
||||
|
||||
let reply = proto::SessionResponse {
|
||||
session_key: r.session_key.clone(),
|
||||
|
@ -120,18 +153,22 @@ impl Workspace for WorkspaceService {
|
|||
&self,
|
||||
request: Request<SessionRequest>,
|
||||
) -> Result<Response<SessionResponse>, Status> {
|
||||
println!("Got a request: {:?}", request);
|
||||
debug!("sync request: {:?}", request);
|
||||
let r = request.into_inner();
|
||||
|
||||
if let Some(w) = self.rx.borrow().get(&r.session_key) {
|
||||
let reply = proto::SessionResponse {
|
||||
session_key: r.session_key,
|
||||
accepted: true,
|
||||
content: Some(w.content.clone()),
|
||||
hash: None,
|
||||
};
|
||||
if let Some(w) = self.state.workspaces.borrow().get(&r.session_key) {
|
||||
if let Some(buf) = w.buffers.borrow().get(&r.session_key) {
|
||||
let reply = proto::SessionResponse {
|
||||
session_key: r.session_key,
|
||||
accepted: true,
|
||||
content: Some(buf.content.borrow().clone()),
|
||||
hash: None,
|
||||
};
|
||||
|
||||
Ok(Response::new(reply))
|
||||
Ok(Response::new(reply))
|
||||
} else {
|
||||
Err(Status::out_of_range("fuck you".to_string()))
|
||||
}
|
||||
} else {
|
||||
Err(Status::out_of_range("fuck you".to_string()))
|
||||
}
|
||||
|
@ -142,7 +179,7 @@ impl Workspace for WorkspaceService {
|
|||
&self,
|
||||
request: Request<SessionRequest>,
|
||||
) -> Result<Response<SessionResponse>, Status> {
|
||||
println!("Got a request: {:?}", request);
|
||||
debug!("join request: {:?}", request);
|
||||
|
||||
let reply = proto::SessionResponse {
|
||||
session_key: request.into_inner().session_key,
|
||||
|
@ -158,12 +195,12 @@ impl Workspace for WorkspaceService {
|
|||
&self,
|
||||
request: Request<SessionRequest>,
|
||||
) -> Result<Response<SessionResponse>, Status> {
|
||||
println!("Got a request: {:?}", request);
|
||||
debug!("leave request: {:?}", request);
|
||||
let r = request.into_inner();
|
||||
let mut removed = false;
|
||||
|
||||
if self.rx.borrow().get(&r.session_key).is_some() {
|
||||
self.tx
|
||||
if self.state.workspaces.borrow().get(&r.session_key).is_some() {
|
||||
self.state.op_tx
|
||||
.send(AlterState::REMOVE {
|
||||
key: r.session_key.clone(),
|
||||
})
|
||||
|
@ -185,17 +222,39 @@ impl Workspace for WorkspaceService {
|
|||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let addr = "[::1]:50051".parse()?;
|
||||
|
||||
let (tx, rx) = state::run_state_manager();
|
||||
let greeter = WorkspaceService { tx, rx };
|
||||
let processor = BufferService {};
|
||||
let state = Arc::new(StateManager::new());
|
||||
|
||||
let greeter = WorkspaceService { state: state.clone() };
|
||||
let processor = BufferService { state: state.clone() };
|
||||
|
||||
info!("Starting server");
|
||||
|
||||
Server::builder()
|
||||
.add_service(WorkspaceServer::new(greeter))
|
||||
.add_service(BufferServer::new(processor))
|
||||
.serve(addr)
|
||||
.await?;
|
||||
/*
|
||||
|
||||
fn main() {
|
||||
// install global collector configured based on RUST_LOG env var.
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let number_of_yaks = 3;
|
||||
// this creates a new event, outside of any spans.
|
||||
info!(number_of_yaks, "preparing to shave yaks");
|
||||
|
||||
let number_shaved = yak_shave::shave_all(number_of_yaks);
|
||||
info!(
|
||||
all_yaks_shaved = number_shaved == number_of_yaks,
|
||||
"yak shaving completed."
|
||||
);
|
||||
}
|
||||
*/
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue