Skip to content

Commit

Permalink
jsonrpc: start reworking the jsonrpc
Browse files Browse the repository at this point in the history
Link: #182
Signed-off-by: Vincenzo Palazzo <[email protected]>
  • Loading branch information
vincenzopalazzo committed May 13, 2024
1 parent c7b09ed commit 54d2876
Showing 1 changed file with 26 additions and 96 deletions.
122 changes: 26 additions & 96 deletions lampo-jsonrpc/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
//! Full feature async JSON RPC 2.0 Server/client with a
//! minimal dependencies footprint.
use std::cell::{Cell, RefCell};
use std::collections::{HashMap, VecDeque};
use std::collections::HashMap;
use std::io::{self, ErrorKind};
use std::io::{Read, Write};
use std::os::unix::io::AsRawFd;
use std::os::unix::net::UnixListener;
use std::os::unix::net::{SocketAddr, UnixStream};
use std::sync::{Arc, Mutex};
use std::os::unix::net::UnixStream;
use std::sync::Arc;
use std::thread::JoinHandle;

// FIXME: use mio for a better platform support.
Expand All @@ -25,17 +26,15 @@ use crate::json_rpc2::{Request, Response};
#[derive(Debug, Clone, PartialEq)]
pub enum RPCEvent {
Listening,
Connect(String),
Connect(i32),
}

pub struct JSONRPCv2<T: Send + Sync + 'static> {
socket_path: String,
sources: Sources<RPCEvent>,
socket: UnixListener,
handler: Arc<Handler<T>>,
// FIXME: should be not the name but the fd int as key?
pub(crate) conn: HashMap<String, UnixStream>,
conn_queue: Mutex<Cell<HashMap<String, VecDeque<Response<Value>>>>>,
pub(crate) conn: HashMap<i32, UnixStream>,
}

pub struct Handler<T: Send + Sync + 'static> {
Expand Down Expand Up @@ -103,7 +102,6 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
handler: Arc::new(Handler::new(ctx)),
socket_path: path.to_owned(),
conn: HashMap::new(),
conn_queue: Mutex::new(Cell::new(HashMap::new())),
})
}

Expand All @@ -118,51 +116,12 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
Ok(())
}

pub fn add_connection(&mut self, key: &SocketAddr, stream: UnixStream) {
let path = if let Some(path) = key.as_pathname() {
path.to_str().unwrap()
} else {
"unnamed"
};
pub fn add_connection<F: AsRawFd>(&mut self, key: F, stream: UnixStream) {
let res = stream.set_nonblocking(true);
debug_assert!(res.is_ok());
let event = RPCEvent::Connect(path.to_string());
let event = RPCEvent::Connect(key.as_raw_fd());
self.sources.register(event, &stream, popol::interest::ALL);
self.conn.insert(path.to_owned(), stream);
}

pub fn send_resp(&self, key: String, resp: Response<Value>) {
let queue = self.conn_queue.lock().unwrap();

let mut conns = queue.take();
log::debug!(target: "jsonrpc", "{:?}", conns);
if conns.contains_key(&key) {
let Some(queue) = conns.get_mut(&key) else {
panic!("queue not found");
};
queue.push_back(resp);
} else {
let mut q = VecDeque::new();
q.push_back(resp);
conns.insert(key, q);
}
log::debug!(target: "jsonrpc", "{:?}", conns);
queue.set(conns);
}

pub fn pop_resp(&self, key: String) -> Option<Response<Value>> {
let queue = self.conn_queue.lock().unwrap();

let mut conns = queue.take();
if !conns.contains_key(&key) {
return None;
}
let Some(q) = conns.get_mut(&key) else {
return None;
};
let resp = q.pop_front();
queue.set(conns);
resp
self.conn.insert(key.as_raw_fd(), stream);
}

#[allow(dead_code)]
Expand All @@ -181,28 +140,25 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
// Blocking while we are waiting new events!
self.sources.poll(&mut events, Timeout::Never)?;

for mut event in events.drain(..) {
for event in events.drain(..) {
match &event.key {
RPCEvent::Listening => {
let conn = self.socket.accept();
let Ok((stream, addr)) = conn else {
let Ok((stream, _)) = conn else {
if let Err(err) = &conn {
if err.kind() == ErrorKind::WouldBlock {
break;
continue;
}
}
log::error!(target: "jsonrpc", "fail to accept the connection: {:?}", conn);
continue;
};
log::trace!(target: "jsonrpc", "new connection to unix rpc socket");
self.add_connection(&addr, stream);
self.add_connection(stream.as_raw_fd(), stream);
}
RPCEvent::Connect(addr) => {
if event.is_hangup() {
break;
}
if event.is_error() {
log::error!(target: "jsonrpc", "an error occurs");
log::error!(target: "jsonrpc", "an error occurs: `{:?}`", event);
continue;
}

Expand All @@ -222,14 +178,15 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
if err.kind() != ErrorKind::WouldBlock {
return Err(err);
}
log::info!(target: "jsonrpc", "blocking with err {:?}!", err);
continue;
}

let buff = buff.trim();
if buff.is_empty() {
log::warn!(target: "jsonrpc", "buffer is empty");
break;
continue;
}
let buff = buff.trim();
log::info!(target: "jsonrpc", "buffer read {buff}");
log::debug!(target: "jsonrpc", "buffer read `{buff}`");
let requ: Request<Value> =
serde_json::from_str(&buff).map_err(|err| {
io::Error::new(io::ErrorKind::Other, format!("{err}"))
Expand All @@ -255,47 +212,20 @@ impl<T: Send + Sync + 'static> JSONRPCv2<T> {
},
};
log::trace!(target: "jsonrpc", "send response: `{:?}`", response);
self.send_resp(addr.to_string(), response);
}

if event.is_writable() {
let stream = self.conn.get(addr);
if stream.is_none() {
log::error!(target: "jsonrpc", "connection not found `{addr}`");
continue;
};

let mut stream = stream.unwrap();
let Some(resp) = self.pop_resp(addr.to_string()) else {
break;
};
let buff = serde_json::to_string(&resp).unwrap();
let buff = serde_json::to_string(&response).unwrap();
if let Err(err) = stream.write_all(buff.as_bytes()) {
if err.kind() != ErrorKind::WouldBlock {
return Err(err);
}
continue;
}
match stream.flush() {
// In this case, we've written all the data, we
// are no longer interested in writing to this
// socket.
Ok(()) => {
event.source.unset(popol::interest::WRITE);
}
// In this case, the write couldn't complete. Set
// our interest to `WRITE` to be notified when the
// socket is ready to write again.
Err(err)
if [io::ErrorKind::WouldBlock, io::ErrorKind::WriteZero]
.contains(&err.kind()) =>
{
event.source.set(popol::interest::WRITE);
}
Err(err) => {
log::error!(target: "jsonrpc", "{}: Write error: {}", addr, err.to_string());

if let Err(err) = stream.flush() {
if err.kind() != ErrorKind::WouldBlock {
return Err(err);
}
continue;
}
stream.shutdown(std::net::Shutdown::Both)?;
}
}
}
Expand Down

0 comments on commit 54d2876

Please sign in to comment.