From f46cffcae1baec935b1508c7af05c0c9fd8e41a4 Mon Sep 17 00:00:00 2001 From: Antoni Spaanderman <56turtle56@gmail.com> Date: Tue, 22 Aug 2023 22:01:21 +0200 Subject: [PATCH] allow breaking inside for_blocks closures using ControlFlow --- src/daemon.rs | 5 +++-- src/index.rs | 1 + src/p2p.rs | 21 +++++++++++++++------ src/status.rs | 11 +++++++++-- src/tracker.rs | 19 ++++++++++++++----- 5 files changed, 42 insertions(+), 15 deletions(-) diff --git a/src/daemon.rs b/src/daemon.rs index 19e440308..011abdd7a 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -8,6 +8,7 @@ use serde_json::{json, Value}; use std::fs::File; use std::io::Read; +use std::ops::ControlFlow; use std::path::Path; use crate::{ @@ -229,10 +230,10 @@ impl Daemon { self.p2p.lock().get_new_headers(chain) } - pub(crate) fn for_blocks(&self, blockhashes: B, func: F) -> Result<()> + pub(crate) fn for_blocks(&self, blockhashes: B, func: F) -> Result> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: FnMut(BlockHash, SerBlock) -> ControlFlow, { self.p2p.lock().for_blocks(blockhashes, func) } diff --git a/src/index.rs b/src/index.rs index d48bf2150..00ff770f2 100644 --- a/src/index.rs +++ b/src/index.rs @@ -210,6 +210,7 @@ impl Index { index_single_block(blockhash, block, height, &mut batch); }); self.stats.height.set("tip", height as f64); + ControlFlow::Continue::<()>(()) })?; let heights: Vec<_> = heights.collect(); assert!( diff --git a/src/p2p.rs b/src/p2p.rs index e3c28cf09..10cf73f55 100644 --- a/src/p2p.rs +++ b/src/p2p.rs @@ -22,6 +22,7 @@ use crossbeam_channel::{bounded, select, Receiver, Sender}; use std::io::{self, ErrorKind, Write}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream}; +use std::ops::ControlFlow; use std::sync::Arc; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; @@ -94,21 +95,26 @@ impl Connection { /// Request and process the specified blocks (in the specified order). /// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details. /// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515). - pub(crate) fn for_blocks(&mut self, blockhashes: B, mut func: F) -> Result<()> + pub(crate) fn for_blocks( + &mut self, + blockhashes: B, + mut func: F, + ) -> Result> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: FnMut(BlockHash, SerBlock) -> ControlFlow, { self.blocks_duration.observe_duration("total", || { let blockhashes: Vec = blockhashes.into_iter().collect(); if blockhashes.is_empty() { - return Ok(()); + return Ok(ControlFlow::Continue(())); } self.blocks_duration.observe_duration("request", || { debug!("loading {} blocks", blockhashes.len()); self.req_send.send(Request::get_blocks(&blockhashes)) })?; + let mut ret = ControlFlow::Continue(()); for hash in blockhashes { let block = self.blocks_duration.observe_duration("response", || { let block = self @@ -124,10 +130,13 @@ impl Connection { ); Ok(block) })?; - self.blocks_duration - .observe_duration("process", || func(hash, block)); + if ret.is_continue() { + ret = self + .blocks_duration + .observe_duration("process", || func(hash, block)); + } } - Ok(()) + Ok(ret) }) } diff --git a/src/status.rs b/src/status.rs index da6065349..821835c09 100644 --- a/src/status.rs +++ b/src/status.rs @@ -308,10 +308,15 @@ impl ScriptHashStatus { } /// Apply func only on the new blocks (fetched from daemon). - fn for_new_blocks(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()> + fn for_new_blocks( + &self, + blockhashes: B, + daemon: &Daemon, + func: F, + ) -> Result> where B: IntoIterator, - F: FnMut(BlockHash, SerBlock), + F: FnMut(BlockHash, SerBlock) -> ControlFlow, { daemon.for_blocks( blockhashes @@ -347,6 +352,7 @@ impl ScriptHashStatus { .or_insert_with(|| TxEntry::new(filtered_outputs.txid)) .outputs = filtered_outputs.result; } + ControlFlow::Continue::<()>(()) })?; let spending_blockhashes: HashSet = outpoints .par_iter() @@ -361,6 +367,7 @@ impl ScriptHashStatus { .or_insert_with(|| TxEntry::new(filtered_inputs.txid)) .spent = filtered_inputs.result; } + ControlFlow::Continue::<()>(()) })?; Ok(result diff --git a/src/tracker.rs b/src/tracker.rs index ea32cf88c..68f5cf4be 100644 --- a/src/tracker.rs +++ b/src/tracker.rs @@ -5,6 +5,7 @@ use bitcoin_slices::{ Error::VisitBreak, Visit, }; +use std::ops::ControlFlow; use crate::{ cache::Cache, @@ -105,17 +106,25 @@ impl Tracker { ) -> Result> { // Note: there are two blocks with coinbase transactions having same txid (see BIP-30) let blockhashes = self.index.filter_by_txid(txid); - let mut result = None; - daemon.for_blocks(blockhashes, |blockhash, block| { + let result = daemon.for_blocks(blockhashes, |blockhash, block| { let mut visitor = FindTransaction::new(txid); match bsl::Block::visit(&block, &mut visitor) { Ok(_) | Err(VisitBreak) => (), Err(e) => panic!("core returned invalid block: {:?}", e), } - if let Some(tx) = visitor.tx_found() { - result = Some((blockhash, tx)); + match visitor.tx_found() { + Some(tx) => ControlFlow::Break((blockhash, tx)), + None => ControlFlow::Continue(()), } })?; - Ok(result) + Ok(control_flow_break_value(result)) + } +} + +/// See unstable ControlFlow::break_value (https://github.com/rust-lang/rust/issues/75744) +fn control_flow_break_value(value: ControlFlow) -> Option { + match value { + ControlFlow::Continue(..) => None, + ControlFlow::Break(x) => Some(x), } }