Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Fix: electrum server graceful shutdown doesn't work #62

Open
wants to merge 1 commit into
base: new-index
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,20 @@ fn run_server(config: Arc<Config>) -> Result<()> {
loop {
if let Err(err) = signal.wait(Duration::from_secs(5), true) {
info!("stopping server: {}", err);

electrs::util::spawn_thread("shutdown-thread-checker", || {
let mut counter = 40;
let interval_ms = 500;

while counter > 0 {
electrs::util::with_spawned_threads(|threads| {
debug!("Threads during shutdown: {:?}", threads);
});
std::thread::sleep(std::time::Duration::from_millis(interval_ms));
counter -= 1;
}
});

rest_server.stop();
// the electrum server is stopped when dropped
break;
Expand Down Expand Up @@ -133,4 +147,7 @@ fn main() {
error!("server failed: {}", e.display_chain());
process::exit(1);
}
electrs::util::with_spawned_threads(|threads| {
debug!("Threads before closing: {:?}", threads);
});
}
164 changes: 124 additions & 40 deletions src/electrum/server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::collections::HashMap;
use std::io::{BufRead, BufReader, Write};
use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::{Sender, SyncSender, TrySendError};
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

Expand Down Expand Up @@ -100,6 +101,7 @@ struct Connection {
chan: SyncChannel<Message>,
stats: Arc<Stats>,
txs_limit: usize,
die_please: Option<Receiver<()>>,
#[cfg(feature = "electrum-discovery")]
discovery: Option<Arc<DiscoveryManager>>,
}
Expand All @@ -111,6 +113,7 @@ impl Connection {
addr: SocketAddr,
stats: Arc<Stats>,
txs_limit: usize,
die_please: Receiver<()>,
#[cfg(feature = "electrum-discovery")] discovery: Option<Arc<DiscoveryManager>>,
) -> Connection {
Connection {
Expand All @@ -122,6 +125,7 @@ impl Connection {
chan: SyncChannel::new(10),
stats,
txs_limit,
die_please: Some(die_please),
#[cfg(feature = "electrum-discovery")]
discovery,
}
Expand Down Expand Up @@ -501,40 +505,52 @@ impl Connection {
Ok(())
}

fn handle_replies(&mut self) -> Result<()> {
fn handle_replies(&mut self, shutdown: crossbeam_channel::Receiver<()>) -> Result<()> {
let empty_params = json!([]);
loop {
let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (
cmd.get("method"),
cmd.get("params").unwrap_or_else(|| &empty_params),
cmd.get("id"),
) {
(
Some(&Value::String(ref method)),
&Value::Array(ref params),
Some(ref id),
) => self.handle_command(method, params, id)?,
_ => bail!("invalid command: {}", cmd),
};
self.send_values(&[reply])?
crossbeam_channel::select! {
recv(self.chan.receiver()) -> msg => {
let msg = msg.chain_err(|| "channel closed")?;
trace!("RPC {:?}", msg);
match msg {
Message::Request(line) => {
let cmd: Value = from_str(&line).chain_err(|| "invalid JSON format")?;
let reply = match (
cmd.get("method"),
cmd.get("params").unwrap_or(&empty_params),
cmd.get("id"),
) {
(Some(Value::String(method)), Value::Array(params), Some(id)) => {
self.handle_command(method, params, id)?
}
_ => bail!("invalid command: {}", cmd),
};
self.send_values(&[reply])?
}
Message::PeriodicUpdate => {
let values = self
.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
self.send_values(&values)?
}
Message::Done => {
self.chan.close();
return Ok(());
}
}
}
Message::PeriodicUpdate => {
let values = self
.update_subscriptions()
.chain_err(|| "failed to update subscriptions")?;
self.send_values(&values)?
recv(shutdown) -> _ => {
self.chan.close();
return Ok(());
}
Message::Done => return Ok(()),
}
}
}

fn handle_requests(mut reader: BufReader<TcpStream>, tx: SyncSender<Message>) -> Result<()> {
fn handle_requests(
mut reader: BufReader<TcpStream>,
tx: crossbeam_channel::Sender<Message>,
) -> Result<()> {
loop {
let mut line = Vec::<u8>::new();
reader
Expand Down Expand Up @@ -566,8 +582,24 @@ impl Connection {
self.stats.clients.inc();
let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream"));
let tx = self.chan.sender();

let die_please = self.die_please.take().unwrap();
let (reply_killer, reply_receiver) = crossbeam_channel::unbounded();

// We create a clone of the stream and put it in an Arc
// This will drop at the end of the function.
let arc_stream = Arc::new(self.stream.try_clone().expect("failed to clone TcpStream"));
// We don't want to keep the stream alive until SIGINT
// It should drop (close) no matter what.
let maybe_stream = Arc::downgrade(&arc_stream);
spawn_thread("properly-die", move || {
let _ = die_please.recv();
let _ = maybe_stream.upgrade().map(|s| s.shutdown(Shutdown::Both));
let _ = reply_killer.send(());
});

let child = spawn_thread("reader", || Connection::handle_requests(reader, tx));
if let Err(e) = self.handle_replies() {
if let Err(e) = self.handle_replies(reply_receiver) {
error!(
"[{}] connection handling failed: {}",
self.addr,
Expand All @@ -580,6 +612,8 @@ impl Connection {
.sub(self.status_hashes.len() as i64);

debug!("[{}] shutting down connection", self.addr);
// Drop the Arc so that the stream properly closes.
drop(arc_stream);
let _ = self.stream.shutdown(Shutdown::Both);
if let Err(err) = child.join().expect("receiver panicked") {
error!("[{}] receiver failed: {}", self.addr, err);
Expand Down Expand Up @@ -633,30 +667,38 @@ struct Stats {
impl RPC {
fn start_notifier(
notification: Channel<Notification>,
senders: Arc<Mutex<Vec<SyncSender<Message>>>>,
senders: Arc<Mutex<Vec<crossbeam_channel::Sender<Message>>>>,
acceptor: Sender<Option<(TcpStream, SocketAddr)>>,
acceptor_shutdown: Sender<()>,
) {
spawn_thread("notification", move || {
for msg in notification.receiver().iter() {
let mut senders = senders.lock().unwrap();
match msg {
Notification::Periodic => {
for sender in senders.split_off(0) {
if let Err(TrySendError::Disconnected(_)) =
if let Err(crossbeam_channel::TrySendError::Disconnected(_)) =
sender.try_send(Message::PeriodicUpdate)
{
continue;
}
senders.push(sender);
}
}
Notification::Exit => acceptor.send(None).unwrap(), // mark acceptor as done
Notification::Exit => {
acceptor_shutdown.send(()).unwrap(); // Stop the acceptor itself
acceptor.send(None).unwrap(); // mark acceptor as done
break;
}
}
}
});
}

fn start_acceptor(addr: SocketAddr) -> Channel<Option<(TcpStream, SocketAddr)>> {
fn start_acceptor(
addr: SocketAddr,
shutdown_channel: Channel<()>,
) -> Channel<Option<(TcpStream, SocketAddr)>> {
let chan = Channel::unbounded();
let acceptor = chan.sender();
spawn_thread("acceptor", move || {
Expand All @@ -666,10 +708,29 @@ impl RPC {
.set_nonblocking(false)
.expect("cannot set nonblocking to false");
let listener = TcpListener::from(socket);
let local_addr = listener.local_addr().unwrap();
let shutdown_bool = Arc::new(AtomicBool::new(false));

{
let shutdown_bool = Arc::clone(&shutdown_bool);
crate::util::spawn_thread("shutdown-acceptor", move || {
// Block until shutdown is sent.
let _ = shutdown_channel.receiver().recv();
// Store the bool so after the next accept it will break the loop
shutdown_bool.store(true, std::sync::atomic::Ordering::Release);
// Connect to the socket to cause it to unblock
let _ = TcpStream::connect(local_addr);
});
}

info!("Electrum RPC server running on {}", addr);
loop {
let (stream, addr) = listener.accept().expect("accept failed");

if shutdown_bool.load(std::sync::atomic::Ordering::Acquire) {
break;
}

stream
.set_nonblocking(false)
.expect("failed to set connection as blocking");
Expand Down Expand Up @@ -726,10 +787,18 @@ impl RPC {
RPC {
notification: notification.sender(),
server: Some(spawn_thread("rpc", move || {
let senders = Arc::new(Mutex::new(Vec::<SyncSender<Message>>::new()));

let acceptor = RPC::start_acceptor(rpc_addr);
RPC::start_notifier(notification, senders.clone(), acceptor.sender());
let senders =
Arc::new(Mutex::new(Vec::<crossbeam_channel::Sender<Message>>::new()));

let acceptor_shutdown = Channel::unbounded();
let acceptor_shutdown_sender = acceptor_shutdown.sender();
let acceptor = RPC::start_acceptor(rpc_addr, acceptor_shutdown);
RPC::start_notifier(
notification,
senders.clone(),
acceptor.sender(),
acceptor_shutdown_sender,
);

let mut threads = HashMap::new();
let (garbage_sender, garbage_receiver) = crossbeam_channel::unbounded();
Expand All @@ -740,6 +809,11 @@ impl RPC {
let senders = Arc::clone(&senders);
let stats = Arc::clone(&stats);
let garbage_sender = garbage_sender.clone();

// Kill the peers properly
let (killer, peace_receiver) = std::sync::mpsc::channel();
let killer_clone = killer.clone();

#[cfg(feature = "electrum-discovery")]
let discovery = discovery.clone();

Expand All @@ -751,34 +825,41 @@ impl RPC {
addr,
stats,
txs_limit,
peace_receiver,
#[cfg(feature = "electrum-discovery")]
discovery,
);
senders.lock().unwrap().push(conn.chan.sender());
conn.run();
info!("[{}] disconnected peer", addr);
let _ = killer_clone.send(());
let _ = garbage_sender.send(std::thread::current().id());
});

trace!("[{}] spawned {:?}", addr, spawned.thread().id());
threads.insert(spawned.thread().id(), spawned);
threads.insert(spawned.thread().id(), (spawned, killer));
while let Ok(id) = garbage_receiver.try_recv() {
if let Some(thread) = threads.remove(&id) {
if let Some((thread, killer)) = threads.remove(&id) {
trace!("[{}] joining {:?}", addr, id);
let _ = killer.send(());
if let Err(error) = thread.join() {
error!("failed to join {:?}: {:?}", id, error);
}
}
}
}
// Drop these
drop(acceptor);
drop(garbage_receiver);

trace!("closing {} RPC connections", senders.lock().unwrap().len());
for sender in senders.lock().unwrap().iter() {
let _ = sender.send(Message::Done);
let _ = sender.try_send(Message::Done);
}

for (id, thread) in threads {
for (id, (thread, killer)) in threads {
trace!("joining {:?}", id);
let _ = killer.send(());
if let Err(error) = thread.join() {
error!("failed to join {:?}: {:?}", id, error);
}
Expand All @@ -802,5 +883,8 @@ impl Drop for RPC {
handle.join().unwrap();
}
trace!("RPC server is stopped");
crate::util::with_spawned_threads(|threads| {
trace!("Threads after dropping RPC: {:?}", threads);
});
}
}
2 changes: 1 addition & 1 deletion src/elements/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl AssetRegistry {
}

pub fn spawn_sync(asset_db: Arc<RwLock<AssetRegistry>>) -> thread::JoinHandle<()> {
thread::spawn(move || loop {
crate::util::spawn_thread("asset-registry", move || loop {
if let Err(e) = asset_db.write().unwrap().fs_sync() {
error!("registry fs_sync failed: {:?}", e);
}
Expand Down
5 changes: 2 additions & 3 deletions src/new_index/fetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use std::collections::HashMap;
use std::fs;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::mpsc::Receiver;
use std::thread;

use crate::chain::{Block, BlockHash};
Expand Down Expand Up @@ -44,12 +43,12 @@ pub struct BlockEntry {
type SizedBlock = (Block, u32);

pub struct Fetcher<T> {
receiver: Receiver<T>,
receiver: crossbeam_channel::Receiver<T>,
thread: thread::JoinHandle<()>,
}

impl<T> Fetcher<T> {
fn from(receiver: Receiver<T>, thread: thread::JoinHandle<()>) -> Self {
fn from(receiver: crossbeam_channel::Receiver<T>, thread: thread::JoinHandle<()>) -> Self {
Fetcher { receiver, thread }
}

Expand Down
2 changes: 1 addition & 1 deletion src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ pub fn start(config: Arc<Config>, query: Arc<Query>) -> Handle {

Handle {
tx,
thread: thread::spawn(move || {
thread: crate::util::spawn_thread("rest-server", move || {
run_server(config, query, rx);
}),
}
Expand Down
3 changes: 1 addition & 2 deletions src/signal.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use crossbeam_channel as channel;
use crossbeam_channel::RecvTimeoutError;
use std::thread;
use std::time::{Duration, Instant};

use signal_hook::consts::{SIGINT, SIGTERM, SIGUSR1};
Expand All @@ -16,7 +15,7 @@ fn notify(signals: &[i32]) -> channel::Receiver<i32> {
let (s, r) = channel::bounded(1);
let mut signals =
signal_hook::iterator::Signals::new(signals).expect("failed to register signal hook");
thread::spawn(move || {
crate::util::spawn_thread("signal-notifier", move || {
for signal in signals.forever() {
s.send(signal)
.unwrap_or_else(|_| panic!("failed to send signal {}", signal));
Expand Down
Loading