Skip to content

Commit

Permalink
main loop listen also to block notify from zmq
Browse files Browse the repository at this point in the history
  • Loading branch information
RCasatta committed Sep 23, 2024
1 parent ae9de03 commit bfb2985
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 11 deletions.
35 changes: 28 additions & 7 deletions src/bin/electrs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ extern crate electrs;

use error_chain::ChainedError;
use std::process;
use std::sync::mpsc::channel;
use std::sync::{Arc, RwLock};
use std::time::Duration;

Expand Down Expand Up @@ -45,8 +46,9 @@ fn run_server(config: Arc<Config>) -> Result<()> {
let metrics = Metrics::new(config.monitoring_addr);
metrics.start();

let (block_hash_notify, block_hash_receive) = channel();
if let Some(zmq_addr) = config.zmq_addr.as_ref() {
zmq::start(&format!("tcp://{zmq_addr}"), None);
zmq::start(&format!("tcp://{zmq_addr}"), Some(block_hash_notify));
}

let daemon = Arc::new(Daemon::new(
Expand Down Expand Up @@ -123,14 +125,33 @@ fn run_server(config: Arc<Config>) -> Result<()> {
"count of iterations of electrs main loop each 5 seconds or after interrupts",
));

loop {
'outer: loop {
main_loop_count.inc();

if let Err(err) = signal.wait(Duration::from_secs(5), true) {
info!("stopping server: {}", err);
rest_server.stop();
// the electrum server is stopped when dropped
break;
// In the next for loop:
// We are going to wait 5 secs (50*100ms) if nothings happens.
// We are stopping if we receive a TERM or INT signal.
// We are interrupting the wait if we receive a block hash notification (if zmq enabled) or receiving a USR1 signal.
for _ in 0..50 {
match signal.wait(Duration::from_millis(100), true) {
Ok(is_sigusr) if is_sigusr => break,
Ok(_) => (),
Err(err) => {
info!("stopping server: {}", err);
rest_server.stop();
// the electrum server is stopped when dropped
break 'outer;
}
}

if let Ok(block_hash) = block_hash_receive.try_recv() {
debug!("Main loop notified of a new block {block_hash}");
while let Ok(block_hash) = block_hash_receive.try_recv() {
// let's deplete the queue in the case there is a block burst
debug!("Main loop notified of a new block {block_hash}");
}
break;
}
}

// Index new blocks
Expand Down
8 changes: 4 additions & 4 deletions src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,24 @@ impl Waiter {
}
}

pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<()> {
pub fn wait(&self, duration: Duration, accept_sigusr: bool) -> Result<bool> {
// Determine the deadline time based on the duration, so that it doesn't
// get pushed back when wait_deadline() recurses
self.wait_deadline(Instant::now() + duration, accept_sigusr)
}

fn wait_deadline(&self, deadline: Instant, accept_sigusr: bool) -> Result<()> {
fn wait_deadline(&self, deadline: Instant, accept_sigusr: bool) -> Result<bool> {
match self.receiver.recv_deadline(deadline) {
Ok(sig) if sig == SIGUSR1 => {
trace!("notified via SIGUSR1");
if accept_sigusr {
Ok(())
Ok(true)
} else {
self.wait_deadline(deadline, accept_sigusr)
}
}
Ok(sig) => bail!(ErrorKind::Interrupt(sig)),
Err(RecvTimeoutError::Timeout) => Ok(()),
Err(RecvTimeoutError::Timeout) => Ok(false),
Err(RecvTimeoutError::Disconnected) => bail!("signal hook channel disconnected"),
}
}
Expand Down

0 comments on commit bfb2985

Please sign in to comment.