mirror of
https://github.com/hexedtech/codemp-nvim.git
synced 2024-11-22 23:44:55 +01:00
Merge branch 'cleanup' into dev
drastically simplified project but produced first real PoC
This commit is contained in:
commit
c0285c08ff
29 changed files with 1089 additions and 1400 deletions
57
Cargo.toml
57
Cargo.toml
|
@ -1,37 +1,54 @@
|
||||||
[package]
|
[package]
|
||||||
name = "codemp"
|
name = "codemp"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[features]
|
# [features]
|
||||||
default = ["nvim"]
|
# default = ["nvim"]
|
||||||
nvim = []
|
# nvim = []
|
||||||
|
|
||||||
[lib]
|
[lib]
|
||||||
name = "library"
|
name = "codemp"
|
||||||
path = "src/lib/lib.rs"
|
path = "src/lib/lib.rs"
|
||||||
|
|
||||||
[[bin]] # Bin to run the CodeMP gRPC server
|
[[bin]] # Bin to run the CodeMP gRPC server
|
||||||
name = "server"
|
name = "server"
|
||||||
path = "src/server/main.rs"
|
path = "src/server/main.rs"
|
||||||
|
required-features = ["server"]
|
||||||
|
|
||||||
[[bin]] # Bin to run the CodeMP gRPC client
|
[[bin]]
|
||||||
name = "client-neovim"
|
name = "client-nvim"
|
||||||
path = "src/client/main.rs"
|
path = "src/client/nvim/main.rs"
|
||||||
|
required-features = ["nvim"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
# core
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tonic = "0.9"
|
||||||
tonic = "0.7"
|
prost = "0.11.8"
|
||||||
prost = "0.10"
|
md5 = "0.7.0"
|
||||||
futures = "0.3"
|
uuid = { version = "1.3.1", features = ["v4"] }
|
||||||
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"] }
|
operational-transform = { version = "0.6", features = ["serde"] }
|
||||||
tokio-stream = "0.1"
|
# can these be optional?
|
||||||
rmpv = "1"
|
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "sync", "full"], optional = false }
|
||||||
operational-transform = "0.6"
|
tokio-stream = { version = "0.1", optional = false }
|
||||||
nvim-rs = { version = "0.4", features = ["use_tokio"] } # TODO put this behind a conditional feature
|
serde = { version = "1", optional = false }
|
||||||
uuid = { version = "1", features = ["v4", "fast-rng", "macro-diagnostics"] }
|
serde_json = { version = "1", optional = false }
|
||||||
rand = "0.8.5"
|
# runtime
|
||||||
|
# logs
|
||||||
|
tracing-subscriber = { version = "0.3", optional = true }
|
||||||
|
# nvim
|
||||||
|
rmpv = { version = "1", optional = true }
|
||||||
|
clap = { version = "4.2.1", features = ["derive"], optional = true }
|
||||||
|
nvim-rs = { version = "0.5", features = ["use_tokio"], optional = true }
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-build = "0.7"
|
tonic-build = "0.9"
|
||||||
|
|
||||||
|
[features]
|
||||||
|
default = []
|
||||||
|
logs = ["dep:tracing-subscriber"]
|
||||||
|
# runtime = ["dep:tokio", "dep:tokio-stream"]
|
||||||
|
# serde = ["dep:serde", "dep:serde_json"]
|
||||||
|
server = ["logs", "dep:clap"]
|
||||||
|
nvim = ["logs", "dep:nvim-rs", "dep:clap", "dep:rmpv"]
|
||||||
|
|
2
build.rs
2
build.rs
|
@ -1,6 +1,4 @@
|
||||||
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
tonic_build::compile_protos("proto/session.proto")?;
|
|
||||||
tonic_build::compile_protos("proto/workspace.proto")?;
|
|
||||||
tonic_build::compile_protos("proto/buffer.proto")?;
|
tonic_build::compile_protos("proto/buffer.proto")?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,11 +1,3 @@
|
||||||
" Copyright 2017 Justin Charette
|
|
||||||
"
|
|
||||||
" Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
|
|
||||||
" http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
|
|
||||||
" <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
|
|
||||||
" option. This file may not be copied, modified, or distributed
|
|
||||||
" except according to those terms.
|
|
||||||
|
|
||||||
if ! exists('s:jobid')
|
if ! exists('s:jobid')
|
||||||
let s:jobid = 0
|
let s:jobid = 0
|
||||||
endif
|
endif
|
||||||
|
@ -95,6 +87,6 @@ function codemp#cursor()
|
||||||
endfunction
|
endfunction
|
||||||
|
|
||||||
function s:OnStderr(id, data, event) dict
|
function s:OnStderr(id, data, event) dict
|
||||||
let g:msg = 'codemp: stderr: ' . join(a:data, "\n")
|
let g:msg = 'codemp: ' . join(a:data, "\n")
|
||||||
echo g:msg
|
echo g:msg
|
||||||
endfunction
|
endfunction
|
||||||
|
|
|
@ -2,35 +2,40 @@ syntax = "proto3";
|
||||||
package buffer;
|
package buffer;
|
||||||
|
|
||||||
service Buffer {
|
service Buffer {
|
||||||
rpc Attach (stream Operation) returns (stream Operation);
|
rpc Attach (BufferPayload) returns (stream RawOp);
|
||||||
rpc Push (BufferPayload) returns (BufferResponse);
|
rpc Edit (OperationRequest) returns (BufferResponse);
|
||||||
rpc Pull (BufferPayload) returns (BufferPayload);
|
rpc Create (BufferPayload) returns (BufferResponse);
|
||||||
|
rpc Sync (BufferPayload) returns (BufferResponse);
|
||||||
|
rpc Cursor (CursorMov) returns (BufferResponse);
|
||||||
|
rpc Listen (BufferPayload) returns (stream CursorMov);
|
||||||
}
|
}
|
||||||
|
|
||||||
message Operation {
|
message RawOp {
|
||||||
int64 opId = 1;
|
string opseq = 1;
|
||||||
|
string user = 2;
|
||||||
|
}
|
||||||
|
|
||||||
enum Action {
|
message CursorMov {
|
||||||
RETAIN = 0;
|
string user = 1;
|
||||||
INSERT = 1;
|
string path = 2;
|
||||||
DELETE = 2;
|
int64 row = 3;
|
||||||
};
|
int64 col = 4;
|
||||||
Action action = 2;
|
}
|
||||||
|
|
||||||
int32 row = 3;
|
message OperationRequest {
|
||||||
int32 column = 4;
|
string path = 1;
|
||||||
|
string hash = 2;
|
||||||
optional string text = 5;
|
string opseq = 3;
|
||||||
|
string user = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message BufferPayload {
|
message BufferPayload {
|
||||||
string sessionKey = 1;
|
string path = 1;
|
||||||
string path = 2;
|
string user = 2;
|
||||||
optional string content = 3;
|
optional string content = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message BufferResponse {
|
message BufferResponse {
|
||||||
string sessionKey = 1;
|
bool accepted = 1;
|
||||||
string path = 2;
|
optional string content = 2;
|
||||||
bool accepted = 3;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,25 +0,0 @@
|
||||||
syntax = "proto3";
|
|
||||||
package session;
|
|
||||||
|
|
||||||
service Session {
|
|
||||||
// rpc Authenticate(SessionRequest) returns (SessionResponse);
|
|
||||||
// rpc ListWorkspaces(SessionRequest) returns (WorkspaceList);
|
|
||||||
rpc CreateWorkspace(WorkspaceBuilderRequest) returns (SessionResponse);
|
|
||||||
}
|
|
||||||
|
|
||||||
message SessionRequest {
|
|
||||||
string sessionKey = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message SessionResponse {
|
|
||||||
string sessionKey = 1;
|
|
||||||
bool accepted = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
message WorkspaceBuilderRequest {
|
|
||||||
string name = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message WorkspaceList {
|
|
||||||
repeated string name = 1; // TODO add more fields
|
|
||||||
}
|
|
|
@ -1,51 +0,0 @@
|
||||||
syntax = "proto3";
|
|
||||||
package workspace;
|
|
||||||
|
|
||||||
service Workspace {
|
|
||||||
rpc Join (JoinRequest) returns (stream WorkspaceEvent);
|
|
||||||
rpc Subscribe (stream CursorUpdate) returns (stream CursorUpdate);
|
|
||||||
rpc ListUsers (WorkspaceRequest) returns (UsersList);
|
|
||||||
rpc Buffers (WorkspaceRequest) returns (BufferList);
|
|
||||||
rpc NewBuffer (BufferRequest) returns (WorkspaceResponse);
|
|
||||||
rpc RemoveBuffer (BufferRequest) returns (WorkspaceResponse);
|
|
||||||
}
|
|
||||||
|
|
||||||
message JoinRequest {
|
|
||||||
string name = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message WorkspaceEvent {
|
|
||||||
int32 id = 1;
|
|
||||||
optional string body = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
// nvim-rs passes everything as i64, so having them as i64 in the packet itself is convenient
|
|
||||||
// TODO can we make them i32 and save some space?
|
|
||||||
message CursorUpdate {
|
|
||||||
string username = 1;
|
|
||||||
int64 buffer = 2;
|
|
||||||
int64 col = 3;
|
|
||||||
int64 row = 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
message WorkspaceRequest {
|
|
||||||
string sessionKey = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message BufferRequest {
|
|
||||||
string sessionKey = 1;
|
|
||||||
string path = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
message WorkspaceResponse {
|
|
||||||
bool accepted = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message BufferList {
|
|
||||||
repeated string path = 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
message UsersList {
|
|
||||||
repeated string name = 1;
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,96 +0,0 @@
|
||||||
pub mod proto {
|
|
||||||
tonic::include_proto!("session");
|
|
||||||
tonic::include_proto!("workspace");
|
|
||||||
tonic::include_proto!("buffer");
|
|
||||||
}
|
|
||||||
use std::sync::Arc;
|
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
use tokio::sync::{mpsc, Mutex};
|
|
||||||
use tokio_stream::StreamExt;
|
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
|
||||||
|
|
||||||
use proto::{
|
|
||||||
workspace_client::WorkspaceClient,
|
|
||||||
session_client::SessionClient,
|
|
||||||
buffer_client::BufferClient,
|
|
||||||
WorkspaceBuilderRequest, JoinRequest, SessionResponse, CursorUpdate
|
|
||||||
};
|
|
||||||
use tonic::{transport::Channel, Status, Request, Response};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct Dispatcher {
|
|
||||||
name: String,
|
|
||||||
dp: Arc<Mutex<DispatcherWorker>>, // TODO use channels and don't lock
|
|
||||||
}
|
|
||||||
|
|
||||||
struct DispatcherWorker {
|
|
||||||
// TODO do I need all three? Did I design the server badly?
|
|
||||||
session: SessionClient<Channel>,
|
|
||||||
workspace: WorkspaceClient<Channel>,
|
|
||||||
_buffers: BufferClient<Channel>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Dispatcher {
|
|
||||||
pub async fn connect(addr:String) -> Result<Dispatcher, tonic::transport::Error> {
|
|
||||||
let (s, w, b) = tokio::join!(
|
|
||||||
SessionClient::connect(addr.clone()),
|
|
||||||
WorkspaceClient::connect(addr.clone()),
|
|
||||||
BufferClient::connect(addr.clone()),
|
|
||||||
);
|
|
||||||
Ok(
|
|
||||||
Dispatcher {
|
|
||||||
name: format!("User#{}", rand::random::<u16>()),
|
|
||||||
dp: Arc::new(
|
|
||||||
Mutex::new(
|
|
||||||
DispatcherWorker { session: s?, workspace: w?, _buffers: b? }
|
|
||||||
)
|
|
||||||
)
|
|
||||||
}
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_workspace(&self, name:String) -> Result<Response<SessionResponse>, Status> {
|
|
||||||
self.dp.lock().await.session.create_workspace(
|
|
||||||
Request::new(WorkspaceBuilderRequest { name })
|
|
||||||
).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn join_workspace(&self, session_id:String) -> Result<(), Status> {
|
|
||||||
let mut req = Request::new(JoinRequest { name: self.name.clone() });
|
|
||||||
req.metadata_mut().append("workspace", session_id.parse().unwrap());
|
|
||||||
let mut stream = self.dp.lock().await.workspace.join(req).await?.into_inner();
|
|
||||||
|
|
||||||
let _worker = tokio::spawn(async move {
|
|
||||||
while let Some(pkt) = stream.next().await {
|
|
||||||
match pkt {
|
|
||||||
Ok(_event) => {
|
|
||||||
// TODO do something with events when they will mean something!
|
|
||||||
},
|
|
||||||
Err(e) => error!("Error receiving event | {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn start_cursor_worker(&self, session_id:String, feed:mpsc::Receiver<CursorUpdate>) -> Result<mpsc::Receiver<CursorUpdate>, Status> {
|
|
||||||
let mut in_stream = Request::new(ReceiverStream::new(feed));
|
|
||||||
in_stream.metadata_mut().append("workspace", session_id.parse().unwrap());
|
|
||||||
|
|
||||||
let mut stream = self.dp.lock().await.workspace.subscribe(in_stream).await?.into_inner();
|
|
||||||
let (tx, rx) = mpsc::channel(50);
|
|
||||||
|
|
||||||
let _worker = tokio::spawn(async move {
|
|
||||||
while let Some(pkt) = stream.next().await {
|
|
||||||
match pkt {
|
|
||||||
Ok(update) => tx.send(update).await.unwrap(), // TODO how to handle an error here?
|
|
||||||
Err(e) => error!("Error receiving cursor update | {}", e),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Ok(rx)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,16 +0,0 @@
|
||||||
|
|
||||||
mod nvim;
|
|
||||||
pub mod dispatcher;
|
|
||||||
|
|
||||||
use dispatcher::Dispatcher;
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<(dyn std::error::Error + 'static)>> {
|
|
||||||
|
|
||||||
let dispatcher = Dispatcher::connect("http://[::1]:50051".into()).await.unwrap();
|
|
||||||
|
|
||||||
#[cfg(feature = "nvim")]
|
|
||||||
crate::nvim::run_nvim_client(dispatcher).await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
130
src/client/nvim/codemp.lua
Normal file
130
src/client/nvim/codemp.lua
Normal file
|
@ -0,0 +1,130 @@
|
||||||
|
local BINARY = vim.g.codemp_binary or "/home/alemi/projects/codemp/target/debug/client-nvim"
|
||||||
|
|
||||||
|
local M = {}
|
||||||
|
M.jobid = nil
|
||||||
|
M.create = function(path, content) return vim.rpcrequest(M.jobid, "create", path, content) end
|
||||||
|
M.insert = function(path, txt, pos) return vim.rpcrequest(M.jobid, "insert", path, txt, pos) end
|
||||||
|
M.cursor = function(path, row, col) return vim.rpcrequest(M.jobid, "cursor", path, row, col) end
|
||||||
|
M.delete = function(path, pos, count) return vim.rpcrequest(M.jobid, "delete", path, pos, count) end
|
||||||
|
M.attach = function(path) return vim.rpcrequest(M.jobid, "attach", path) end
|
||||||
|
M.listen = function(path) return vim.rpcrequest(M.jobid, "listen", path) end
|
||||||
|
M.detach = function(path) return vim.rpcrequest(M.jobid, "detach", path) end
|
||||||
|
|
||||||
|
local function cursor_offset()
|
||||||
|
local cursor = vim.api.nvim_win_get_cursor(0)
|
||||||
|
return vim.fn.line2byte(cursor[1]) + cursor[2] - 1
|
||||||
|
end
|
||||||
|
|
||||||
|
local codemp_autocmds = vim.api.nvim_create_augroup("CodempAuGroup", { clear = true })
|
||||||
|
|
||||||
|
local function hook_callbacks(path, buffer)
|
||||||
|
vim.api.nvim_create_autocmd(
|
||||||
|
{ "InsertCharPre" },
|
||||||
|
{
|
||||||
|
callback = function(_) M.insert(path, vim.v.char, cursor_offset()) end,
|
||||||
|
buffer = buffer,
|
||||||
|
group = codemp_autocmds,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
vim.api.nvim_create_autocmd(
|
||||||
|
{ "CursorMoved", "CursorMovedI" },
|
||||||
|
{
|
||||||
|
callback = function(_)
|
||||||
|
local cursor = vim.api.nvim_win_get_cursor(0)
|
||||||
|
M.cursor(path, cursor[1], cursor[2])
|
||||||
|
end,
|
||||||
|
buffer = buffer,
|
||||||
|
group = codemp_autocmds,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
vim.keymap.set('i', '<BS>', function() M.delete(path, cursor_offset(), 1) return '<BS>' end, {expr = true, buffer = buffer})
|
||||||
|
vim.keymap.set('i', '<Del>', function() M.delete(path, cursor_offset() + 1, 1) return '<Del>' end, {expr = true, buffer = buffer})
|
||||||
|
vim.keymap.set('i', '<CR>', function() M.insert(path, "\n", cursor_offset()) return '<CR>'end, {expr = true, buffer = buffer})
|
||||||
|
end
|
||||||
|
|
||||||
|
local function unhook_callbacks(buffer)
|
||||||
|
vim.api.nvim_clear_autocmds({ group = codemp_autocmds, buffer = buffer })
|
||||||
|
vim.keymap.del('i', '<BS>', { buffer = buffer })
|
||||||
|
vim.keymap.del('i', '<Del>', { buffer = buffer })
|
||||||
|
vim.keymap.del('i', '<CR>', { buffer = buffer })
|
||||||
|
end
|
||||||
|
|
||||||
|
vim.api.nvim_create_user_command('Connect',
|
||||||
|
function(args)
|
||||||
|
if M.jobid ~= nil then
|
||||||
|
print("already connected, disconnect first")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
local bin_args = { BINARY }
|
||||||
|
if #args.args > 0 then
|
||||||
|
table.insert(bin_args, "--host")
|
||||||
|
table.insert(bin_args, args.args[1])
|
||||||
|
end
|
||||||
|
if args.bang then
|
||||||
|
table.insert(bin_args, "--debug")
|
||||||
|
end
|
||||||
|
M.jobid = vim.fn.jobstart(
|
||||||
|
bin_args,
|
||||||
|
{
|
||||||
|
rpc = true,
|
||||||
|
on_stderr = function(_, data, _)
|
||||||
|
for _, line in pairs(data) do
|
||||||
|
print(line)
|
||||||
|
end
|
||||||
|
-- print(vim.fn.join(data, "\n"))
|
||||||
|
end,
|
||||||
|
stderr_buffered = false,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if M.jobid <= 0 then
|
||||||
|
print("[!] could not start codemp client")
|
||||||
|
end
|
||||||
|
end,
|
||||||
|
{ nargs='?', bang=true })
|
||||||
|
|
||||||
|
vim.api.nvim_create_user_command('Stop',
|
||||||
|
function(_)
|
||||||
|
vim.fn.jobstop(M.jobid)
|
||||||
|
M.jobid = nil
|
||||||
|
end,
|
||||||
|
{ bang=true })
|
||||||
|
|
||||||
|
vim.api.nvim_create_user_command('Share',
|
||||||
|
function(args)
|
||||||
|
if M.jobid <= 0 then
|
||||||
|
print("[!] connect to codemp server first")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
local path = args.fargs[1]
|
||||||
|
local bufnr = vim.api.nvim_get_current_buf()
|
||||||
|
local lines = vim.api.nvim_buf_get_lines(bufnr, 0, -1, false)
|
||||||
|
M.create(path, vim.fn.join(lines, "\n"))
|
||||||
|
hook_callbacks(path, bufnr)
|
||||||
|
M.attach(path)
|
||||||
|
M.listen(path)
|
||||||
|
end,
|
||||||
|
{ nargs=1 })
|
||||||
|
|
||||||
|
vim.api.nvim_create_user_command('Join',
|
||||||
|
function(args)
|
||||||
|
if M.jobid <= 0 then
|
||||||
|
print("[!] connect to codemp server first")
|
||||||
|
return
|
||||||
|
end
|
||||||
|
local path = args.fargs[1]
|
||||||
|
local bufnr = vim.api.nvim_get_current_buf()
|
||||||
|
hook_callbacks(path, bufnr)
|
||||||
|
M.attach(path)
|
||||||
|
M.listen(path)
|
||||||
|
end,
|
||||||
|
{ nargs=1 })
|
||||||
|
|
||||||
|
vim.api.nvim_create_user_command('Detach',
|
||||||
|
function(args)
|
||||||
|
local bufnr = vim.api.nvim_get_current_buf()
|
||||||
|
unhook_callbacks(bufnr)
|
||||||
|
M.detach(args.fargs[1])
|
||||||
|
end,
|
||||||
|
{ nargs=1 })
|
||||||
|
|
||||||
|
return M
|
258
src/client/nvim/main.rs
Normal file
258
src/client/nvim/main.rs
Normal file
|
@ -0,0 +1,258 @@
|
||||||
|
use std::{net::TcpStream, sync::Mutex};
|
||||||
|
|
||||||
|
use codemp::client::CodempClient;
|
||||||
|
use codemp::proto::buffer_client::BufferClient;
|
||||||
|
use rmpv::Value;
|
||||||
|
|
||||||
|
|
||||||
|
use tokio::io::Stdout;
|
||||||
|
use clap::Parser;
|
||||||
|
|
||||||
|
use nvim_rs::{compat::tokio::Compat, create::tokio as create, Handler, Neovim};
|
||||||
|
use tonic::async_trait;
|
||||||
|
use tracing::{error, warn, debug, info};
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct NeovimHandler {
|
||||||
|
client: CodempClient,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn nullable_optional_str(args: &Vec<Value>, index: usize) -> Option<String> {
|
||||||
|
Some(args.get(index)?.as_str()?.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_empty_str(args: &Vec<Value>, index: usize) -> String {
|
||||||
|
nullable_optional_str(args, index).unwrap_or("".into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn nullable_optional_number(args: &Vec<Value>, index: usize) -> Option<i64> {
|
||||||
|
Some(args.get(index)?.as_i64()?)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_zero_number(args: &Vec<Value>, index: usize) -> i64 {
|
||||||
|
nullable_optional_number(args, index).unwrap_or(0)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Handler for NeovimHandler {
|
||||||
|
type Writer = Compat<Stdout>;
|
||||||
|
|
||||||
|
async fn handle_request(
|
||||||
|
&self,
|
||||||
|
name: String,
|
||||||
|
args: Vec<Value>,
|
||||||
|
nvim: Neovim<Compat<Stdout>>,
|
||||||
|
) -> Result<Value, Value> {
|
||||||
|
debug!("processing '{}' - {:?}", name, args);
|
||||||
|
match name.as_ref() {
|
||||||
|
"ping" => Ok(Value::from("pong")),
|
||||||
|
|
||||||
|
"create" => {
|
||||||
|
if args.len() < 1 {
|
||||||
|
return Err(Value::from("no path given"));
|
||||||
|
}
|
||||||
|
let path = default_empty_str(&args, 0);
|
||||||
|
let content = nullable_optional_str(&args, 1);
|
||||||
|
let mut c = self.client.clone();
|
||||||
|
match c.create(path, content).await {
|
||||||
|
Ok(r) => match r {
|
||||||
|
true => Ok(Value::Nil),
|
||||||
|
false => Err(Value::from("rejected")),
|
||||||
|
},
|
||||||
|
Err(e) => Err(Value::from(format!("could not create buffer: {}", e))),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
"insert" => {
|
||||||
|
if args.len() < 3 {
|
||||||
|
return Err(Value::from("not enough arguments"));
|
||||||
|
}
|
||||||
|
let path = default_empty_str(&args, 0);
|
||||||
|
let txt = default_empty_str(&args, 1);
|
||||||
|
let pos = default_zero_number(&args, 2) as u64;
|
||||||
|
let mut c = self.client.clone();
|
||||||
|
info!("correctly parsed arguments: {} - {} - {}", path, txt, pos);
|
||||||
|
match c.insert(path, txt, pos).await {
|
||||||
|
Ok(res) => {
|
||||||
|
info!("RPC 'insert' completed");
|
||||||
|
match res {
|
||||||
|
true => Ok(Value::Nil),
|
||||||
|
false => Err(Value::from("rejected")),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
"delete" => {
|
||||||
|
if args.len() < 3 {
|
||||||
|
return Err(Value::from("not enough arguments"));
|
||||||
|
}
|
||||||
|
let path = default_empty_str(&args, 0);
|
||||||
|
let pos = default_zero_number(&args, 1) as u64;
|
||||||
|
let count = default_zero_number(&args, 2) as u64;
|
||||||
|
|
||||||
|
let mut c = self.client.clone();
|
||||||
|
match c.delete(path, pos, count).await {
|
||||||
|
Ok(res) => match res {
|
||||||
|
true => Ok(Value::Nil),
|
||||||
|
false => Err(Value::from("rejected")),
|
||||||
|
},
|
||||||
|
Err(e) => Err(Value::from(format!("could not send insert: {}", e))),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
"attach" => {
|
||||||
|
if args.len() < 1 {
|
||||||
|
return Err(Value::from("no path given"));
|
||||||
|
}
|
||||||
|
let path = default_empty_str(&args, 0);
|
||||||
|
let buffer = match nvim.get_current_buf().await {
|
||||||
|
Ok(b) => b,
|
||||||
|
Err(e) => return Err(Value::from(format!("could not get current buffer: {}", e))),
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut c = self.client.clone();
|
||||||
|
|
||||||
|
let buf = buffer.clone();
|
||||||
|
match c.attach(path, move |x| {
|
||||||
|
let lines : Vec<String> = x.split("\n").map(|x| x.to_string()).collect();
|
||||||
|
let b = buf.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = b.set_lines(0, -1, false, lines).await {
|
||||||
|
error!("could not update buffer: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).await {
|
||||||
|
Err(e) => Err(Value::from(format!("could not attach to stream: {}", e))),
|
||||||
|
Ok(content) => {
|
||||||
|
let lines : Vec<String> = content.split("\n").map(|x| x.to_string()).collect();
|
||||||
|
if let Err(e) = buffer.set_lines(0, -1, false, lines).await {
|
||||||
|
error!("could not update buffer: {}", e);
|
||||||
|
}
|
||||||
|
Ok(Value::Nil)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
"detach" => {
|
||||||
|
if args.len() < 1 {
|
||||||
|
return Err(Value::from("no path given"));
|
||||||
|
}
|
||||||
|
let path = default_empty_str(&args, 0);
|
||||||
|
let mut c = self.client.clone();
|
||||||
|
c.detach(path);
|
||||||
|
Ok(Value::Nil)
|
||||||
|
},
|
||||||
|
|
||||||
|
"listen" => {
|
||||||
|
if args.len() < 1 {
|
||||||
|
return Err(Value::from("no path given"));
|
||||||
|
}
|
||||||
|
let path = default_empty_str(&args, 0);
|
||||||
|
let mut c = self.client.clone();
|
||||||
|
|
||||||
|
let ns = nvim.create_namespace("Cursor").await
|
||||||
|
.map_err(|e| Value::from(format!("could not create namespace: {}", e)))?;
|
||||||
|
|
||||||
|
let buf = nvim.get_current_buf().await
|
||||||
|
.map_err(|e| Value::from(format!("could not get current buf: {}", e)))?;
|
||||||
|
|
||||||
|
match c.listen(path, move |cur| {
|
||||||
|
let _b = buf.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = _b.clear_namespace(ns, 0, -1).await {
|
||||||
|
error!("could not clear previous cursor highlight: {}", e);
|
||||||
|
}
|
||||||
|
if let Err(e) = _b.add_highlight(ns, "ErrorMsg", cur.row-1, cur.col, cur.col+1).await {
|
||||||
|
error!("could not create highlight for cursor: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}).await {
|
||||||
|
Ok(()) => Ok(Value::Nil),
|
||||||
|
Err(e) => Err(Value::from(format!("could not listen cursors: {}", e))),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
"cursor" => {
|
||||||
|
if args.len() < 3 {
|
||||||
|
return Err(Value::from("not enough args"));
|
||||||
|
}
|
||||||
|
let path = default_empty_str(&args, 0);
|
||||||
|
let row = default_zero_number(&args, 1);
|
||||||
|
let col = default_zero_number(&args, 2);
|
||||||
|
|
||||||
|
let mut c = self.client.clone();
|
||||||
|
match c.cursor(path, row, col).await {
|
||||||
|
Ok(()) => Ok(Value::Nil),
|
||||||
|
Err(e) => Err(Value::from(format!("could not send cursor update: {}", e))),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
_ => Err(Value::from("unimplemented")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_notify(
|
||||||
|
&self,
|
||||||
|
_name: String,
|
||||||
|
_args: Vec<Value>,
|
||||||
|
_nvim: Neovim<Compat<Stdout>>,
|
||||||
|
) {
|
||||||
|
warn!("notify not handled");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
struct CliArgs {
|
||||||
|
/// server host to connect to
|
||||||
|
#[arg(long, default_value = "http://[::1]:50051")]
|
||||||
|
host: String,
|
||||||
|
|
||||||
|
/// show debug level logs
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
debug: bool,
|
||||||
|
|
||||||
|
/// dump raw tracing logs into this TCP host
|
||||||
|
#[arg(long)]
|
||||||
|
remote_debug: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let args = CliArgs::parse();
|
||||||
|
|
||||||
|
match args.remote_debug {
|
||||||
|
Some(host) =>
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.with_writer(Mutex::new(TcpStream::connect(host)?))
|
||||||
|
.with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO })
|
||||||
|
.init(),
|
||||||
|
|
||||||
|
None =>
|
||||||
|
tracing_subscriber::fmt()
|
||||||
|
.compact()
|
||||||
|
.without_time()
|
||||||
|
.with_ansi(false)
|
||||||
|
.with_writer(std::io::stderr)
|
||||||
|
.with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO })
|
||||||
|
.init(),
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = BufferClient::connect(args.host).await?;
|
||||||
|
|
||||||
|
let handler: NeovimHandler = NeovimHandler {
|
||||||
|
client: client.into(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let (_nvim, io_handler) = create::new_parent(handler).await;
|
||||||
|
|
||||||
|
info!("++ codemp started");
|
||||||
|
|
||||||
|
if let Err(e) = io_handler.await? {
|
||||||
|
error!("worker stopped with error: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -1,133 +0,0 @@
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use rmpv::Value;
|
|
||||||
|
|
||||||
use tokio::io::Stdout;
|
|
||||||
|
|
||||||
use nvim_rs::{compat::tokio::Compat, Handler, Neovim};
|
|
||||||
use nvim_rs::create::tokio::new_parent;
|
|
||||||
use tokio::sync::{mpsc, Mutex};
|
|
||||||
|
|
||||||
use crate::dispatcher::{Dispatcher, proto::CursorUpdate};
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
pub struct NeovimHandler {
|
|
||||||
dispatcher: Dispatcher,
|
|
||||||
sink: Arc<Mutex<Option<mpsc::Sender<CursorUpdate>>>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NeovimHandler {
|
|
||||||
pub fn new(dispatcher: Dispatcher) -> Self {
|
|
||||||
NeovimHandler {
|
|
||||||
dispatcher,
|
|
||||||
sink: Arc::new(Mutex::new(None)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
|
||||||
impl Handler for NeovimHandler {
|
|
||||||
type Writer = Compat<Stdout>;
|
|
||||||
|
|
||||||
async fn handle_request(
|
|
||||||
&self,
|
|
||||||
name: String,
|
|
||||||
args: Vec<Value>,
|
|
||||||
neovim: Neovim<Compat<Stdout>>,
|
|
||||||
) -> Result<Value, Value> {
|
|
||||||
match name.as_ref() {
|
|
||||||
"ping" => Ok(Value::from("pong")),
|
|
||||||
"create" => {
|
|
||||||
if args.len() < 1 {
|
|
||||||
return Err(Value::from("[!] no session key"));
|
|
||||||
}
|
|
||||||
let res = self.dispatcher.create_workspace(args[0].to_string())
|
|
||||||
.await
|
|
||||||
.map_err(|e| Value::from(e.to_string()))?
|
|
||||||
.into_inner();
|
|
||||||
|
|
||||||
Ok(res.session_key.into())
|
|
||||||
},
|
|
||||||
"join" => {
|
|
||||||
if args.len() < 1 {
|
|
||||||
return Err(Value::from("[!] no session key"));
|
|
||||||
}
|
|
||||||
|
|
||||||
self.dispatcher.join_workspace(
|
|
||||||
args[0].as_str().unwrap().to_string(), // TODO throw err if it's not a string?
|
|
||||||
).await.map_err(|e| Value::from(e.to_string()))?;
|
|
||||||
|
|
||||||
Ok("OK".into())
|
|
||||||
},
|
|
||||||
"cursor-start" => {
|
|
||||||
if args.len() < 1 {
|
|
||||||
return Err(Value::from("[!] no session key"));
|
|
||||||
}
|
|
||||||
let (tx, stream) = mpsc::channel(50);
|
|
||||||
let mut rx = self.dispatcher.start_cursor_worker(
|
|
||||||
args[0].as_str().unwrap().to_string(), stream
|
|
||||||
).await.map_err(|e| Value::from(e.to_string()))?;
|
|
||||||
let sink = self.sink.clone();
|
|
||||||
sink.lock().await.replace(tx);
|
|
||||||
let _worker = tokio::spawn(async move {
|
|
||||||
let mut col : i64;
|
|
||||||
let mut row : i64;
|
|
||||||
let ns = neovim.create_namespace("Cursor").await.unwrap();
|
|
||||||
while let Some(update) = rx.recv().await {
|
|
||||||
neovim.exec_lua(format!("print('{:?}')", update).as_str(), vec![]).await.unwrap();
|
|
||||||
let buf = neovim.get_current_buf().await.unwrap();
|
|
||||||
buf.clear_namespace(ns, 0, -1).await.unwrap();
|
|
||||||
row = update.row as i64;
|
|
||||||
col = update.col as i64;
|
|
||||||
buf.add_highlight(ns, "ErrorMsg", row-1, col-1, col).await.unwrap();
|
|
||||||
}
|
|
||||||
sink.lock().await.take();
|
|
||||||
});
|
|
||||||
Ok("OK".into())
|
|
||||||
},
|
|
||||||
_ => {
|
|
||||||
eprintln!("[!] unexpected call");
|
|
||||||
Ok(Value::from(""))
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_notify(
|
|
||||||
&self,
|
|
||||||
name: String,
|
|
||||||
args: Vec<Value>,
|
|
||||||
_neovim: Neovim<Compat<Stdout>>,
|
|
||||||
) {
|
|
||||||
match name.as_ref() {
|
|
||||||
"insert" => {},
|
|
||||||
"cursor" => {
|
|
||||||
if args.len() >= 3 {
|
|
||||||
if let Some(sink) = self.sink.lock().await.as_ref() {
|
|
||||||
sink.send(CursorUpdate {
|
|
||||||
buffer: args[0].as_i64().unwrap(),
|
|
||||||
row: args[1].as_i64().unwrap(),
|
|
||||||
col: args[2].as_i64().unwrap(),
|
|
||||||
username: "root".into()
|
|
||||||
}).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"tick" => eprintln!("tock"),
|
|
||||||
_ => eprintln!("[!] unexpected notify",)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn run_nvim_client(dispatcher: Dispatcher) -> Result<(), Box<dyn std::error::Error + 'static>> {
|
|
||||||
let handler: NeovimHandler = NeovimHandler::new(dispatcher);
|
|
||||||
let (_nvim, io_handler) = new_parent(handler).await;
|
|
||||||
|
|
||||||
// Any error should probably be logged, as stderr is not visible to users.
|
|
||||||
match io_handler.await {
|
|
||||||
Err(err) => eprintln!("Error joining IO loop: {:?}", err),
|
|
||||||
Ok(Err(err)) => eprintln!("Process ended with error: {:?}", err),
|
|
||||||
Ok(Ok(())) => eprintln!("Finished"),
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
167
src/lib/client.rs
Normal file
167
src/lib/client.rs
Normal file
|
@ -0,0 +1,167 @@
|
||||||
|
/// TODO better name for this file
|
||||||
|
|
||||||
|
use std::{sync::{Arc, RwLock}, collections::BTreeMap};
|
||||||
|
use tracing::{error, warn, info};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
opfactory::AsyncFactory,
|
||||||
|
proto::{buffer_client::BufferClient, BufferPayload, OperationRequest, RawOp, CursorMov},
|
||||||
|
tonic::{transport::Channel, Status, Streaming},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub type FactoryStore = Arc<RwLock<BTreeMap<String, Arc<AsyncFactory>>>>;
|
||||||
|
|
||||||
|
impl From::<BufferClient<Channel>> for CodempClient {
|
||||||
|
fn from(x: BufferClient<Channel>) -> CodempClient {
|
||||||
|
CodempClient {
|
||||||
|
id: Uuid::new_v4(),
|
||||||
|
client:x,
|
||||||
|
factories: Arc::new(RwLock::new(BTreeMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct CodempClient {
|
||||||
|
id: Uuid,
|
||||||
|
client: BufferClient<Channel>,
|
||||||
|
factories: FactoryStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CodempClient {
|
||||||
|
fn get_factory(&self, path: &String) -> Result<Arc<AsyncFactory>, Status> {
|
||||||
|
match self.factories.read().unwrap().get(path) {
|
||||||
|
Some(f) => Ok(f.clone()),
|
||||||
|
None => Err(Status::not_found("no active buffer for given path")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn add_factory(&self, path: String, factory:Arc<AsyncFactory>) {
|
||||||
|
self.factories.write().unwrap().insert(path, factory);
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create(&mut self, path: String, content: Option<String>) -> Result<bool, Status> {
|
||||||
|
let req = BufferPayload {
|
||||||
|
path: path.clone(),
|
||||||
|
content: content.clone(),
|
||||||
|
user: self.id.to_string(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = self.client.create(req).await?.into_inner();
|
||||||
|
|
||||||
|
Ok(res.accepted)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn insert(&mut self, path: String, txt: String, pos: u64) -> Result<bool, Status> {
|
||||||
|
let factory = self.get_factory(&path)?;
|
||||||
|
match factory.insert(txt, pos).await {
|
||||||
|
Err(e) => Err(Status::internal(format!("invalid operation: {}", e))),
|
||||||
|
Ok(op) => {
|
||||||
|
let req = OperationRequest {
|
||||||
|
path,
|
||||||
|
hash: "".into(),
|
||||||
|
user: self.id.to_string(),
|
||||||
|
opseq: serde_json::to_string(&op)
|
||||||
|
.map_err(|_| Status::invalid_argument("could not serialize opseq"))?,
|
||||||
|
};
|
||||||
|
let res = self.client.edit(req).await?.into_inner();
|
||||||
|
Ok(res.accepted)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete(&mut self, path: String, pos: u64, count: u64) -> Result<bool, Status> {
|
||||||
|
let factory = self.get_factory(&path)?;
|
||||||
|
match factory.delete(pos, count).await {
|
||||||
|
Err(e) => Err(Status::internal(format!("invalid operation: {}", e))),
|
||||||
|
Ok(op) => {
|
||||||
|
let req = OperationRequest {
|
||||||
|
path,
|
||||||
|
hash: "".into(),
|
||||||
|
user: self.id.to_string(),
|
||||||
|
opseq: serde_json::to_string(&op)
|
||||||
|
.map_err(|_| Status::invalid_argument("could not serialize opseq"))?,
|
||||||
|
};
|
||||||
|
let res = self.client.edit(req).await?.into_inner();
|
||||||
|
Ok(res.accepted)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cursor(&mut self, path: String, row: i64, col: i64) -> Result<(), Status> {
|
||||||
|
let req = CursorMov {
|
||||||
|
path, user: self.id.to_string(),
|
||||||
|
row, col,
|
||||||
|
};
|
||||||
|
let _res = self.client.cursor(req).await?.into_inner();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn listen<F>(&mut self, path: String, callback: F) -> Result<(), Status>
|
||||||
|
where F : Fn(CursorMov) -> () + Send + 'static {
|
||||||
|
let req = BufferPayload {
|
||||||
|
path,
|
||||||
|
content: None,
|
||||||
|
user: self.id.to_string(),
|
||||||
|
};
|
||||||
|
let mut stream = self.client.listen(req).await?.into_inner();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// TODO catch some errors
|
||||||
|
while let Ok(Some(x)) = stream.message().await {
|
||||||
|
callback(x)
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn attach<F>(&mut self, path: String, callback: F) -> Result<String, Status>
|
||||||
|
where F : Fn(String) -> () + Send + 'static {
|
||||||
|
let content = self.sync(path.clone()).await?;
|
||||||
|
let factory = Arc::new(AsyncFactory::new(Some(content.clone())));
|
||||||
|
self.add_factory(path.clone(), factory.clone());
|
||||||
|
let req = BufferPayload {
|
||||||
|
path,
|
||||||
|
content: None,
|
||||||
|
user: self.id.to_string(),
|
||||||
|
};
|
||||||
|
let stream = self.client.attach(req).await?.into_inner();
|
||||||
|
tokio::spawn(async move { Self::worker(stream, factory, callback).await } );
|
||||||
|
Ok(content)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn detach(&mut self, path: String) {
|
||||||
|
self.factories.write().unwrap().remove(&path);
|
||||||
|
info!("|| detached from buffer");
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync(&mut self, path: String) -> Result<String, Status> {
|
||||||
|
let res = self.client.sync(
|
||||||
|
BufferPayload {
|
||||||
|
path, content: None, user: self.id.to_string(),
|
||||||
|
}
|
||||||
|
).await?;
|
||||||
|
Ok(res.into_inner().content.unwrap_or("".into()))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn worker<F>(mut stream: Streaming<RawOp>, factory: Arc<AsyncFactory>, callback: F)
|
||||||
|
where F : Fn(String) -> () {
|
||||||
|
info!("|> buffer worker started");
|
||||||
|
loop {
|
||||||
|
match stream.message().await {
|
||||||
|
Err(e) => break error!("error receiving change: {}", e),
|
||||||
|
Ok(v) => match v {
|
||||||
|
None => break warn!("stream closed"),
|
||||||
|
Some(operation) => match serde_json::from_str(&operation.opseq) {
|
||||||
|
Err(e) => break error!("could not deserialize opseq: {}", e),
|
||||||
|
Ok(op) => match factory.process(op).await {
|
||||||
|
Err(e) => break error!("desynched: {}", e),
|
||||||
|
Ok(x) => callback(x),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
info!("[] buffer worker stopped");
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,100 +0,0 @@
|
||||||
use std::fmt::Display;
|
|
||||||
use crate::user::User;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum Event {
|
|
||||||
UserJoin { user: User },
|
|
||||||
UserLeave { name: String },
|
|
||||||
BufferNew { path: String },
|
|
||||||
BufferDelete { path: String },
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for Event {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
match self {
|
|
||||||
Self::UserJoin { user } => write!(f, "UserJoin(user:{})", user),
|
|
||||||
Self::UserLeave { name } => write!(f, "UserLeave(user:{})", name),
|
|
||||||
Self::BufferNew { path } => write!(f, "BufferNew(path:{})", path),
|
|
||||||
Self::BufferDelete { path } => write!(f, "BufferDelete(path:{})", path),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pub type Event = Box<dyn EventInterface>;
|
|
||||||
//
|
|
||||||
// pub trait EventInterface {
|
|
||||||
// fn class(&self) -> EventClass;
|
|
||||||
// fn unwrap(e: Event) -> Option<Self> where Self: Sized;
|
|
||||||
//
|
|
||||||
// fn wrap(self) -> Event {
|
|
||||||
// Box::new(self)
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // User joining workspace
|
|
||||||
//
|
|
||||||
// pub struct UserJoinEvent {
|
|
||||||
// user: User,
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// impl EventInterface for UserJoinEvent {
|
|
||||||
// fn class(&self) -> EventClass { EventClass::UserJoin }
|
|
||||||
// fn unwrap(e: Event) -> Option<Self> where Self: Sized {
|
|
||||||
// if matches!(e.class(), EventClass::UserJoin) {
|
|
||||||
// return Some(*e);
|
|
||||||
// }
|
|
||||||
// None
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // User leaving workspace
|
|
||||||
//
|
|
||||||
// pub struct UserLeaveEvent {
|
|
||||||
// name: String,
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// impl EventInterface for UserLeaveEvent {
|
|
||||||
// fn class(&self) -> EventClass { EventClass::UserLeave }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // Cursor movement
|
|
||||||
//
|
|
||||||
// pub struct CursorEvent {
|
|
||||||
// user: String,
|
|
||||||
// cursor: UserCursor,
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// impl EventInterface for CursorEvent {
|
|
||||||
// fn class(&self) -> EventClass { EventClass::Cursor }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// impl CursorEvent {
|
|
||||||
// pub fn new(user:String, cursor: UserCursor) -> Self {
|
|
||||||
// CursorEvent { user, cursor }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // Buffer added
|
|
||||||
//
|
|
||||||
// pub struct BufferNewEvent {
|
|
||||||
// path: String,
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// impl EventInterface for BufferNewEvent {
|
|
||||||
// fn class(&self) -> EventClass { EventClass::BufferNew }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
//
|
|
||||||
// // Buffer deleted
|
|
||||||
//
|
|
||||||
// pub struct BufferDeleteEvent {
|
|
||||||
// path: String,
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// impl EventInterface for BufferDeleteEvent {
|
|
||||||
// fn class(&self) -> EventClass { EventClass::BufferDelete }
|
|
||||||
// }
|
|
|
@ -1,2 +1,6 @@
|
||||||
pub mod events;
|
pub mod proto;
|
||||||
pub mod user;
|
pub mod opfactory;
|
||||||
|
pub mod client;
|
||||||
|
|
||||||
|
pub use tonic;
|
||||||
|
pub use tokio;
|
||||||
|
|
181
src/lib/opfactory.rs
Normal file
181
src/lib/opfactory.rs
Normal file
|
@ -0,0 +1,181 @@
|
||||||
|
use operational_transform::{OperationSeq, OTError};
|
||||||
|
use tokio::sync::{mpsc, watch, oneshot};
|
||||||
|
use tracing::error;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct OperationFactory {
|
||||||
|
content: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl OperationFactory {
|
||||||
|
pub fn new(init: Option<String>) -> Self {
|
||||||
|
OperationFactory { content: init.unwrap_or(String::new()) }
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO remove the need for this
|
||||||
|
pub fn content(&self) -> String {
|
||||||
|
self.content.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn check(&self, txt: &str) -> bool {
|
||||||
|
self.content == txt
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn replace(&mut self, txt: &str) -> OperationSeq {
|
||||||
|
let out = OperationSeq::default();
|
||||||
|
if self.content == txt {
|
||||||
|
return out; // nothing to do
|
||||||
|
}
|
||||||
|
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert(&mut self, txt: &str, pos: u64) -> Result<OperationSeq, OTError> {
|
||||||
|
let mut out = OperationSeq::default();
|
||||||
|
let total = self.content.len() as u64;
|
||||||
|
out.retain(pos);
|
||||||
|
out.insert(txt);
|
||||||
|
out.retain(total - pos);
|
||||||
|
self.content = out.apply(&self.content)?;
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete(&mut self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
|
||||||
|
let mut out = OperationSeq::default();
|
||||||
|
let len = self.content.len() as u64;
|
||||||
|
out.retain(pos - count);
|
||||||
|
out.delete(count);
|
||||||
|
out.retain(len - pos);
|
||||||
|
self.content = out.apply(&self.content)?;
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cancel(&mut self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
|
||||||
|
let mut out = OperationSeq::default();
|
||||||
|
let len = self.content.len() as u64;
|
||||||
|
out.retain(pos);
|
||||||
|
out.delete(count);
|
||||||
|
out.retain(len - (pos+count));
|
||||||
|
self.content = out.apply(&self.content)?;
|
||||||
|
Ok(out)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn process(&mut self, op: OperationSeq) -> Result<String, OTError> {
|
||||||
|
self.content = op.apply(&self.content)?;
|
||||||
|
Ok(self.content.clone())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
pub struct AsyncFactory {
|
||||||
|
run: watch::Sender<bool>,
|
||||||
|
ops: mpsc::Sender<OpMsg>,
|
||||||
|
#[allow(unused)] // TODO is this necessary?
|
||||||
|
content: watch::Receiver<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for AsyncFactory {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
self.run.send(false).unwrap_or(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncFactory {
|
||||||
|
pub fn new(init: Option<String>) -> Self {
|
||||||
|
let (run_tx, run_rx) = watch::channel(true);
|
||||||
|
let (ops_tx, ops_rx) = mpsc::channel(64); // TODO hardcoded size
|
||||||
|
let (txt_tx, txt_rx) = watch::channel("".into());
|
||||||
|
|
||||||
|
let worker = AsyncFactoryWorker {
|
||||||
|
factory: OperationFactory::new(init),
|
||||||
|
ops: ops_rx,
|
||||||
|
run: run_rx,
|
||||||
|
content: txt_tx,
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn(async move { worker.work().await });
|
||||||
|
|
||||||
|
AsyncFactory { run: run_tx, ops: ops_tx, content: txt_rx }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn insert(&self, txt: String, pos: u64) -> Result<OperationSeq, OTError> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.ops.send(OpMsg::Exec(OpWrapper::Insert(txt, pos), tx)).await.map_err(|_| OTError)?;
|
||||||
|
rx.await.map_err(|_| OTError)?
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn delete(&self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.ops.send(OpMsg::Exec(OpWrapper::Delete(pos, count), tx)).await.map_err(|_| OTError)?;
|
||||||
|
rx.await.map_err(|_| OTError)?
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cancel(&self, pos: u64, count: u64) -> Result<OperationSeq, OTError> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.ops.send(OpMsg::Exec(OpWrapper::Cancel(pos, count), tx)).await.map_err(|_| OTError)?;
|
||||||
|
rx.await.map_err(|_| OTError)?
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn process(&self, opseq: OperationSeq) -> Result<String, OTError> {
|
||||||
|
let (tx, rx) = oneshot::channel();
|
||||||
|
self.ops.send(OpMsg::Process(opseq, tx)).await.map_err(|_| OTError)?;
|
||||||
|
rx.await.map_err(|_| OTError)?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum OpMsg {
|
||||||
|
Exec(OpWrapper, oneshot::Sender<Result<OperationSeq, OTError>>),
|
||||||
|
Process(OperationSeq, oneshot::Sender<Result<String, OTError>>),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
enum OpWrapper {
|
||||||
|
Insert(String, u64),
|
||||||
|
Delete(u64, u64),
|
||||||
|
Cancel(u64, u64),
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AsyncFactoryWorker {
|
||||||
|
factory: OperationFactory,
|
||||||
|
ops: mpsc::Receiver<OpMsg>,
|
||||||
|
run: watch::Receiver<bool>,
|
||||||
|
content: watch::Sender<String>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsyncFactoryWorker {
|
||||||
|
async fn work(mut self) {
|
||||||
|
while *self.run.borrow() {
|
||||||
|
tokio::select! { // periodically check run so that we stop cleanly
|
||||||
|
|
||||||
|
recv = self.ops.recv() => {
|
||||||
|
match recv {
|
||||||
|
Some(msg) => {
|
||||||
|
match msg {
|
||||||
|
OpMsg::Exec(op, tx) => tx.send(self.exec(op)).unwrap_or(()),
|
||||||
|
OpMsg::Process(opseq, tx) => tx.send(self.factory.process(opseq)).unwrap_or(()),
|
||||||
|
}
|
||||||
|
if let Err(e) = self.content.send(self.factory.content()) {
|
||||||
|
error!("error updating content: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
_ = tokio::time::sleep(std::time::Duration::from_secs(1)) => {},
|
||||||
|
|
||||||
|
};
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn exec(&mut self, op: OpWrapper) -> Result<OperationSeq, OTError> {
|
||||||
|
match op {
|
||||||
|
OpWrapper::Insert(txt, pos) => Ok(self.factory.insert(&txt, pos)?),
|
||||||
|
OpWrapper::Delete(pos, count) => Ok(self.factory.delete(pos, count)?),
|
||||||
|
OpWrapper::Cancel(pos, count) => Ok(self.factory.cancel(pos, count)?),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
1
src/lib/proto.rs
Normal file
1
src/lib/proto.rs
Normal file
|
@ -0,0 +1 @@
|
||||||
|
tonic::include_proto!("buffer");
|
|
@ -1,27 +0,0 @@
|
||||||
use std::fmt::Display;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct UserCursor{
|
|
||||||
pub buffer: i64,
|
|
||||||
pub x: i64,
|
|
||||||
pub y: i64
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for UserCursor {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(f, "Cursor(buffer:{}, x:{}, y:{})", self.buffer, self.x, self.y)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct User {
|
|
||||||
pub name: String,
|
|
||||||
pub cursor: UserCursor,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Display for User {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(f, "User(name:{}, cursor:{})", self.name, self.cursor)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,67 +0,0 @@
|
||||||
use operational_transform::OperationSeq;
|
|
||||||
use tokio::sync::{broadcast, mpsc, watch};
|
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
use library::events::Event;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
/// A view of a buffer, with references to access value and send operations
|
|
||||||
pub struct BufferView {
|
|
||||||
pub name: String,
|
|
||||||
pub content: watch::Receiver<String>,
|
|
||||||
op_tx: mpsc::Sender<OperationSeq>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BufferView {
|
|
||||||
pub async fn op(&self, op: OperationSeq) -> Result<(), mpsc::error::SendError<OperationSeq>> {
|
|
||||||
self.op_tx.send(op).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Buffer {
|
|
||||||
view: BufferView,
|
|
||||||
run: watch::Sender<bool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for Buffer {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.run.send(false).unwrap_or_else(|e| {
|
|
||||||
error!("Could not stop Buffer worker task: {:?}", e);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Buffer {
|
|
||||||
pub fn new(name: String, _bus: broadcast::Sender<Event>) -> Self {
|
|
||||||
let (op_tx, mut op_rx) = mpsc::channel(32);
|
|
||||||
let (stop_tx, stop_rx) = watch::channel(true);
|
|
||||||
let (content_tx, content_rx) = watch::channel(String::new());
|
|
||||||
|
|
||||||
let b = Buffer {
|
|
||||||
run: stop_tx,
|
|
||||||
view: BufferView {
|
|
||||||
name: name.clone(),
|
|
||||||
op_tx,
|
|
||||||
content: content_rx,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut content = String::new();
|
|
||||||
while stop_rx.borrow().to_owned() {
|
|
||||||
// TODO handle these errors!!
|
|
||||||
let op = op_rx.recv().await.unwrap();
|
|
||||||
content = op.apply(content.as_str()).unwrap();
|
|
||||||
// bus.send((name.clone(), op)).unwrap(); // TODO fails when there are no receivers subscribed
|
|
||||||
content_tx.send(content.clone()).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn view(&self) -> BufferView {
|
|
||||||
return self.view.clone();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,3 +0,0 @@
|
||||||
pub mod buffer;
|
|
||||||
pub mod workspace;
|
|
||||||
pub mod state;
|
|
|
@ -1,106 +0,0 @@
|
||||||
|
|
||||||
use std::{collections::HashMap, sync::Arc};
|
|
||||||
use tokio::sync::{mpsc, watch};
|
|
||||||
use tracing::error;
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use crate::actor::workspace::Workspace;
|
|
||||||
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum WorkspaceAction {
|
|
||||||
ADD {
|
|
||||||
key: Uuid,
|
|
||||||
w: Box<Workspace>,
|
|
||||||
},
|
|
||||||
REMOVE {
|
|
||||||
key: Uuid
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct WorkspacesView {
|
|
||||||
watch: watch::Receiver<HashMap<Uuid, Arc<Workspace>>>,
|
|
||||||
op: mpsc::Sender<WorkspaceAction>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl WorkspacesView {
|
|
||||||
pub fn borrow(&self) -> watch::Ref<HashMap<Uuid, Arc<Workspace>>> {
|
|
||||||
self.watch.borrow()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn add(&mut self, w: Workspace) {
|
|
||||||
self.op.send(WorkspaceAction::ADD { key: w.id, w: Box::new(w) }).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn remove(&mut self, key: Uuid) {
|
|
||||||
self.op.send(WorkspaceAction::REMOVE { key }).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct StateManager {
|
|
||||||
pub workspaces: WorkspacesView,
|
|
||||||
pub run: watch::Receiver<bool>,
|
|
||||||
run_tx: watch::Sender<bool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for StateManager {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.run_tx.send(false).unwrap_or_else(|e| {
|
|
||||||
error!("Could not stop StateManager worker: {:?}", e);
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl StateManager {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
let (tx, rx) = mpsc::channel(32); // TODO quantify backpressure
|
|
||||||
let (workspaces_tx, workspaces_rx) = watch::channel(HashMap::new());
|
|
||||||
let (run_tx, run_rx) = watch::channel(true);
|
|
||||||
|
|
||||||
let s = StateManager {
|
|
||||||
workspaces: WorkspacesView { watch: workspaces_rx, op: tx },
|
|
||||||
run_tx, run: run_rx,
|
|
||||||
};
|
|
||||||
|
|
||||||
s.workspaces_worker(rx, workspaces_tx);
|
|
||||||
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn workspaces_worker(&self, mut rx: mpsc::Receiver<WorkspaceAction>, tx: watch::Sender<HashMap<Uuid, Arc<Workspace>>>) {
|
|
||||||
let run = self.run.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut store = HashMap::new();
|
|
||||||
|
|
||||||
while run.borrow().to_owned() {
|
|
||||||
if let Some(event) = rx.recv().await {
|
|
||||||
match event {
|
|
||||||
WorkspaceAction::ADD { key, w } => {
|
|
||||||
store.insert(key, Arc::new(*w)); // TODO put in hashmap
|
|
||||||
},
|
|
||||||
WorkspaceAction::REMOVE { key } => {
|
|
||||||
store.remove(&key);
|
|
||||||
},
|
|
||||||
}
|
|
||||||
tx.send(store.clone()).unwrap();
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn view(&self) -> WorkspacesView {
|
|
||||||
return self.workspaces.clone();
|
|
||||||
}
|
|
||||||
|
|
||||||
/// get a workspace Arc directly, without passing by the WorkspacesView
|
|
||||||
pub fn get(&self, key: &Uuid) -> Option<Arc<Workspace>> {
|
|
||||||
if let Some(w) = self.workspaces.borrow().get(key) {
|
|
||||||
return Some(w.clone());
|
|
||||||
}
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,228 +0,0 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use tokio::sync::{broadcast, mpsc, watch::{self, Ref}};
|
|
||||||
use tracing::{warn, info};
|
|
||||||
|
|
||||||
use library::{events::Event, user::{User, UserCursor}};
|
|
||||||
|
|
||||||
use crate::service::workspace::proto::CursorUpdate;
|
|
||||||
|
|
||||||
use super::buffer::{BufferView, Buffer};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct UsersView {
|
|
||||||
watch: watch::Receiver<HashMap<String, User>>,
|
|
||||||
op: mpsc::Sender<UserAction>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UsersView { // TODO don't unwrap everything!
|
|
||||||
pub fn borrow(&self) -> Ref<HashMap<String, User>> {
|
|
||||||
return self.watch.borrow();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn add(&mut self, user: User) {
|
|
||||||
self.op.send(UserAction::ADD{ user }).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn remove(&mut self, name: String) {
|
|
||||||
self.op.send(UserAction::REMOVE{ name }).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn update(&mut self, user_name: String, cursor: UserCursor) {
|
|
||||||
self.op.send(UserAction::CURSOR { name: user_name, cursor }).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct BuffersTreeView {
|
|
||||||
watch: watch::Receiver<HashMap<String, BufferView>>,
|
|
||||||
op: mpsc::Sender<BufferAction>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BuffersTreeView {
|
|
||||||
pub fn borrow(&self) -> Ref<HashMap<String, BufferView>> {
|
|
||||||
return self.watch.borrow();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn add(&mut self, buffer: Buffer) {
|
|
||||||
self.op.send(BufferAction::ADD { buffer }).await.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn remove(&mut self, path: String) {
|
|
||||||
self.op.send(BufferAction::REMOVE { path }).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct WorkspaceView {
|
|
||||||
rx: broadcast::Receiver<Event>,
|
|
||||||
pub users: UsersView,
|
|
||||||
pub buffers: BuffersTreeView,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl WorkspaceView {
|
|
||||||
pub async fn event(&mut self) -> Result<Event, broadcast::error::RecvError> {
|
|
||||||
self.rx.recv().await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Must be clonable, containing references to the actual state maybe? Or maybe give everyone an Arc, idk
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct Workspace {
|
|
||||||
pub id: uuid::Uuid,
|
|
||||||
pub name: String,
|
|
||||||
pub bus: broadcast::Sender<Event>,
|
|
||||||
pub cursors: broadcast::Sender<CursorUpdate>,
|
|
||||||
|
|
||||||
pub buffers: BuffersTreeView,
|
|
||||||
pub users: UsersView,
|
|
||||||
|
|
||||||
run_tx: watch::Sender<bool>,
|
|
||||||
run_rx: watch::Receiver<bool>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Drop for Workspace {
|
|
||||||
fn drop(&mut self) {
|
|
||||||
self.run_tx.send(false).unwrap_or_else(|e| warn!("could not stop workspace worker: {:?}", e));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Workspace {
|
|
||||||
pub fn new(name: String) -> Self {
|
|
||||||
let (op_buf_tx, op_buf_rx) = mpsc::channel::<BufferAction>(32);
|
|
||||||
let (op_usr_tx, op_usr_rx) = mpsc::channel::<UserAction>(32);
|
|
||||||
let (run_tx, run_rx) = watch::channel::<bool>(true);
|
|
||||||
let (buffer_tx, buffer_rx) = watch::channel::<HashMap<String, BufferView>>(HashMap::new());
|
|
||||||
let (users_tx, users_rx) = watch::channel(HashMap::new());
|
|
||||||
let (broadcast_tx, _broadcast_rx) = broadcast::channel::<Event>(32);
|
|
||||||
let (cursors_tx, _cursors_rx) = broadcast::channel::<CursorUpdate>(32);
|
|
||||||
|
|
||||||
let w = Workspace {
|
|
||||||
id: uuid::Uuid::new_v4(),
|
|
||||||
name,
|
|
||||||
bus: broadcast_tx,
|
|
||||||
cursors: cursors_tx,
|
|
||||||
buffers: BuffersTreeView{ op: op_buf_tx, watch: buffer_rx },
|
|
||||||
users: UsersView{ op: op_usr_tx, watch: users_rx },
|
|
||||||
run_tx,
|
|
||||||
run_rx,
|
|
||||||
};
|
|
||||||
|
|
||||||
w.users_worker(op_usr_rx, users_tx); // spawn worker to handle users
|
|
||||||
w.buffers_worker(op_buf_rx, buffer_tx); // spawn worker to handle buffers
|
|
||||||
//
|
|
||||||
info!("new workspace created: {}[{}]", w.name, w.id);
|
|
||||||
|
|
||||||
return w;
|
|
||||||
}
|
|
||||||
|
|
||||||
fn buffers_worker(&self, mut rx: mpsc::Receiver<BufferAction>, tx: watch::Sender<HashMap<String, BufferView>>) {
|
|
||||||
let bus = self.bus.clone();
|
|
||||||
let run = self.run_rx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut buffers : HashMap<String, Buffer> = HashMap::new();
|
|
||||||
|
|
||||||
while run.borrow().to_owned() {
|
|
||||||
// TODO handle these errors!!
|
|
||||||
let action = rx.recv().await.unwrap();
|
|
||||||
match action {
|
|
||||||
BufferAction::ADD { buffer } => {
|
|
||||||
let view = buffer.view();
|
|
||||||
buffers.insert(view.name.clone(), buffer);
|
|
||||||
bus.send(Event::BufferNew { path: view.name }).unwrap();
|
|
||||||
}
|
|
||||||
BufferAction::REMOVE { path } => {
|
|
||||||
buffers.remove(&path);
|
|
||||||
bus.send(Event::BufferDelete { path }).unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tx.send(
|
|
||||||
buffers.iter()
|
|
||||||
.map(|(k, v)| (k.clone(), v.view()))
|
|
||||||
.collect()
|
|
||||||
).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
fn users_worker(&self, mut rx: mpsc::Receiver<UserAction>, tx: watch::Sender<HashMap<String, User>>) {
|
|
||||||
let bus = self.bus.clone();
|
|
||||||
let cursors_tx = self.cursors.clone();
|
|
||||||
let run = self.run_rx.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut cursors_rx = cursors_tx.subscribe();
|
|
||||||
let mut users : HashMap<String, User> = HashMap::new();
|
|
||||||
|
|
||||||
while run.borrow().to_owned() {
|
|
||||||
tokio::select!{
|
|
||||||
action = rx.recv() => {
|
|
||||||
match action.unwrap() {
|
|
||||||
UserAction::ADD { user } => {
|
|
||||||
users.insert(user.name.clone(), user.clone());
|
|
||||||
bus.send(Event::UserJoin { user }).unwrap();
|
|
||||||
},
|
|
||||||
UserAction::REMOVE { name } => {
|
|
||||||
if let None = users.remove(&name) {
|
|
||||||
continue; // don't update channel since this was a no-op
|
|
||||||
} else {
|
|
||||||
bus.send(Event::UserLeave { name }).unwrap();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
UserAction::CURSOR { name, cursor } => {
|
|
||||||
if let Some(user) = users.get_mut(&name) {
|
|
||||||
user.cursor = cursor.clone();
|
|
||||||
} else {
|
|
||||||
continue; // don't update channel since this was a no-op
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
},
|
|
||||||
cursor = cursors_rx.recv() => {
|
|
||||||
let cursor = cursor.unwrap();
|
|
||||||
if let Some(user) = users.get_mut(&cursor.username) {
|
|
||||||
user.cursor = UserCursor { buffer: cursor.buffer, x:cursor.col, y:cursor.row };
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tx.send(
|
|
||||||
users.iter()
|
|
||||||
.map(|(k, u)| (k.clone(), u.clone()))
|
|
||||||
.collect()
|
|
||||||
).unwrap();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn view(&self) -> WorkspaceView {
|
|
||||||
WorkspaceView {
|
|
||||||
rx: self.bus.subscribe(),
|
|
||||||
users: self.users.clone(),
|
|
||||||
buffers: self.buffers.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum UserAction {
|
|
||||||
ADD {
|
|
||||||
user: User,
|
|
||||||
},
|
|
||||||
REMOVE {
|
|
||||||
name: String,
|
|
||||||
},
|
|
||||||
CURSOR {
|
|
||||||
name: String,
|
|
||||||
cursor: UserCursor,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
enum BufferAction {
|
|
||||||
ADD {
|
|
||||||
buffer: Buffer,
|
|
||||||
},
|
|
||||||
REMOVE {
|
|
||||||
path: String, // TODO remove by id?
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
98
src/server/buffer/actor.rs
Normal file
98
src/server/buffer/actor.rs
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
use codemp::proto::{RawOp, OperationRequest};
|
||||||
|
use tokio::sync::{mpsc, broadcast, watch};
|
||||||
|
use tracing::{error, warn};
|
||||||
|
use md5::Digest;
|
||||||
|
|
||||||
|
use operational_transform::OperationSeq;
|
||||||
|
|
||||||
|
pub trait BufferStore<T> {
|
||||||
|
fn get(&self, key: &T) -> Option<&BufferHandle>;
|
||||||
|
fn put(&mut self, key: T, handle: BufferHandle) -> Option<BufferHandle>;
|
||||||
|
|
||||||
|
fn handle(&mut self, key: T, content: Option<String>) {
|
||||||
|
let handle = BufferHandle::new(content);
|
||||||
|
self.put(key, handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct BufferHandle {
|
||||||
|
pub edit: mpsc::Sender<OperationRequest>,
|
||||||
|
events: broadcast::Sender<RawOp>,
|
||||||
|
pub digest: watch::Receiver<Digest>,
|
||||||
|
pub content: watch::Receiver<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufferHandle {
|
||||||
|
fn new(init: Option<String>) -> Self {
|
||||||
|
let init_val = init.unwrap_or("".into());
|
||||||
|
let (edits_tx, edits_rx) = mpsc::channel(64); // TODO hardcoded size
|
||||||
|
let (events_tx, _events_rx) = broadcast::channel(64); // TODO hardcoded size
|
||||||
|
let (digest_tx, digest_rx) = watch::channel(md5::compute(&init_val));
|
||||||
|
let (content_tx, content_rx) = watch::channel(init_val.clone());
|
||||||
|
|
||||||
|
let events_tx_clone = events_tx.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let worker = BufferWorker {
|
||||||
|
store: init_val,
|
||||||
|
edits: edits_rx,
|
||||||
|
events: events_tx_clone,
|
||||||
|
digest: digest_tx,
|
||||||
|
content: content_tx,
|
||||||
|
};
|
||||||
|
worker.work().await
|
||||||
|
});
|
||||||
|
|
||||||
|
BufferHandle {
|
||||||
|
edit: edits_tx,
|
||||||
|
events: events_tx,
|
||||||
|
digest: digest_rx,
|
||||||
|
content: content_rx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn subscribe(&self) -> broadcast::Receiver<RawOp> {
|
||||||
|
self.events.subscribe()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct BufferWorker {
|
||||||
|
store: String,
|
||||||
|
edits: mpsc::Receiver<OperationRequest>,
|
||||||
|
events: broadcast::Sender<RawOp>,
|
||||||
|
digest: watch::Sender<Digest>,
|
||||||
|
content: watch::Sender<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufferWorker {
|
||||||
|
async fn work(mut self) {
|
||||||
|
loop {
|
||||||
|
match self.edits.recv().await {
|
||||||
|
None => break warn!("channel closed"),
|
||||||
|
Some(v) => match serde_json::from_str::<OperationSeq>(&v.opseq) {
|
||||||
|
Err(e) => break error!("could not deserialize opseq: {}", e),
|
||||||
|
Ok(op) => match op.apply(&self.store) {
|
||||||
|
Err(e) => error!("coult not apply OpSeq '{:?}' on '{}' : {}", v, self.store, e),
|
||||||
|
Ok(res) => {
|
||||||
|
self.store = res;
|
||||||
|
let msg = RawOp {
|
||||||
|
opseq: v.opseq,
|
||||||
|
user: v.user
|
||||||
|
};
|
||||||
|
if let Err(e) = self.digest.send(md5::compute(&self.store)) {
|
||||||
|
error!("could not update digest: {}", e);
|
||||||
|
}
|
||||||
|
if let Err(e) = self.content.send(self.store.clone()) {
|
||||||
|
error!("could not update content: {}", e);
|
||||||
|
}
|
||||||
|
if let Err(e) = self.events.send(msg) {
|
||||||
|
error!("could not broadcast OpSeq: {}", e);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
2
src/server/buffer/mod.rs
Normal file
2
src/server/buffer/mod.rs
Normal file
|
@ -0,0 +1,2 @@
|
||||||
|
pub mod actor;
|
||||||
|
pub mod service;
|
159
src/server/buffer/service.rs
Normal file
159
src/server/buffer/service.rs
Normal file
|
@ -0,0 +1,159 @@
|
||||||
|
use std::{pin::Pin, sync::{Arc, RwLock}, collections::HashMap};
|
||||||
|
|
||||||
|
use tokio::sync::{mpsc, broadcast};
|
||||||
|
use tonic::{Request, Response, Status};
|
||||||
|
|
||||||
|
use tokio_stream::{Stream, wrappers::ReceiverStream}; // TODO example used this?
|
||||||
|
|
||||||
|
use codemp::proto::{buffer_server::{Buffer, BufferServer}, RawOp, BufferPayload, BufferResponse, OperationRequest, CursorMov};
|
||||||
|
use tracing::info;
|
||||||
|
|
||||||
|
use super::actor::{BufferHandle, BufferStore};
|
||||||
|
|
||||||
|
type OperationStream = Pin<Box<dyn Stream<Item = Result<RawOp, Status>> + Send>>;
|
||||||
|
type CursorStream = Pin<Box<dyn Stream<Item = Result<CursorMov, Status>> + Send>>;
|
||||||
|
|
||||||
|
struct BufferMap {
|
||||||
|
store: HashMap<String, BufferHandle>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From::<HashMap<String, BufferHandle>> for BufferMap {
|
||||||
|
fn from(value: HashMap<String, BufferHandle>) -> Self {
|
||||||
|
BufferMap { store: value }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufferStore<String> for BufferMap {
|
||||||
|
fn get(&self, key: &String) -> Option<&BufferHandle> {
|
||||||
|
self.store.get(key)
|
||||||
|
}
|
||||||
|
fn put(&mut self, key: String, handle: BufferHandle) -> Option<BufferHandle> {
|
||||||
|
self.store.insert(key, handle)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct BufferService {
|
||||||
|
map: Arc<RwLock<BufferMap>>,
|
||||||
|
cursor: broadcast::Sender<CursorMov>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufferService {
|
||||||
|
#[allow(unused)]
|
||||||
|
fn get_buffer(&self, path: &String) -> Result<BufferHandle, Status> {
|
||||||
|
match self.map.read().unwrap().get(path) {
|
||||||
|
Some(buf) => Ok(buf.clone()),
|
||||||
|
None => Err(Status::not_found("no buffer for given path")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl Buffer for BufferService {
|
||||||
|
type AttachStream = OperationStream;
|
||||||
|
type ListenStream = CursorStream;
|
||||||
|
|
||||||
|
async fn attach(&self, req: Request<BufferPayload>) -> Result<tonic::Response<OperationStream>, Status> {
|
||||||
|
let request = req.into_inner();
|
||||||
|
let myself = request.user;
|
||||||
|
match self.map.read().unwrap().get(&request.path) {
|
||||||
|
Some(handle) => {
|
||||||
|
let (tx, rx) = mpsc::channel(128);
|
||||||
|
let mut sub = handle.subscribe();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match sub.recv().await {
|
||||||
|
Ok(v) => {
|
||||||
|
if v.user == myself { continue }
|
||||||
|
tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel?
|
||||||
|
}
|
||||||
|
Err(_e) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let output_stream = ReceiverStream::new(rx);
|
||||||
|
info!("registered new subscriber on buffer");
|
||||||
|
Ok(Response::new(Box::pin(output_stream)))
|
||||||
|
},
|
||||||
|
None => Err(Status::not_found("path not found")),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn listen(&self, req: Request<BufferPayload>) -> Result<tonic::Response<CursorStream>, Status> {
|
||||||
|
let mut sub = self.cursor.subscribe();
|
||||||
|
let myself = req.into_inner().user;
|
||||||
|
let (tx, rx) = mpsc::channel(128);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match sub.recv().await {
|
||||||
|
Ok(v) => {
|
||||||
|
if v.user == myself { continue }
|
||||||
|
tx.send(Ok(v)).await.unwrap(); // TODO unnecessary channel?
|
||||||
|
}
|
||||||
|
Err(_e) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
let output_stream = ReceiverStream::new(rx);
|
||||||
|
info!("registered new subscriber to cursor updates");
|
||||||
|
Ok(Response::new(Box::pin(output_stream)))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn cursor(&self, req:Request<CursorMov>) -> Result<Response<BufferResponse>, Status> {
|
||||||
|
match self.cursor.send(req.into_inner()) {
|
||||||
|
Ok(_) => Ok(Response::new(BufferResponse { accepted: true, content: None})),
|
||||||
|
Err(e) => Err(Status::internal(format!("could not broadcast cursor update: {}", e))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn edit(&self, req:Request<OperationRequest>) -> Result<Response<BufferResponse>, Status> {
|
||||||
|
let request = req.into_inner();
|
||||||
|
let tx = match self.map.read().unwrap().get(&request.path) {
|
||||||
|
Some(handle) => {
|
||||||
|
// if format!("{:x}", *handle.digest.borrow()) != request.hash {
|
||||||
|
// return Ok(Response::new(BufferResponse { accepted : false } ));
|
||||||
|
// }
|
||||||
|
handle.edit.clone()
|
||||||
|
},
|
||||||
|
None => return Err(Status::not_found("path not found")),
|
||||||
|
};
|
||||||
|
info!("sending edit to buffer: {}", request.opseq);
|
||||||
|
match tx.send(request).await {
|
||||||
|
Ok(()) => Ok(Response::new(BufferResponse { accepted: true, content: None })),
|
||||||
|
Err(e) => Err(Status::internal(format!("error sending edit to buffer actor: {}", e))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create(&self, req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
|
||||||
|
let request = req.into_inner();
|
||||||
|
let _handle = self.map.write().unwrap().handle(request.path, request.content);
|
||||||
|
info!("created new buffer");
|
||||||
|
let answ = BufferResponse { accepted: true, content: None };
|
||||||
|
Ok(Response::new(answ))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn sync(&self, req: Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
|
||||||
|
let request = req.into_inner();
|
||||||
|
match self.map.read().unwrap().get(&request.path) {
|
||||||
|
None => Err(Status::not_found("requested buffer does not exist")),
|
||||||
|
Some(buf) => {
|
||||||
|
info!("synching buffer");
|
||||||
|
let answ = BufferResponse { accepted: true, content: Some(buf.content.borrow().clone()) };
|
||||||
|
Ok(Response::new(answ))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BufferService {
|
||||||
|
pub fn new() -> BufferService {
|
||||||
|
let (cur_tx, _cur_rx) = broadcast::channel(64); // TODO hardcoded capacity
|
||||||
|
BufferService {
|
||||||
|
map: Arc::new(RwLock::new(HashMap::new().into())),
|
||||||
|
cursor: cur_tx,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn server(self) -> BufferServer<BufferService> {
|
||||||
|
BufferServer::new(self)
|
||||||
|
}
|
||||||
|
}
|
|
@ -4,35 +4,40 @@
|
||||||
//! all clients and synching everyone's cursor.
|
//! all clients and synching everyone's cursor.
|
||||||
//!
|
//!
|
||||||
|
|
||||||
pub mod actor;
|
use clap::Parser;
|
||||||
pub mod service;
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
|
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
|
|
||||||
use crate::{
|
mod buffer;
|
||||||
actor::state::StateManager,
|
|
||||||
service::{buffer::BufferService, workspace::WorkspaceService, session::SessionService},
|
use crate::buffer::service::BufferService;
|
||||||
};
|
|
||||||
|
#[derive(Parser, Debug)]
|
||||||
|
struct CliArgs {
|
||||||
|
|
||||||
|
/// address to listen on
|
||||||
|
#[arg(long, default_value = "[::1]:50051")]
|
||||||
|
host: String,
|
||||||
|
|
||||||
|
/// enable debug log level
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
debug: bool,
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
tracing_subscriber::fmt::init();
|
let args = CliArgs::parse();
|
||||||
|
|
||||||
let addr = "[::1]:50051".parse()?;
|
tracing_subscriber::fmt()
|
||||||
|
.with_writer(std::io::stdout)
|
||||||
|
.with_max_level(if args.debug { tracing::Level::DEBUG } else { tracing::Level::INFO })
|
||||||
|
.init();
|
||||||
|
|
||||||
let state = Arc::new(StateManager::new());
|
info!("starting server");
|
||||||
|
|
||||||
info!("Starting server");
|
|
||||||
|
|
||||||
Server::builder()
|
Server::builder()
|
||||||
.add_service(SessionService::new(state.clone()).server())
|
.add_service(BufferService::new().server())
|
||||||
.add_service(WorkspaceService::new(state.clone()).server())
|
.serve(args.host.parse()?)
|
||||||
.add_service(BufferService::new(state.clone()).server())
|
|
||||||
.serve(addr)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -1,156 +0,0 @@
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::{pin::Pin, sync::Arc};
|
|
||||||
|
|
||||||
use uuid::Uuid;
|
|
||||||
|
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
use operational_transform::OperationSeq;
|
|
||||||
use tonic::{Request, Response, Status};
|
|
||||||
|
|
||||||
pub mod proto {
|
|
||||||
tonic::include_proto!("buffer");
|
|
||||||
}
|
|
||||||
|
|
||||||
use library::events::Event;
|
|
||||||
|
|
||||||
use tokio::sync::{broadcast, mpsc};
|
|
||||||
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
|
|
||||||
|
|
||||||
use proto::buffer_server::{Buffer, BufferServer};
|
|
||||||
use proto::Operation;
|
|
||||||
|
|
||||||
use tonic::Streaming;
|
|
||||||
//use futures::{Stream, StreamExt};
|
|
||||||
|
|
||||||
use crate::actor::{buffer::BufferView, state::StateManager};
|
|
||||||
|
|
||||||
use self::proto::{BufferPayload, BufferResponse}; // TODO fuck x2!
|
|
||||||
|
|
||||||
type OperationStream = Pin<Box<dyn Stream<Item = Result<Operation, Status>> + Send>>;
|
|
||||||
|
|
||||||
pub struct BufferService {
|
|
||||||
state: Arc<StateManager>,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn op_seq(_o: &Operation) -> OperationSeq {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
fn _op_net(_o: &OperationSeq) -> Operation {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
// async fn buffer_worker(tx: mpsc::Sender<Result<Operation, Status>>, mut rx:Streaming<Operation>, mut rx_core: mpsc::Receiver<Operation>) {
|
|
||||||
async fn buffer_worker(
|
|
||||||
bv: BufferView,
|
|
||||||
mut client_rx: Streaming<Operation>,
|
|
||||||
_tx_client: mpsc::Sender<Result<Operation, Status>>,
|
|
||||||
mut rx_core: broadcast::Receiver<Event>,
|
|
||||||
) {
|
|
||||||
let mut queue: VecDeque<Operation> = VecDeque::new();
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
client_op = client_rx.next() => {
|
|
||||||
if let Some(result) = client_op {
|
|
||||||
match result {
|
|
||||||
Ok(op) => {
|
|
||||||
bv.op(op_seq(&op)).await.unwrap(); // TODO make OpSeq from network Operation pkt!
|
|
||||||
queue.push_back(op);
|
|
||||||
},
|
|
||||||
Err(status) => {
|
|
||||||
error!("error receiving op from client: {:?}", status);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
|
|
||||||
server_op = rx_core.recv() => {
|
|
||||||
if let Ok(_oop) = server_op {
|
|
||||||
let mut send_op = true;
|
|
||||||
for (i, _op) in queue.iter().enumerate() {
|
|
||||||
if true { // TODO must compare underlying OperationSeq here! (op.equals(server_op))
|
|
||||||
queue.remove(i);
|
|
||||||
send_op = false;
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
// serv_op.transform(op); // TODO transform OpSeq !
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if send_op {
|
|
||||||
// tx_client.send(Ok(op_net(&oop.1))).await.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
|
||||||
impl Buffer for BufferService {
|
|
||||||
// type ServerStreamingEchoStream = ResponseStream;
|
|
||||||
type AttachStream = OperationStream;
|
|
||||||
|
|
||||||
async fn attach(
|
|
||||||
&self,
|
|
||||||
req: Request<Streaming<Operation>>,
|
|
||||||
) -> Result<tonic::Response<OperationStream>, Status> {
|
|
||||||
let session_id: String;
|
|
||||||
if let Some(sid) = req.metadata().get("session_id") {
|
|
||||||
session_id = sid.to_str().unwrap().to_string();
|
|
||||||
} else {
|
|
||||||
return Err(Status::failed_precondition(
|
|
||||||
"Missing metadata key 'session_id'",
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let path: String;
|
|
||||||
if let Some(p) = req.metadata().get("path") {
|
|
||||||
path = p.to_str().unwrap().to_string();
|
|
||||||
} else {
|
|
||||||
return Err(Status::failed_precondition("Missing metadata key 'path'"));
|
|
||||||
}
|
|
||||||
// TODO make these above nicer? more concise? idk
|
|
||||||
|
|
||||||
if let Some(workspace) = self.state.workspaces.borrow().get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
|
|
||||||
let in_stream = req.into_inner();
|
|
||||||
let (tx_og, rx) = mpsc::channel::<Result<Operation, Status>>(128);
|
|
||||||
|
|
||||||
let b: BufferView = workspace.buffers.borrow().get(&path).unwrap().clone();
|
|
||||||
let w = workspace.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
buffer_worker(b, in_stream, tx_og, w.bus.subscribe()).await;
|
|
||||||
});
|
|
||||||
|
|
||||||
// echo just write the same data that was received
|
|
||||||
let out_stream = ReceiverStream::new(rx);
|
|
||||||
|
|
||||||
return Ok(Response::new(Box::pin(out_stream) as Self::AttachStream));
|
|
||||||
} else {
|
|
||||||
return Err(Status::not_found(format!(
|
|
||||||
"Norkspace with session_id {}",
|
|
||||||
session_id
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn push(&self, _req:Request<BufferPayload>) -> Result<Response<BufferResponse>, Status> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn pull(&self, _req:Request<BufferPayload>) -> Result<Response<BufferPayload>, Status> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BufferService {
|
|
||||||
pub fn new(state: Arc<StateManager>) -> BufferService {
|
|
||||||
BufferService { state }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn server(self) -> BufferServer<BufferService> {
|
|
||||||
BufferServer::new(self)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,3 +0,0 @@
|
||||||
pub mod buffer;
|
|
||||||
pub mod session;
|
|
||||||
pub mod workspace;
|
|
|
@ -1,59 +0,0 @@
|
||||||
pub mod proto {
|
|
||||||
tonic::include_proto!("session");
|
|
||||||
}
|
|
||||||
|
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use proto::{session_server::Session, WorkspaceBuilderRequest, SessionResponse};
|
|
||||||
use tonic::{Request, Response, Status};
|
|
||||||
|
|
||||||
|
|
||||||
use crate::actor::{
|
|
||||||
state::StateManager, workspace::Workspace, // TODO fuck x2!
|
|
||||||
};
|
|
||||||
|
|
||||||
use self::proto::session_server::SessionServer;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct SessionService {
|
|
||||||
state: Arc<StateManager>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
|
||||||
impl Session for SessionService {
|
|
||||||
async fn create_workspace(
|
|
||||||
&self,
|
|
||||||
_req: Request<WorkspaceBuilderRequest>,
|
|
||||||
) -> Result<Response<SessionResponse>, Status> {
|
|
||||||
// let name = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
|
|
||||||
let w = Workspace::new("im lazy".into());
|
|
||||||
let res = SessionResponse { accepted:true, session_key: w.id.to_string() };
|
|
||||||
|
|
||||||
self.state.view().add(w).await;
|
|
||||||
Ok(Response::new(res))
|
|
||||||
}
|
|
||||||
|
|
||||||
// async fn authenticate(
|
|
||||||
// &self,
|
|
||||||
// req: Request<SessionRequest>,
|
|
||||||
// ) -> Result<Response<SessionResponse>, Status> {
|
|
||||||
// todo!()
|
|
||||||
// }
|
|
||||||
|
|
||||||
// async fn list_workspaces(
|
|
||||||
// &self,
|
|
||||||
// req: Request<SessionRequest>,
|
|
||||||
// ) -> Result<Response<WorkspaceList>, Status> {
|
|
||||||
// todo!()
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
impl SessionService {
|
|
||||||
pub fn new(state: Arc<StateManager>) -> SessionService {
|
|
||||||
SessionService { state }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn server(self) -> SessionServer<SessionService> {
|
|
||||||
SessionServer::new(self)
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,258 +0,0 @@
|
||||||
use std::{pin::Pin, sync::Arc};
|
|
||||||
|
|
||||||
use uuid::Uuid;
|
|
||||||
use tonic::codegen::InterceptedService;
|
|
||||||
use tonic::service::Interceptor;
|
|
||||||
use tracing::info;
|
|
||||||
|
|
||||||
use tokio_stream::wrappers::ReceiverStream;
|
|
||||||
use tonic::{Request, Response, Status, Streaming};
|
|
||||||
use tokio::sync::{watch, mpsc};
|
|
||||||
|
|
||||||
pub mod proto {
|
|
||||||
tonic::include_proto!("workspace");
|
|
||||||
}
|
|
||||||
|
|
||||||
use library::user::User;
|
|
||||||
|
|
||||||
use tokio_stream::{Stream, StreamExt}; // TODO example used this?
|
|
||||||
|
|
||||||
use proto::workspace_server::{Workspace, WorkspaceServer};
|
|
||||||
use proto::{BufferList, WorkspaceEvent, WorkspaceRequest, WorkspaceResponse, UsersList, BufferRequest, CursorUpdate, JoinRequest};
|
|
||||||
|
|
||||||
use library::user::UserCursor;
|
|
||||||
use crate::actor::{buffer::Buffer, state::StateManager}; // TODO fuck x2!
|
|
||||||
|
|
||||||
pub struct WorkspaceExtension {
|
|
||||||
pub id: String
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct WorkspaceInterceptor {
|
|
||||||
state: Arc<StateManager>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Interceptor for WorkspaceInterceptor {
|
|
||||||
fn call(&mut self, mut req: tonic::Request<()>) -> Result<tonic::Request<()>, Status> {
|
|
||||||
// Set an extension that can be retrieved by `say_hello`
|
|
||||||
let id;
|
|
||||||
|
|
||||||
// TODO this is kinda spaghetti but I can't borrow immutably and mutably req inside this match
|
|
||||||
// tree...
|
|
||||||
match req.metadata().get("workspace") {
|
|
||||||
Some(value) => {
|
|
||||||
info!("Metadata: {:?}", value);
|
|
||||||
match value.to_str() {
|
|
||||||
Ok(w_id) => {
|
|
||||||
id = w_id.to_string();
|
|
||||||
},
|
|
||||||
Err(_) => return Err(Status::invalid_argument("Workspace key is not valid")),
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => return Err(Status::unauthenticated("No workspace key included in request"))
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("checking request : {}", id);
|
|
||||||
|
|
||||||
let uid = match Uuid::parse_str(id.as_str()) {
|
|
||||||
Ok(id) => id,
|
|
||||||
Err(e) => { return Err(Status::invalid_argument(format!("Invalid uuid : {}", e))); },
|
|
||||||
};
|
|
||||||
|
|
||||||
if !self.state.workspaces.borrow().contains_key(&uid) {
|
|
||||||
return Err(Status::not_found(format!("Workspace '{}' could not be found", id)));
|
|
||||||
}
|
|
||||||
|
|
||||||
req.extensions_mut().insert(WorkspaceExtension { id });
|
|
||||||
Ok(req)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
type EventStream = Pin<Box<dyn Stream<Item = Result<WorkspaceEvent, Status>> + Send>>;
|
|
||||||
type CursorUpdateStream = Pin<Box<dyn Stream<Item = Result<CursorUpdate, Status>> + Send>>;
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct WorkspaceService {
|
|
||||||
state: Arc<StateManager>,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tonic::async_trait]
|
|
||||||
impl Workspace for WorkspaceService {
|
|
||||||
type JoinStream = EventStream;
|
|
||||||
type SubscribeStream = CursorUpdateStream;
|
|
||||||
|
|
||||||
async fn join(
|
|
||||||
&self,
|
|
||||||
req: Request<JoinRequest>,
|
|
||||||
) -> Result<tonic::Response<Self::JoinStream>, Status> {
|
|
||||||
let session_id = Uuid::parse_str(req.extensions().get::<WorkspaceExtension>().unwrap().id.as_str()).unwrap();
|
|
||||||
let r = req.into_inner();
|
|
||||||
let run = self.state.run.clone();
|
|
||||||
let user_name = r.name.clone();
|
|
||||||
match self.state.get(&session_id) {
|
|
||||||
Some(w) => {
|
|
||||||
let (tx, rx) = mpsc::channel::<Result<WorkspaceEvent, Status>>(128);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut event_receiver = w.bus.subscribe();
|
|
||||||
w.view().users.add(
|
|
||||||
User {
|
|
||||||
name: r.name.clone(),
|
|
||||||
cursor: UserCursor { buffer:0, x:0, y:0 }
|
|
||||||
}
|
|
||||||
).await;
|
|
||||||
info!("User {} joined workspace {}", r.name, w.id);
|
|
||||||
while run.borrow().to_owned() {
|
|
||||||
let res = event_receiver.recv().await.unwrap();
|
|
||||||
let broadcasting = WorkspaceEvent { id: 1, body: Some(res.to_string()) }; // TODO actually process packet
|
|
||||||
tx.send(Ok(broadcasting)).await.unwrap();
|
|
||||||
}
|
|
||||||
w.view().users.remove(user_name).await;
|
|
||||||
});
|
|
||||||
return Ok(Response::new(Box::pin(ReceiverStream::new(rx))));
|
|
||||||
},
|
|
||||||
None => Err(Status::not_found(format!(
|
|
||||||
"No active workspace with session_key '{}'",
|
|
||||||
session_id
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn subscribe(
|
|
||||||
&self,
|
|
||||||
req: tonic::Request<Streaming<CursorUpdate>>,
|
|
||||||
) -> Result<Response<Self::SubscribeStream>, Status> {
|
|
||||||
let s_id = Uuid::parse_str(req.extensions().get::<WorkspaceExtension>().unwrap().id.as_str()).unwrap();
|
|
||||||
let mut r = req.into_inner();
|
|
||||||
match self.state.get(&s_id) {
|
|
||||||
Some(w) => {
|
|
||||||
let cursors_ref = w.cursors.clone();
|
|
||||||
let (_stop_tx, stop_rx) = watch::channel(true);
|
|
||||||
let (tx, rx) = mpsc::channel::<Result<CursorUpdate, Status>>(128);
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut workspace_bus = cursors_ref.subscribe();
|
|
||||||
while stop_rx.borrow().to_owned() {
|
|
||||||
tokio::select!{
|
|
||||||
remote = workspace_bus.recv() => {
|
|
||||||
if let Ok(cur) = remote {
|
|
||||||
info!("Sending cursor update : {:?}", cur);
|
|
||||||
tx.send(Ok(cur)).await.unwrap();
|
|
||||||
}
|
|
||||||
},
|
|
||||||
local = r.next() => {
|
|
||||||
match local {
|
|
||||||
Some(request) => {
|
|
||||||
info!("Received cursor update : {:?}", request);
|
|
||||||
match request {
|
|
||||||
Ok(cur) => {
|
|
||||||
cursors_ref.send(cur).unwrap();
|
|
||||||
},
|
|
||||||
Err(_e) => {},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
None => {},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return Ok(Response::new(Box::pin(ReceiverStream::new(rx))));
|
|
||||||
},
|
|
||||||
None => Err(Status::not_found(format!(
|
|
||||||
"No active workspace with session_key '{}'",
|
|
||||||
s_id
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn buffers(
|
|
||||||
&self,
|
|
||||||
req: Request<WorkspaceRequest>,
|
|
||||||
) -> Result<Response<BufferList>, Status> {
|
|
||||||
let r = req.into_inner();
|
|
||||||
match self.state.get(&Uuid::parse_str(r.session_key.as_str()).unwrap()) {
|
|
||||||
Some(w) => {
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for (_k, v) in w.buffers.borrow().iter() {
|
|
||||||
out.push(v.name.clone());
|
|
||||||
}
|
|
||||||
Ok(Response::new(BufferList { path: out }))
|
|
||||||
}
|
|
||||||
None => Err(Status::not_found(format!(
|
|
||||||
"No active workspace with session_key '{}'",
|
|
||||||
r.session_key
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn new_buffer(
|
|
||||||
&self,
|
|
||||||
req: Request<BufferRequest>,
|
|
||||||
) -> Result<Response<WorkspaceResponse>, Status> {
|
|
||||||
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
|
|
||||||
let r = req.into_inner();
|
|
||||||
if let Some(w) = self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
|
|
||||||
let mut view = w.view();
|
|
||||||
let buf = Buffer::new(r.path, w.bus.clone());
|
|
||||||
view.buffers.add(buf).await;
|
|
||||||
|
|
||||||
Ok(Response::new(WorkspaceResponse { accepted: true }))
|
|
||||||
} else {
|
|
||||||
return Err(Status::not_found(format!(
|
|
||||||
"No active workspace with session_key '{}'",
|
|
||||||
r.session_key
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn remove_buffer(
|
|
||||||
&self,
|
|
||||||
req: Request<BufferRequest>,
|
|
||||||
) -> Result<Response<WorkspaceResponse>, Status> {
|
|
||||||
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
|
|
||||||
let r = req.into_inner();
|
|
||||||
match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
|
|
||||||
Some(w) => {
|
|
||||||
w.view().buffers.remove(r.path).await;
|
|
||||||
Ok(Response::new(WorkspaceResponse { accepted: true }))
|
|
||||||
}
|
|
||||||
None => Err(Status::not_found(format!(
|
|
||||||
"No active workspace with session_key '{}'",
|
|
||||||
r.session_key
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn list_users(
|
|
||||||
&self,
|
|
||||||
req: Request<WorkspaceRequest>,
|
|
||||||
) -> Result<Response<UsersList>, Status> {
|
|
||||||
let session_id = req.extensions().get::<WorkspaceExtension>().unwrap().id.clone();
|
|
||||||
let r = req.into_inner();
|
|
||||||
match self.state.get(&Uuid::parse_str(session_id.as_str()).unwrap()) {
|
|
||||||
Some(w) => {
|
|
||||||
let mut out = Vec::new();
|
|
||||||
for (_k, v) in w.users.borrow().iter() {
|
|
||||||
out.push(v.name.clone());
|
|
||||||
}
|
|
||||||
Ok(Response::new(UsersList { name: out }))
|
|
||||||
},
|
|
||||||
None => Err(Status::not_found(format!(
|
|
||||||
"No active workspace with session_key '{}'",
|
|
||||||
r.session_key
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
impl WorkspaceService {
|
|
||||||
pub fn new(state: Arc<StateManager>) -> WorkspaceService {
|
|
||||||
WorkspaceService { state }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn server(self) -> InterceptedService<WorkspaceServer<WorkspaceService>, WorkspaceInterceptor> {
|
|
||||||
let state = self.state.clone();
|
|
||||||
WorkspaceServer::with_interceptor(self, WorkspaceInterceptor { state })
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in a new issue