diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 019fb6c..34cb213 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -50,6 +50,7 @@ target_link_libraries(monero-lws-daemon-common monero-lws-common monero-lws-db monero-lws-rpc + monero-lws-rpc-scanner monero-lws-wire-json monero-lws-util ${Boost_CHRONO_LIBRARY} @@ -66,6 +67,7 @@ target_link_libraries(monero-lws-daemon PRIVATE monero::libraries monero-lws-daemon-common + monero-lws-rpc-scanner ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ) @@ -82,5 +84,18 @@ target_link_libraries(monero-lws-admin Threads::Threads ) +add_executable(monero-lws-client client_main.cpp) +target_link_libraries(monero-lws-client + PRIVATE + monero::libraries + monero-lws-common + monero-lws-daemon-common + monero-lws-rpc + monero-lws-rpc-scanner + ${Boost_PROGRAM_OPTIONS_LIBRARY} + Threads::Threads +) + install(TARGETS monero-lws-daemon DESTINATION bin) install(TARGETS monero-lws-admin DESTINATION bin) +install(TARGETS monero-lws-client DESTINATION bin) diff --git a/src/client_main.cpp b/src/client_main.cpp new file mode 100644 index 0000000..08bf9f2 --- /dev/null +++ b/src/client_main.cpp @@ -0,0 +1,390 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/command_line.h" // monero/src/ +#include "common/expect.h" // monero/src/ +#include "common/util.h" // monero/src/ +#include "config.h" +#include "cryptonote_config.h" // monero/src/ +#include "db/storage.h" +#include "error.h" +#include "options.h" +#include "misc_log_ex.h" // monero/contrib/epee/include +#include "rpc/scanner/client.h" +#include "rpc/scanner/write_commands.h" +#include "scanner.h" + +#undef MONERO_DEFAULT_LOG_CATEGORY +#define MONERO_DEFAULT_LOG_CATEGORY "lws" + +namespace +{ + struct options + { + const command_line::arg_descriptor config_file; + const command_line::arg_descriptor log_level; + const command_line::arg_descriptor lws_daemon; + const command_line::arg_descriptor lws_pass; + const command_line::arg_descriptor monerod_rpc; + const command_line::arg_descriptor monerod_sub; + const command_line::arg_descriptor network; + const command_line::arg_descriptor scan_threads; + + static std::string get_default_zmq() + { + static constexpr const char base[] = "tcp://127.0.0.1:"; + switch (lws::config::network) + { + case cryptonote::TESTNET: + return base + std::to_string(config::testnet::ZMQ_RPC_DEFAULT_PORT); + case cryptonote::STAGENET: + return base + std::to_string(config::stagenet::ZMQ_RPC_DEFAULT_PORT); + case cryptonote::MAINNET: + default: + break; + } + return base + std::to_string(config::ZMQ_RPC_DEFAULT_PORT); + } + + options() + : config_file{"config-file", "Specify any option in a config file; = on separate lines"} + , log_level{"log-level", "Log level [0-4]", 1} + , lws_daemon{"lws-daemon", "Specify monero-lws-daemon main process <[ip:]port>", ""} + , lws_pass{"lws-pass", "Specify monero-lws-daemon password", ""} + , monerod_rpc{"monerod-rpc", "Specify monero ZMQ RPC server or ", get_default_zmq()} + , monerod_sub{"monerod-sub", "Specify monero ZMQ RPC server or ", ""} + , network{"network", "<\"main\"|\"stage\"|\"test\"> - Blockchain net type", "main"} + , scan_threads{"scan-threads", "Number of scan threads", boost::thread::hardware_concurrency()} + {} + + void prepare(boost::program_options::options_description& description) const + { + command_line::add_arg(description, config_file); + command_line::add_arg(description, log_level); + command_line::add_arg(description, lws_daemon); + command_line::add_arg(description, lws_pass); + command_line::add_arg(description, monerod_rpc); + command_line::add_arg(description, monerod_sub); + command_line::add_arg(description, network); + command_line::add_arg(description, scan_threads); + command_line::add_arg(description, command_line::arg_help); + } + + void set_network(boost::program_options::variables_map const& args) const + { + const std::string net = command_line::get_arg(args, network); + if (net == "main") + lws::config::network = cryptonote::MAINNET; + else if (net == "stage") + lws::config::network = cryptonote::STAGENET; + else if (net == "test") + lws::config::network = cryptonote::TESTNET; + else + throw std::runtime_error{"Bad --network value"}; + } + }; + + struct program + { + std::string lws_daemon; + std::string lws_pass; + std::string monerod_rpc; + std::string monerod_sub; + std::size_t scan_threads; + }; + + void print_help(std::ostream& out) + { + boost::program_options::options_description description{"Options"}; + options{}.prepare(description); + + out << "Usage: [options]" << std::endl; + out << description; + } + + std::optional get_program(int argc, char** argv) + { + namespace po = boost::program_options; + + const options opts{}; + po::variables_map args{}; + { + po::options_description description{"Options"}; + opts.prepare(description); + + po::store( + po::command_line_parser(argc, argv).options(description).run(), args + ); + po::notify(args); + + if (!command_line::is_arg_defaulted(args, opts.config_file)) + { + boost::filesystem::path config_path{command_line::get_arg(args, opts.config_file)}; + if (!boost::filesystem::exists(config_path)) + MONERO_THROW(lws::error::configuration, "Config file does not exist"); + + po::store( + po::parse_config_file(config_path.string().c_str(), description), args + ); + po::notify(args); + } + } + + if (command_line::get_arg(args, command_line::arg_help)) + { + print_help(std::cout); + return std::nullopt; + } + + opts.set_network(args); // do this first, sets global variable :/ + mlog_set_log_level(command_line::get_arg(args, opts.log_level)); + + program prog{ + command_line::get_arg(args, opts.lws_daemon), + command_line::get_arg(args, opts.lws_pass), + command_line::get_arg(args, opts.monerod_rpc), + command_line::get_arg(args, opts.monerod_sub), + command_line::get_arg(args, opts.scan_threads) + }; + prog.scan_threads = std::max(std::size_t(1), prog.scan_threads); + + if (command_line::is_arg_defaulted(args, opts.monerod_rpc)) + prog.monerod_rpc = options::get_default_zmq(); + + return prog; + } + + struct send_users + { + std::shared_ptr client_; + + bool operator()(lws::rpc::client&, epee::span chain, epee::span users, epee::span pow, const lws::scanner_options&) + { + if (!client_) + return false; + if (users.empty()) + return true; + if (!pow.empty() || chain.empty()) + return false; + + std::vector chain_copy{}; + chain_copy.reserve(chain.size()); + std::copy(chain.begin(), chain.end(), std::back_inserter(chain_copy)); + + std::vector users_copy{}; + users_copy.reserve(users.size()); + for (const auto& user : users) + users_copy.push_back(user.clone()); + + MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)"); + lws::rpc::scanner::client::send_update(client_, std::move(users_copy), std::move(chain_copy)); + return true; + } + }; + + void run_thread(lws::scanner_sync& self, std::shared_ptr client, lws::rpc::client& zclient, std::shared_ptr queue) + { + struct stop_ + { + lws::scanner_sync& self; + ~stop_() { self.stop(); } + } stop{self}; + + if (!client || !queue) + return; + + try + { + while (self.is_running()) + { + std::vector users{}; + auto status = queue->wait_for_accounts(); + if (status.replace) + users = std::move(*status.replace); + users.insert( + users.end(), + std::make_move_iterator(status.push.begin()), + std::make_move_iterator(status.push.end()) + ); + + if (!users.empty()) + { + static constexpr const lws::scanner_options opts{ + epee::net_utils::ssl_verification_t::system_ca, false, false + }; + + auto new_client = MONERO_UNWRAP(zclient.clone()); + MONERO_UNWRAP(new_client.watch_scan_signals()); + send_users send{client}; + if (!lws::scanner::loop(self.stop_, std::move(send), std::nullopt, std::move(new_client), std::move(users), *queue, opts, false)) + return; + } + } + } + catch (const std::exception& e) + { + self.shutdown(); + MERROR(e.what()); + } + catch (...) + { + self.shutdown(); + MERROR("Unknown error"); + } + } + + void run(program prog) + { + MINFO("Using monerod ZMQ RPC at " << prog.monerod_rpc); + auto ctx = lws::rpc::context::make(std::move(prog.monerod_rpc), std::move(prog.monerod_sub), {}, {}, std::chrono::minutes{0}, false); + + lws::scanner_sync self{}; + + /*! \NOTE `ctx will need a strand or lock if multiple threads use + `self.io.run()` in the future. */ + + boost::asio::signal_set signals{self.io_}; + signals.add(SIGINT); + signals.async_wait([&self] (const boost::system::error_code& error, int) + { + if (error != boost::asio::error::operation_aborted) + self.shutdown(); + }); + + for (;;) + { + if (self.stop_ && self.is_running()) + boost::this_thread::sleep_for(boost::chrono::seconds{1}); + + self.stop_ = false; + self.io_.reset(); + if (self.has_shutdown()) + return; + + std::vector zclients; + zclients.reserve(prog.scan_threads); + + std::vector> queues{}; + queues.resize(prog.scan_threads); + + for (auto& queue : queues) + { + queue = std::make_shared(); + zclients.push_back(MONERO_UNWRAP(ctx.connect())); + } + + auto client = std::make_shared( + self.io_, prog.lws_daemon, prog.lws_pass, queues + ); + lws::rpc::scanner::client::connect(client); + + std::vector threads{}; + threads.reserve(prog.scan_threads); + + struct stop_ + { + lws::scanner_sync& self; + lws::rpc::context& ctx; + std::vector> queues; + std::vector& threads; + + ~stop_() + { + self.stop(); + if (self.has_shutdown()) + ctx.raise_abort_process(); + else + ctx.raise_abort_scan(); + for (const auto& queue : queues) + { + if (queue) + queue->stop(); + } + for (auto& thread : threads) + thread.join(); + } + } stop{self, ctx, queues, threads}; + + boost::thread::attributes attrs; + attrs.set_stack_size(THREAD_STACK_SIZE); + + for (std::size_t i = 0; i < prog.scan_threads; ++i) + threads.emplace_back(attrs, std::bind(&run_thread, std::ref(self), client, std::ref(zclients[i]), queues[i])); + + self.io_.run(); + } // while scanner running + } +} // anonymous + +int main(int argc, char** argv) +{ + tools::on_startup(); // if it throws, don't use MERROR just print default msg + + try + { + std::optional prog; + + try + { + prog = get_program(argc, argv); + } + catch (std::exception const& e) + { + std::cerr << e.what() << std::endl << std::endl; + print_help(std::cerr); + return EXIT_FAILURE; + } + + if (prog) + run(std::move(*prog)); + } + catch (std::exception const& e) + { + MERROR(e.what()); + return EXIT_FAILURE; + } + catch (...) + { + MERROR("Unknown exception"); + return EXIT_FAILURE; + } + return EXIT_SUCCESS; + +} diff --git a/src/db/account.h b/src/db/account.h index 1904744..c78b2d9 100644 --- a/src/db/account.h +++ b/src/db/account.h @@ -40,7 +40,7 @@ #include "wire/msgpack/fwd.h" namespace lws -{ +{ //! Tracks a subset of DB account info for scanning/updating. class account { @@ -127,4 +127,18 @@ namespace lws //! Track a possible `spend`. void add_spend(db::spend const& spend); }; + + struct by_height + { + bool operator()(account const& left, account const& right) const noexcept + { + return left.scan_height() < right.scan_height(); + } + + bool operator()(db::account const& left, db::account const& right) const noexcept + { + return left.scan_height < right.scan_height; + } + }; + } // lws diff --git a/src/db/data.cpp b/src/db/data.cpp index 8b0b348..6c39207 100644 --- a/src/db/data.cpp +++ b/src/db/data.cpp @@ -457,20 +457,37 @@ namespace db map_webhook_value(dest, source, payment_id); } - void write_bytes(wire::writer& dest, const webhook_tx_confirmation& self) + namespace + { + template + void map_webhook_confirmation(F& format, T& self, U& payment_id) + { + wire::object(format, + wire::field<0>("event", std::ref(self.key.type)), + wire::field<1>("payment_id", std::ref(payment_id)), + wire::field<2>("token", std::ref(self.value.second.token)), + wire::field<3>("confirmations", std::ref(self.value.second.confirmations)), + wire::field<4>("event_id", std::ref(self.value.first.event_id)), + WIRE_FIELD_ID(5, tx_info) + ); + } + } + + void read_bytes(wire::reader& source, webhook_tx_confirmation& dest) + { + crypto::hash8 payment_id{}; + map_webhook_confirmation(source, dest, payment_id); + + static_assert(sizeof(payment_id) == sizeof(dest.value.first.payment_id), "bad memcpy"); + std::memcpy(std::addressof(dest.value.first.payment_id), std::addressof(payment_id), sizeof(payment_id)); + } + void write_bytes(wire::writer& dest, const webhook_tx_confirmation& source) { crypto::hash8 payment_id; - static_assert(sizeof(payment_id) == sizeof(self.value.first.payment_id), "bad memcpy"); - std::memcpy(std::addressof(payment_id), std::addressof(self.value.first.payment_id), sizeof(payment_id)); - // to be sent to remote url - wire::object(dest, - wire::field<0>("event", std::cref(self.key.type)), - wire::field<1>("payment_id", std::cref(payment_id)), - wire::field<2>("token", std::cref(self.value.second.token)), - wire::field<3>("confirmations", std::cref(self.value.second.confirmations)), - wire::field<4>("event_id", std::cref(self.value.first.event_id)), - WIRE_FIELD_ID(5, tx_info) - ); + static_assert(sizeof(payment_id) == sizeof(source.value.first.payment_id), "bad memcpy"); + std::memcpy(std::addressof(payment_id), std::addressof(source.value.first.payment_id), sizeof(payment_id)); + + map_webhook_confirmation(dest, source, payment_id); } static void write_bytes(wire::writer& dest, const output::spend_meta_& self) diff --git a/src/db/data.h b/src/db/data.h index 82c6b6e..9b96ee7 100644 --- a/src/db/data.h +++ b/src/db/data.h @@ -291,8 +291,7 @@ namespace db sizeof(output) == 8 + 32 + (8 * 3) + (4 * 2) + 32 + (8 * 2) + (32 * 3) + 7 + 1 + 32 + 8 + 2 * 4, "padding in output" ); - void read_bytes(wire::reader&, output&); - void write_bytes(wire::writer&, const output&); + WIRE_DECLARE_OBJECT(output); //! Information about a possible spend of a received `output`. struct spend @@ -384,7 +383,7 @@ namespace db webhook_value value; output tx_info; }; - void write_bytes(wire::writer&, const webhook_tx_confirmation&); + WIRE_DECLARE_OBJECT(webhook_tx_confirmation); //! Returned by DB when a webhook event "tripped" struct webhook_tx_spend diff --git a/src/db/storage.cpp b/src/db/storage.cpp index da4b590..caeac1c 100644 --- a/src/db/storage.cpp +++ b/src/db/storage.cpp @@ -934,6 +934,29 @@ namespace db return accounts.get_value(value); } + expect storage_reader::get_full_account(const account& user) + { + std::vector> receives{}; + std::vector pubs{}; + auto receive_list = get_outputs(user.id); + if (!receive_list) + return receive_list.error(); + + const std::size_t elems = receive_list->count(); + receives.reserve(elems); + pubs.reserve(elems); + + for (auto output = receive_list->make_iterator(); !output.is_end(); ++output) + { + auto id = output.get_value(); + auto subaddr = output.get_value(); + receives.emplace_back(std::move(id), std::move(subaddr)); + pubs.emplace_back(output.get_value()); + } + + return lws::account{user, std::move(receives), std::move(pubs)}; + } + expect> storage_reader::get_account(account_address const& address) noexcept { @@ -2810,8 +2833,13 @@ namespace db accounts_by_address.get_value(temp_value).value().status; MONERO_LMDB_CHECK(mdb_cursor_get(accounts_cur.get(), &key, &value, MDB_GET_BOTH)); } + + /* The check below is `<` instead of `!=` because of remote scanning - + a "check-in" can occur before the user accounts are replaced. + Duplicate writes should be supported as this (duplicate writes) + happened historically due to a different bug involving scan heights.*/ expect existing = accounts.get_value(value); - if (!existing || existing->scan_height != user->scan_height()) + if (!existing || existing->scan_height < user->scan_height()) continue; // to next account // Don't re-store data if already scanned diff --git a/src/db/storage.h b/src/db/storage.h index e813e57..6c1c587 100644 --- a/src/db/storage.h +++ b/src/db/storage.h @@ -40,6 +40,7 @@ #include "lmdb/transaction.h" #include "lmdb/key_stream.h" #include "lmdb/value_stream.h" +#include "wire/msgpack/fwd.h" namespace cryptonote { class checkpoints; } namespace lws @@ -132,6 +133,9 @@ namespace db //! \return Info for account `id` iff it has `status`. expect get_account(const account_status status, const account_id id) noexcept; + //! \return Account with outputs and spends + expect get_full_account(const account&); + //! \return Info related to `address`. expect> get_account(account_address const& address) noexcept; diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index ec6a3cb..e869fb7 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2020, The Monero Project +# Copyright (c) 2020-2024, The Monero Project # # All rights reserved. # @@ -26,6 +26,8 @@ # STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF # THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +add_subdirectory(scanner) + set(monero-lws-rpc_sources admin.cpp client.cpp daemon_pub.cpp daemon_zmq.cpp light_wallet.cpp lws_pub.cpp rates.cpp) set(monero-lws-rpc_headers admin.h client.h daemon_pub.h daemon_zmq.h fwd.h json.h light_wallet.h lws_pub.h rates.h) diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index f2fb0c3..c38239f 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -223,63 +223,6 @@ namespace rpc }; } // detail - expect account_push::make(std::shared_ptr ctx) noexcept - { - MONERO_PRECOND(ctx != nullptr); - - account_push out{ctx}; - out.sock.reset(zmq_socket(ctx->comm.get(), ZMQ_PUSH)); - if (out.sock == nullptr) - return {net::zmq::get_error_code()}; - - const std::string bind = account_endpoint + std::to_string(++ctx->account_counter); - MONERO_CHECK(do_set_option(out.sock.get(), ZMQ_LINGER, account_zmq_linger)); - MONERO_ZMQ_CHECK(zmq_bind(out.sock.get(), bind.c_str())); - return {std::move(out)}; - } - - account_push::~account_push() noexcept - {} - - expect account_push::push(epee::span accounts, std::chrono::seconds timeout) - { - MONERO_PRECOND(ctx != nullptr); - assert(sock.get() != nullptr); - - for (const lws::account& account : accounts) - { - // use integer id values (quick and fast) - wire::msgpack_slice_writer dest{true}; - try - { - wire_write::bytes(dest, account); - } - catch (const wire::exception& e) - { - return {e.code()}; - } - epee::byte_slice message{dest.take_sink()}; - - /* This is being pushed by the thread that monitors for shutdown, so - no signal is expected. */ - expect sent; - const auto start = std::chrono::steady_clock::now(); - while (!(sent = net::zmq::send(message.clone(), sock.get(), ZMQ_DONTWAIT))) - { - if (sent != net::zmq::make_error_code(EAGAIN)) - return sent.error(); - if (!scanner::is_running()) - return {error::signal_abort_process}; - const auto elapsed = std::chrono::steady_clock::now() - start; - if (timeout <= elapsed) - return {error::daemon_timeout}; - - boost::this_thread::sleep_for(boost::chrono::milliseconds{10}); - } - } - return success(); - } - expect client::get_response(cryptonote::rpc::Message& response, const std::chrono::seconds timeout, const source_location loc) { expect message = get_message(timeout); @@ -376,18 +319,6 @@ namespace rpc return do_subscribe(signal_sub.get(), abort_scan_signal); } - expect client::enable_pull_accounts() - { - detail::socket new_sock{zmq_socket(ctx->comm.get(), ZMQ_PULL)}; - if (new_sock == nullptr) - return {net::zmq::get_error_code()}; - const std::string connect = - account_endpoint + std::to_string(ctx->account_counter); - MONERO_ZMQ_CHECK(zmq_connect(new_sock.get(), connect.c_str())); - account_pull = std::move(new_sock); - return success(); - } - expect>> client::wait_for_block() { MONERO_PRECOND(ctx != nullptr); @@ -501,32 +432,6 @@ namespace rpc return rc; } - expect> client::pull_accounts() - { - MONERO_PRECOND(ctx != nullptr); - - if (!account_pull) - MONERO_CHECK(enable_pull_accounts()); - - std::vector out{}; - for (;;) - { - expect next = net::zmq::receive(account_pull.get(), ZMQ_DONTWAIT); - if (!next) - { - if (net::zmq::make_error_code(EAGAIN)) - break; - return next.error(); - } - out.emplace_back(); - const std::error_code error = - wire::msgpack::from_bytes(epee::byte_slice{std::move(*next)}, out.back()); - if (error) - return error; - } - return {std::move(out)}; - } - expect client::get_rates() const { MONERO_PRECOND(ctx != nullptr); @@ -643,6 +548,13 @@ namespace rpc return ctx->daemon_addr; } + std::chrono::minutes context::cache_interval() const + { + if (ctx == nullptr) + MONERO_THROW(common_error::kInvalidArgument, "Invalid lws::rpc::context"); + return ctx->cache_interval; + } + expect context::raise_abort_scan() noexcept { MONERO_PRECOND(ctx != nullptr); diff --git a/src/rpc/client.h b/src/rpc/client.h index 9b9a161..6b992dd 100644 --- a/src/rpc/client.h +++ b/src/rpc/client.h @@ -68,31 +68,6 @@ namespace rpc std::string routing; }; - //! Every scanner "reset", a new socket is created so old messages are discarded - class account_push - { - std::shared_ptr ctx; - detail::socket sock; - - explicit account_push(std::shared_ptr ctx) noexcept - : ctx(std::move(ctx)), sock() - {} - - public: - static expect make(std::shared_ptr ctx) noexcept; - - account_push(const account_push&) = delete; - account_push(account_push&&) = default; - - ~account_push() noexcept; - - account_push& operator=(const account_push&) = delete; - account_push& operator=(account_push&&) = default; - - //! Push new `accounts` to worker threads. Each account is sent in unique message - expect push(epee::span accounts, std::chrono::seconds timeout); - }; - //! Abstraction for ZMQ RPC client. Only `get_rates()` thread-safe; use `clone()`. class client { @@ -100,10 +75,9 @@ namespace rpc detail::socket daemon; detail::socket daemon_sub; detail::socket signal_sub; - detail::socket account_pull; explicit client(std::shared_ptr ctx) noexcept - : ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub(), account_pull() + : ctx(std::move(ctx)), daemon(), daemon_sub(), signal_sub() {} //! Expect `response` as the next message payload unless error. @@ -156,9 +130,6 @@ namespace rpc //! `wait`, `send`, and `receive` will watch for `raise_abort_scan()`. expect watch_scan_signals() noexcept; - //! Register `this` client as listening for new accounts - expect enable_pull_accounts(); - //! Wait for new block announce or internal timeout. expect>> wait_for_block(); @@ -259,18 +230,15 @@ namespace rpc //! \return The full address of the monerod ZMQ daemon. std::string const& daemon_address() const; + //! \return Exchange rate checking interval + std::chrono::minutes cache_interval() const; + //! \return Client connection. Thread-safe. expect connect() const noexcept { return client::make(ctx); } - //! Create a new account push state - expect bind_push() const noexcept - { - return account_push::make(ctx); - } - /*! All block `client::send`, `client::receive`, and `client::wait` calls originating from `this` object AND whose `watch_scan_signal` method was diff --git a/src/rpc/fwd.h b/src/rpc/fwd.h index 7713c3a..a60fc84 100644 --- a/src/rpc/fwd.h +++ b/src/rpc/fwd.h @@ -34,4 +34,5 @@ namespace lws class client; } struct rates; + class scan_manager; } diff --git a/src/rpc/scanner/CMakeLists.txt b/src/rpc/scanner/CMakeLists.txt new file mode 100644 index 0000000..732dc3d --- /dev/null +++ b/src/rpc/scanner/CMakeLists.txt @@ -0,0 +1,37 @@ +# Copyright (c) 2024, The Monero Project +# +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, are +# permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this list of +# conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, this list +# of conditions and the following disclaimer in the documentation and/or other +# materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its contributors may be +# used to endorse or promote products derived from this software without specific +# prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +# MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +# THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +# THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +set(monero-lws-rpc-scanner_sources + client.cpp commands.cpp connection.cpp queue.cpp server.cpp write_commands.cpp +) +set(monero-lws-rpc-scanner_headers + client.h commands.h connection.h fwd.h queue.h read_commands.h server.h write_commands.h +) + +add_library(monero-lws-rpc-scanner ${monero-lws-rpc-scanner_sources} ${monero-lws-rpc-scanner_headers}) +target_link_libraries(monero-lws-rpc-scanner monero::libraries monero-lws-wire-msgpack) diff --git a/src/rpc/scanner/client.cpp b/src/rpc/scanner/client.cpp new file mode 100644 index 0000000..989cc7a --- /dev/null +++ b/src/rpc/scanner/client.cpp @@ -0,0 +1,254 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "client.h" + +#include +#include +#include +#include + +#include "common/expect.h" // monero/src +#include "misc_log_ex.h" // monero/contrib/epee/include +#include "net/net_utils_base.h" // monero/contrib/epee/include +#include "rpc/scanner/commands.h" +#include "rpc/scanner/connection.h" +#include "rpc/scanner/read_commands.h" +#include "rpc/scanner/server.h" +#include "rpc/scanner/write_commands.h" + +namespace lws { namespace rpc { namespace scanner +{ + namespace + { + //! Connection completion timeout + constexpr const std::chrono::seconds connect_timeout{5}; + + //! Retry connection timeout + constexpr const std::chrono::seconds reconnect_interval{10}; + + struct push_accounts_handler + { + using input = push_accounts; + static bool handle(const std::shared_ptr& self, input msg) + { + if (!self) + return false; + if (msg.users.empty()) + return true; + client::push_accounts(self, std::move(msg.users)); + return true; + } + }; + + struct replace_accounts_handler + { + using input = replace_accounts; + static bool handle(const std::shared_ptr& self, input msg) + { + if (!self) + return false; + // push empty accounts too, indicates we should stop scanning + client::replace_accounts(self, std::move(msg.users)); + return true; + } + }; + } // anonymous + + //! \brief Closes the socket, forcing all outstanding ops to cancel. + struct client::close + { + std::shared_ptr self_; + + void operator()(const boost::system::error_code& error = {}) const + { + if (self_ && error != boost::asio::error::operation_aborted) + { + // The `cleanup()` function is meant to cleanup then destruct connection + assert(self_->strand_.running_in_this_thread()); + boost::system::error_code error{}; + self_->sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, error); + error = boost::system::error_code{}; + self_->sock_.close(error); + if (error) + MERROR("Error when closing socket: " << error.message()); + } + } + }; + + //! \brief + class client::connector : public boost::asio::coroutine + { + std::shared_ptr self_; + public: + explicit connector(std::shared_ptr self) + : boost::asio::coroutine(), self_(std::move(self)) + {} + + void operator()(const boost::system::error_code& error = {}) + { + if (!self_ || error == boost::asio::error::operation_aborted) + return; + + assert(self_->strand_.running_in_this_thread()); + BOOST_ASIO_CORO_REENTER(*this) + { + for (;;) + { + MINFO("Attempting connection to " << self_->server_address_); + self_->connect_timer_.expires_from_now(connect_timeout); + self_->connect_timer_.async_wait(self_->strand_.wrap(close{self_})); + + BOOST_ASIO_CORO_YIELD self_->sock_.async_connect(self_->server_address_, self_->strand_.wrap(*this)); + + if (!self_->connect_timer_.cancel() || error) + { + MERROR("Connection attempt failed: " << error.message()); + close{self_}(); + } + else + break; + + MINFO("Retrying connection in " << std::chrono::seconds{reconnect_interval}.count() << " seconds"); + self_->connect_timer_.expires_from_now(reconnect_interval); + BOOST_ASIO_CORO_YIELD self_->connect_timer_.async_wait(self_->strand_.wrap(*this)); + } + + MINFO("Connection made to " << self_->server_address_); + self_->connected_ = true; + const auto threads = boost::numeric_cast(self_->local_.size()); + write_command(self_, initialize{self_->pass_, threads}); + read_commands(self_); + } + } + }; + + client::client(boost::asio::io_service& io, const std::string& address, std::string pass, std::vector> local) + : connection(io), + local_(std::move(local)), + pass_(std::move(pass)), + next_push_(0), + connect_timer_(io), + server_address_(rpc::scanner::server::get_endpoint(address)), + connected_(false) + { + for (const auto& queue : local_) + { + if (!queue) + MONERO_THROW(common_error::kInvalidArgument, "nullptr local queue"); + } + } + + client::~client() + {} + + //! \return Handlers for commands from server + const std::array& client::commands() noexcept + { + static constexpr const std::array value{{ + call, + call + }}; + static_assert(push_accounts_handler::input::id() == 0); + static_assert(replace_accounts_handler::input::id() == 1); + return value; + } + + void client::connect(const std::shared_ptr& self) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + self->strand_.dispatch([self] () + { + if (!self->sock_.is_open()) + connector{self}(); + }); + } + + void client::push_accounts(const std::shared_ptr& self, std::vector users) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + + self->strand_.dispatch([self, users = std::move(users)] () mutable + { + /* Keep this algorithm simple, one user at a time. A little more difficult + to do multiples at once */ + MINFO("Adding " << users.size() << " new accounts to workload"); + for (std::size_t i = 0; i < users.size(); ++i) + { + self->local_[self->next_push_++]->push_accounts( + std::make_move_iterator(users.begin() + i), + std::make_move_iterator(users.begin() + i + 1) + ); + self->next_push_ %= self->local_.size(); + } + }); + } + + void client::replace_accounts(const std::shared_ptr& self, std::vector users) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + + self->strand_.dispatch([self, users = std::move(users)] () mutable + { + MINFO("Received " << users.size() << " accounts as new workload"); + for (std::size_t i = 0; i < self->local_.size(); ++i) + { + // count == 0 is OK. This will tell the thread to stop working + const auto count = users.size() / (self->local_.size() - i); + std::vector next{ + std::make_move_iterator(users.end() - count), + std::make_move_iterator(users.end()) + }; + users.erase(users.end() - count, users.end()); + self->local_[i]->replace_accounts(std::move(next)); + } + self->next_push_ = 0; + }); + } + + void client::send_update(const std::shared_ptr& self, std::vector users, std::vector blocks) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + + self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] () mutable + { + if (!self->connected_) + MONERO_THROW(common_error::kInvalidArgument, "not connected"); + write_command(self, update_accounts{std::move(users), std::move(blocks)}); + }); + } + + void client::cleanup() + { + base_cleanup(); + GET_IO_SERVICE(sock_).stop(); + } +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/client.h b/src/rpc/scanner/client.h new file mode 100644 index 0000000..fa0831b --- /dev/null +++ b/src/rpc/scanner/client.h @@ -0,0 +1,87 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "crypto/hash.h" // monero/src +#include "db/fwd.h" +#include "rpc/scanner/connection.h" +#include "rpc/scanner/queue.h" + +namespace lws { namespace rpc { namespace scanner +{ + //! \brief + class client : public connection + { + const std::vector> local_; + const std::string pass_; + std::size_t next_push_; + boost::asio::steady_timer connect_timer_; + const boost::asio::ip::tcp::endpoint server_address_; + bool connected_; + + struct close; + class connector; + + public: + using command = bool(*)(const std::shared_ptr&); + + //! Does not start connection to `address`, see `connect`. + explicit client(boost::asio::io_context& io, const std::string& address, std::string pass, std::vector> local); + + client(const client&) = delete; + client(client&&) = delete; + ~client(); + client& operator=(const client&) = delete; + client& operator=(client&&) = delete; + + //! \return Handlers for client commands + static const std::array& commands() noexcept; + + //! Start a connect loop on `self`. + static void connect(const std::shared_ptr& self); + + //! Push `users` to local queues. Synchronizes with latest connection. + static void push_accounts(const std::shared_ptr& self, std::vector users); + + //! Replace `users` on local queues. Synchronizes with latest connection. + static void replace_accounts(const std::shared_ptr& self, std::vector users); + + //! Send `users` upstream for disk storage + static void send_update(const std::shared_ptr& self, std::vector users, std::vector blocks); + + //! Closes socket and calls stop on `io_service`. + void cleanup(); + }; +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/commands.cpp b/src/rpc/scanner/commands.cpp new file mode 100644 index 0000000..3a13ec5 --- /dev/null +++ b/src/rpc/scanner/commands.cpp @@ -0,0 +1,85 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "commands.h" + +#include "db/account.h" +#include "db/data.h" +#include "wire/adapted/crypto.h" +#include "wire/vector.h" +#include "wire/msgpack.h" +#include "wire/wrapper/trusted_array.h" + +namespace lws { namespace rpc { namespace scanner +{ + namespace + { + template + void map_initialize(F& format, T& self) + { + wire::object(format, WIRE_FIELD_ID(0, pass), WIRE_FIELD_ID(1, threads)); + } + } + WIRE_MSGPACK_DEFINE_OBJECT(initialize, map_initialize); + + namespace + { + template + void map_account_update(F& format, T& self) + { + wire::object(format, + wire::optional_field<0>("users", wire::trusted_array(std::ref(self.users))), + wire::optional_field<1>("blocks", wire::trusted_array(std::ref(self.blocks))) + ); + } + } + WIRE_MSGPACK_DEFINE_OBJECT(update_accounts, map_account_update) + + namespace + { + template + void map_push_accounts(F& format, T& self) + { + wire::object(format, + wire::optional_field<0>("users", wire::trusted_array(std::ref(self.users))) + ); + } + } + WIRE_MSGPACK_DEFINE_OBJECT(push_accounts, map_push_accounts); + + namespace + { + template + void map_replace_accounts(F& format, T& self) + { + wire::object(format, + wire::optional_field<0>("users", wire::trusted_array(std::ref(self.users))) + ); + } + } + WIRE_MSGPACK_DEFINE_OBJECT(replace_accounts, map_replace_accounts); +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/commands.h b/src/rpc/scanner/commands.h new file mode 100644 index 0000000..60461f9 --- /dev/null +++ b/src/rpc/scanner/commands.h @@ -0,0 +1,101 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include +#include +#include + +#include "crypto/hash.h" // monero/src +#include "db/fwd.h" +#include "wire/msgpack/fwd.h" + +namespace lws { namespace rpc { namespace scanner +{ + /* + `monero-lws-daemon` is "server" and `monero-lws-scanner` is "client". + */ + + //! \brief Data sent before msgpack payload + struct header + { + using length_type = boost::endian::little_uint32_buf_t; + header() = delete; + std::uint8_t version; + std::uint8_t id; + length_type length; + }; + static_assert(sizeof(header) == 6); + + + /* + Client to server messages. + */ + + //! \brief Inform server of info needed to spawn work to client. + struct initialize + { + initialize() = delete; + static constexpr std::uint8_t id() noexcept { return 0; } + std::string pass; + std::uint32_t threads; + }; + WIRE_MSGPACK_DECLARE_OBJECT(initialize); + + //! Command that updates database account records + struct update_accounts + { + update_accounts() = delete; + static constexpr std::uint8_t id() noexcept { return 1; } + std::vector users; + std::vector blocks; + }; + WIRE_MSGPACK_DECLARE_OBJECT(update_accounts); + + + /* + Server to client messages. + */ + + //! \brief New accounts to add/push to scanning list + struct push_accounts + { + push_accounts() = delete; + static constexpr std::uint8_t id() noexcept { return 0; } + std::vector users; + }; + WIRE_MSGPACK_DECLARE_OBJECT(push_accounts); + + //! \brief Replace account scanning list with this new one + struct replace_accounts + { + replace_accounts() = delete; + static constexpr const std::uint8_t id() noexcept { return 1; } + std::vector users; + }; + WIRE_MSGPACK_DECLARE_OBJECT(replace_accounts); +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/connection.cpp b/src/rpc/scanner/connection.cpp new file mode 100644 index 0000000..48a1f3b --- /dev/null +++ b/src/rpc/scanner/connection.cpp @@ -0,0 +1,83 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "connection.h" + +#include "misc_log_ex.h" // monero/contrib/epee/include + +namespace lws { namespace rpc { namespace scanner +{ + connection::connection(boost::asio::io_service& io) + : read_buf_(), + write_bufs_(), + sock_(io), + write_timeout_(io), + strand_(io), + next_{}, + cleanup_(false) + {} + + connection::~connection() + {} + + boost::asio::ip::tcp::endpoint connection::remote_endpoint() + { + boost::system::error_code error{}; + return sock_.remote_endpoint(error); + } + + boost::asio::mutable_buffer connection::read_buffer(const std::size_t size) + { + assert(strand_.running_in_this_thread()); + read_buf_.clear(); + read_buf_.put_n(0, size); + return boost::asio::mutable_buffer(read_buf_.data(), size); + } + + boost::asio::const_buffer connection::write_buffer() const + { + assert(strand_.running_in_this_thread()); + if (write_bufs_.empty()) + return boost::asio::const_buffer(nullptr, 0); + return boost::asio::const_buffer(write_bufs_.front().data(), write_bufs_.front().size()); + } + + void connection::base_cleanup() + { + assert(strand_.running_in_this_thread()); + if (!cleanup_) + MINFO("Disconnected from " << remote_endpoint() << " / " << this); + cleanup_ = true; + boost::system::error_code error{}; + sock_.shutdown(boost::asio::ip::tcp::socket::shutdown_both, error); + error = boost::system::error_code{}; + sock_.close(error); + if (error) + MERROR("Error when closing socket: " << error.message()); + write_timeout_.cancel(); + } +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/connection.h b/src/rpc/scanner/connection.h new file mode 100644 index 0000000..379d50f --- /dev/null +++ b/src/rpc/scanner/connection.h @@ -0,0 +1,71 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "byte_slice.h" // monero/contrib/epee/include +#include "byte_stream.h" // monero/contrib/epee/include +#include "rpc/scanner/commands.h" + +namespace lws { namespace rpc { namespace scanner +{ + //! \brief Base class for `client_connection` and `server_connection`. Always use `strand_`. + struct connection + { + // Leave public for coroutines `read_commands` and `write_commands` + epee::byte_stream read_buf_; + std::deque write_bufs_; + boost::asio::ip::tcp::socket sock_; + boost::asio::steady_timer write_timeout_; + boost::asio::io_service::strand strand_; + header next_; + bool cleanup_; + + explicit connection(boost::asio::io_service& io); + ~connection(); + + boost::asio::ip::tcp::endpoint remote_endpoint(); + + //! \return ASIO compatible read buffer of `size`. + boost::asio::mutable_buffer read_buffer(const std::size_t size); + + //! \return ASIO compatible write buffer + boost::asio::const_buffer write_buffer() const; + + //! Cancels operations on socket and timer. Also updates `cleanup_ = true`. + void base_cleanup(); + }; +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/fwd.h b/src/rpc/scanner/fwd.h new file mode 100644 index 0000000..d4bdd55 --- /dev/null +++ b/src/rpc/scanner/fwd.h @@ -0,0 +1,42 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +namespace lws { namespace rpc { namespace scanner +{ + class client; + struct connection; + struct give_accounts; + struct header; + struct push_accounts; + class queue; + template struct do_read_commands; + class server; + struct take_accounts; + struct update_accounts; + template struct write_commands; +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/queue.cpp b/src/rpc/scanner/queue.cpp new file mode 100644 index 0000000..2e0a135 --- /dev/null +++ b/src/rpc/scanner/queue.cpp @@ -0,0 +1,91 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "queue.h" + +#include "db/account.h" + +namespace lws { namespace rpc { namespace scanner +{ + queue::status queue::do_get_accounts() + { + status out{ + std::move(replace_), std::move(push_), user_count_ + }; + replace_ = std::nullopt; + push_.clear(); + push_.shrink_to_fit(); + return out; + } + + queue::queue() + : replace_(), push_(), user_count_(0), sync_(), poll_(), stop_(false) + {} + + queue::~queue() + {} + + void queue::stop() + { + { + const boost::lock_guard lock{sync_}; + stop_ = true; + } + poll_.notify_all(); + } + + std::size_t queue::user_count() + { + const boost::lock_guard lock{sync_}; + return user_count_; + } + + queue::status queue::get_accounts() + { + const boost::lock_guard lock{sync_}; + return do_get_accounts(); + } + + queue::status queue::wait_for_accounts() + { + boost::unique_lock lock{sync_}; + if (!replace_ && push_.empty() && !stop_) + poll_.wait(lock, [this] () { return replace_ || !push_.empty() || stop_; }); + return do_get_accounts(); + } + + void queue::replace_accounts(std::vector users) + { + { + const boost::lock_guard lock{sync_}; + replace_ = std::move(users); + user_count_ = replace_->size(); + push_.clear(); + } + poll_.notify_all(); + } +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/queue.h b/src/rpc/scanner/queue.h new file mode 100644 index 0000000..e00c2f3 --- /dev/null +++ b/src/rpc/scanner/queue.h @@ -0,0 +1,94 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "db/fwd.h" +#include "rpc/scanner/commands.h" + +namespace lws { namespace rpc { namespace scanner +{ + //! Notifies worker thread of new accounts to scan. All functions thread-safe. + class queue + { + public: + //! Status of upstream scan requests. + struct status + { + std::optional> replace; //!< Empty optional means replace **not** requested. + std::vector push; + std::size_t user_count; + }; + + private: + std::optional> replace_; + std::vector push_; + std::size_t user_count_; + boost::mutex sync_; + boost::condition_variable poll_; + bool stop_; + + status do_get_accounts(); + + public: + queue(); + ~queue(); + + //! `wait_for_accounts()` will return immediately, permanently. + void stop(); + + //! \return Total number of users given to this local thread + std::size_t user_count(); + + //! \return Replacement and "push" accounts + status get_accounts(); + + //! Blocks until replace or push accounts become available OR `stop()` is called. + status wait_for_accounts(); + + //! Replace existing accounts on thread with new `users` + void replace_accounts(std::vector users); + + //! Push/add new accounts to scan on thread + template + void push_accounts(T begin, T end) + { + { + const boost::lock_guard lock{sync_}; + user_count_ += (end - begin); + push_.insert(push_.end(), std::make_move_iterator(begin), std::make_move_iterator(end)); + } + poll_.notify_all(); + } + }; +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/read_commands.h b/src/rpc/scanner/read_commands.h new file mode 100644 index 0000000..2a6acb0 --- /dev/null +++ b/src/rpc/scanner/read_commands.h @@ -0,0 +1,148 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "byte_slice.h" // monero/contrib/epee/include +#include "db/account.h" +#include "misc_log_ex.h" +#include "rpc/scanner/connection.h" +#include "wire/msgpack/base.h" +#include "wire/msgpack/read.h" + +namespace lws { namespace rpc { namespace scanner +{ + /*! Function for binding to command callables. Must be exeucting "inside of" + connection strand. + + \tparam F concept requirements: + * Must have inner `typedef` named `input` which specifies a type + that can read from msgpack bytes. + * Must have static function `handle` with interface + `bool(std::shared_ptr, F::input)`. + \tparam T concept requirements: + * Must be derived from `lws::rpc::scanner::connection`. */ + template + bool call(const std::shared_ptr& self) + { + static_assert(std::is_base_of{}); + if (!self) + return false; + + assert(self->strand_.running_in_this_thread()); + typename F::input data{}; + const std::error_code error = + wire::msgpack::from_bytes(epee::byte_slice{std::move(self->read_buf_)}, data); + self->read_buf_.clear(); + if (error) + { + MERROR("Failed to unpack message (from " << self->remote_endpoint() << "): " << error.message()); + return false; + } + + return F::handle(self, std::move(data)); + } + + /*! \brief ASIO coroutine for reading remote client OR server commands. + + \tparam T concept requirements: + * Must be derived from `lws::rpc::scanner::connection`. + * Must have `cleanup()` function that invokes `base_cleanup()`, and + does any other necessary work given that the socket connection is being + terminated. + * Must have a static `commands()` function, which returns a `std::array` + of `bool(std::shared_ptr)` callables. The position in the array + determines the command number. */ + template + class do_read_commands : public boost::asio::coroutine + { + static_assert(std::is_base_of{}); + const std::shared_ptr self_; + public: + explicit do_read_commands(std::shared_ptr self) + : boost::asio::coroutine(), self_(std::move(self)) + {} + + //! Invoke with no arguments to start read commands loop + void operator()(const boost::system::error_code& error = {}, const std::size_t transferred = 0) + { + if (!self_) + return; + + assert(self_->strand_.running_in_this_thread()); + if (error) + { + if (error != boost::asio::error::operation_aborted) + { + MERROR("Read error on socket (" << self_->remote_endpoint() << "): " << error.message()); + self_->cleanup(); + } + return; + } + if (self_->cleanup_) + return; // callback queued before cancellation + + BOOST_ASIO_CORO_REENTER(*this) + { + for (;;) // multiple commands + { + // indefinite read timeout (waiting for next command) + BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(sizeof(self_->next_)), self_->strand_.wrap(*this)); + + std::memcpy(std::addressof(self_->next_), self_->read_buf_.data(), sizeof(self_->next_)); + static_assert(std::numeric_limits::max() <= std::numeric_limits::max()); + BOOST_ASIO_CORO_YIELD boost::asio::async_read(self_->sock_, self_->read_buffer(self_->next_.length.value()), self_->strand_.wrap(*this)); + + const auto& commands = T::commands(); + if (commands.size() <= self_->next_.id || !commands[self_->next_.id](self_)) + { + self_->cleanup(); + return; // stop reading commands + } + } + } + } + }; + + template + bool read_commands(const std::shared_ptr& self) + { + if (!self) + return false; + self->strand_.dispatch(do_read_commands{self}); + return true; + } +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/server.cpp b/src/rpc/scanner/server.cpp new file mode 100644 index 0000000..15888c1 --- /dev/null +++ b/src/rpc/scanner/server.cpp @@ -0,0 +1,528 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "server.h" + +#include +#include +#include +#include +#include +#include +#include "byte_slice.h" // monero/contrib/epee/include +#include "byte_stream.h" // monero/contrib/epee/include +#include "common/expect.h" // monero/src/ +#include "error.h" +#include "misc_log_ex.h" // monero/contrib/epee/include +#include "net/net_utils_base.h" // monero/contrib/epee/include +#include "rpc/scanner/commands.h" +#include "rpc/scanner/connection.h" +#include "rpc/scanner/read_commands.h" +#include "rpc/scanner/write_commands.h" +#include "scanner.h" + +namespace lws { namespace rpc { namespace scanner +{ + namespace + { + //! Use remote scanning only if users-per-local-thread exceeds this + constexpr const std::size_t remote_threshold = 100; + + //! Threshold for resetting/replacing state instead of pushing + constexpr const std::size_t replace_threshold = 10000; + + //! \brief Handler for server to initialize new scanner + struct initialize_handler + { + using input = initialize; + static bool handle(const std::shared_ptr& self, input msg); + }; + + //! \brief Handler for request to update accounts + struct update_accounts_handler + { + using input = update_accounts; + static bool handle(const std::shared_ptr& self, input msg); + }; + + using command = bool(*)(const std::shared_ptr&); + } // anonymous + + //! \brief Context/state for remote `monero-lws-scanner` instance. + struct server_connection : connection + { + const std::shared_ptr parent_; + std::size_t threads_; //!< Number of scan threads at remote process + + public: + explicit server_connection(std::shared_ptr parent, boost::asio::io_service& io) + : connection(io), + parent_(std::move(parent)), + threads_(0) + { + if (!parent_) + MONERO_THROW(common_error::kInvalidArgument, "nullptr parent"); + } + + //! \return Handlers for commands from client + static const std::array& commands() noexcept + { + static constexpr const std::array value{{ + call, + call + }}; + static_assert(initialize_handler::input::id() == 0); + static_assert(update_accounts_handler::input::id() == 1); + return value; + } + + //! Cancels pending operations and "pushes" accounts to other processes + void cleanup() + { + base_cleanup(); + } + }; + + namespace + { + bool initialize_handler::handle(const std::shared_ptr& self, const input msg) + { + if (!self) + return false; + + assert(self->strand_.running_in_this_thread()); + if (self->threads_) + { + MERROR("Client ( " << self->remote_endpoint() << ") invoked initialize twice, closing connection"); + return false; + } + + if (!msg.threads) + { + MERROR("Client (" << self->remote_endpoint() << ") intialized with 0 threads"); + return false; + } + + if (!self->parent_->check_pass(msg.pass)) + { + MERROR("Client (" << self->remote_endpoint() << ") provided invalid pass"); + return false; + } + + self->threads_ = boost::numeric_cast(msg.threads); + server::replace_users(self->parent_); + return true; + } + + bool update_accounts_handler::handle(const std::shared_ptr& self, input msg) + { + if (!self) + return false; + + if (msg.users.empty()) + return true; + + server::store(self->parent_, std::move(msg.users), std::move(msg.blocks)); + return true; + } + } // anonymous + + class server::acceptor : public boost::asio::coroutine + { + std::shared_ptr self_; + std::shared_ptr next_; + + public: + explicit acceptor(std::shared_ptr self) + : boost::asio::coroutine(), self_(std::move(self)), next_(nullptr) + {} + + void operator()(const boost::system::error_code& error = {}) + { + if (!self_ || error) + { + if (error == boost::asio::error::operation_aborted) + return; // exiting + MONERO_THROW(error, "server acceptor failed"); + } + assert(self_->strand_.running_in_this_thread()); + BOOST_ASIO_CORO_REENTER(*this) + { + for (;;) + { + next_ = std::make_shared(self_, GET_IO_SERVICE(self_->check_timer_)); + BOOST_ASIO_CORO_YIELD self_->acceptor_.async_accept(next_->sock_, self_->strand_.wrap(*this)); + + MINFO("New connection to " << next_->remote_endpoint() << " / " << next_.get()); + + self_->remote_.emplace(next_); + read_commands(std::move(next_)); + } + } + } + }; + + struct server::check_users + { + std::shared_ptr self_; + + void operator()(const boost::system::error_code& error = {}) const + { + if (!self_ || error == boost::asio::error::operation_aborted) + return; + + assert(self_->strand_.running_in_this_thread()); + self_->check_timer_.expires_from_now(account_poll_interval); + self_->check_timer_.async_wait(self_->strand_.wrap(*this)); + + std::size_t total_threads = self_->local_.size(); + std::vector> remotes{}; + remotes.reserve(self_->remote_.size()); + for (auto& remote : self_->remote_) + { + auto conn = remote.lock(); + if (!conn) + { + // connection loss detected, re-shuffle accounts + self_->do_replace_users(); + return; + } + if (std::numeric_limits::max() - total_threads < conn->threads_) + MONERO_THROW(error::configuration, "Exceeded max threads (size_t) across all systems"); + total_threads += conn->threads_; + remotes.push_back(std::move(conn)); + } + + if (!total_threads) + { + MWARNING("Currently no worker threads, waiting for new clients"); + return; + } + + auto reader = self_->disk_.start_read(std::move(self_->read_txn_)); + if (!reader) + { + if (reader.matches(std::errc::no_lock_available)) + { + MWARNING("Failed to open DB read handle, retrying later"); + return; + } + MONERO_THROW(reader.error(), "Failed to open DB read handle"); + } + + auto current_users = MONERO_UNWRAP( + reader->get_accounts(db::account_status::active, std::move(self_->accounts_cur_)) + ); + if (current_users.count() < self_->active_.size()) + { + // a shrinking user base, re-shuffle + self_->do_replace_users(); + return; + } + std::vector active_copy = self_->active_; + std::vector new_accounts; + for (auto user = current_users.make_iterator(); !user.is_end(); ++user) + { + const db::account_id user_id = user.get_value(); + const auto loc = std::lower_bound(active_copy.begin(), active_copy.end(), user_id); + if (loc == active_copy.end() || *loc != user_id) + { + new_accounts.push_back(MONERO_UNWRAP(reader->get_full_account(user.get_value()))); + if (replace_threshold < new_accounts.size()) + { + self_->do_replace_users(); + return; + } + self_->active_.insert( + std::lower_bound(self_->active_.begin(), self_->active_.end(), user_id), + user_id + ); + } + else + active_copy.erase(loc); + } + + if (!active_copy.empty()) + { + self_->do_replace_users(); + return; + } + + self_->next_thread_ %= total_threads; + while (!new_accounts.empty()) + { + if (self_->next_thread_ < self_->local_.size()) + { + self_->local_[self_->next_thread_]->push_accounts( + std::make_move_iterator(new_accounts.end() - 1), + std::make_move_iterator(new_accounts.end()) + ); + new_accounts.erase(new_accounts.end() - 1); + ++self_->next_thread_; + } + else + { + std::size_t j = 0; + for (auto offset = self_->local_.size(); j < remotes.size(); ++j) + { + if (self_->next_thread_ <= offset) + break; + offset += remotes[j]->threads_; + } + + const auto user_count = std::min(new_accounts.size(), remotes[j]->threads_); + std::vector next{ + std::make_move_iterator(new_accounts.end() - user_count), + std::make_move_iterator(new_accounts.end()) + }; + new_accounts.erase(new_accounts.end() - user_count); + write_command(remotes[j], push_accounts{std::move(next)}); + self_->next_thread_ += remotes[j]->threads_; + } + + self_->next_thread_ %= total_threads; + } + self_->read_txn_ = reader->finish_read(); + self_->accounts_cur_ = current_users.give_cursor(); + } + }; + + void server::do_replace_users() + { + assert(strand_.running_in_this_thread()); + MINFO("Updating/replacing user account(s) on worker thread(s)"); + + std::size_t remaining_threads = local_.size(); + std::vector> remotes; + remotes.reserve(remote_.size()); + for (auto remote = remote_.begin(); remote != remote_.end(); ) + { + auto conn = remote->lock(); + if (conn) + { + if (std::numeric_limits::max() - remaining_threads < conn->threads_) + MONERO_THROW(error::configuration, "Exceeded max threads (size_t) across all systems"); + + remaining_threads += conn->threads_; + remotes.push_back(std::move(conn)); + ++remote; + } + else + remote = remote_.erase(remote); + } + + if (!remaining_threads) + { + MWARNING("Currently no worker threads, waiting for new clients"); + return; + } + + std::vector active{}; + std::vector users{}; + auto reader = MONERO_UNWRAP(disk_.start_read()); + { + auto active_users = MONERO_UNWRAP(reader.get_accounts(db::account_status::active)); + const auto active_count = active_users.count(); + active.reserve(active_count); + users.reserve(active_count); + for (auto user : active_users.make_range()) + { + active.insert(std::lower_bound(active.begin(), active.end(), user.id), user.id); + users.insert(std::lower_bound(users.begin(), users.end(), user, by_height{}), user); + } + } + + // if under `remote_threshold` users per thread, use local scanning only + if (local_.size() && (users.size() / local_.size()) < remote_threshold) + remaining_threads = local_.size(); + + // make sure to notify of zero users too! + for (auto& local : local_) + { + const auto user_count = users.size() / remaining_threads; + + std::vector next{}; + next.reserve(user_count); + + for (std::size_t j = 0; !users.empty() && j < user_count; ++j) + { + next.push_back(MONERO_UNWRAP(reader.get_full_account(users.back()))); + users.erase(users.end() - 1); + } + + local->replace_accounts(std::move(next)); + --remaining_threads; + } + + // make sure to notify of zero users too! + for (auto& remote : remotes) + { + const auto users_per_thread = users.size() / std::max(std::size_t(1), remaining_threads); + const auto user_count = std::max(std::size_t(1), users_per_thread) * remote->threads_; + + std::vector next{}; + next.reserve(user_count); + + for (std::size_t j = 0; !users.empty() && j < user_count; ++j) + { + next.push_back(MONERO_UNWRAP(reader.get_full_account(users.back()))); + users.erase(users.end() - 1); + } + + write_command(remote, replace_accounts{std::move(next)}); + remaining_threads -= std::min(remaining_threads, remote->threads_); + } + + next_thread_ = 0; + active_ = std::move(active); + } + + boost::asio::ip::tcp::endpoint server::get_endpoint(const std::string& address) + { + std::string host; + std::string port; + { + const auto split = address.rfind(':'); + if (split == std::string::npos) + { + host = "0.0.0.0"; + port = address; + } + else + { + host = address.substr(0, split); + port = address.substr(split + 1); + } + } + return boost::asio::ip::tcp::endpoint{ + boost::asio::ip::address::from_string(host), boost::lexical_cast(port) + }; + } + + server::server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify) + : strand_(io), + check_timer_(io), + acceptor_(io), + remote_(), + local_(std::move(local)), + active_(std::move(active)), + disk_(std::move(disk)), + zclient_(std::move(zclient)), + read_txn_{}, + accounts_cur_{}, + next_thread_(0), + pass_hashed_(), + pass_salt_(), + webhook_verify_(webhook_verify) + { + std::sort(active_.begin(), active_.end()); + for (const auto& local : local_) + { + if (!local) + MONERO_THROW(common_error::kInvalidArgument, "given nullptr local queue"); + } + + std::memset(pass_hashed_.data(), 0, pass_hashed_.size()); + randombytes_buf(pass_salt_.data(), pass_salt_.size()); + } + + server::~server() noexcept + {} + + bool server::check_pass(const std::string& pass) const noexcept + { + std::array out; + std::memset(out.data(), 0, out.size()); + compute_hash(out, pass); + return sodium_memcmp(out.data(), pass_hashed_.data(), out.size()) == 0; + } + + void server::compute_hash(std::array& out, const std::string& pass) const noexcept + { + if (out.size() < crypto_pwhash_BYTES_MIN) + MONERO_THROW(error::crypto_failure, "Invalid output size"); + if (crypto_pwhash_BYTES_MAX < out.size()) + MONERO_THROW(error::crypto_failure, "Invalid output size"); + + if (pass.size() < crypto_pwhash_PASSWD_MIN && crypto_pwhash_PASSWD_MAX < pass.size()) + MONERO_THROW(error::crypto_failure, "Invalid password size"); + + if (crypto_pwhash(out.data(), out.size(), pass.data(), pass.size(), pass_salt_.data(), + crypto_pwhash_OPSLIMIT_MIN, crypto_pwhash_MEMLIMIT_MIN, crypto_pwhash_ALG_DEFAULT) != 0) + MONERO_THROW(error::crypto_failure, "Failed password hashing"); + } + + void server::start_acceptor(const std::shared_ptr& self, const std::string& address, std::string pass) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + if (address.empty()) + return; + + auto endpoint = get_endpoint(address); + self->strand_.dispatch([self, endpoint = std::move(endpoint), pass = std::move(pass)] () + { + self->acceptor_.close(); + self->acceptor_.open(endpoint.protocol()); + self->acceptor_.bind(endpoint); + self->acceptor_.listen(); + + MINFO("Listening at " << endpoint << " for scanner clients"); + + self->compute_hash(self->pass_hashed_, pass); + acceptor{std::move(self)}(); + }); + } + + void server::start_user_checking(const std::shared_ptr& self) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + self->strand_.dispatch(check_users{self}); + } + + void server::replace_users(const std::shared_ptr& self) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + self->strand_.dispatch([self] () { self->do_replace_users(); }); + } + + void server::store(const std::shared_ptr& self, std::vector users, std::vector blocks) + { + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + + std::sort(users.begin(), users.end(), by_height{}); + self->strand_.dispatch([self, users = std::move(users), blocks = std::move(blocks)] () + { + const lws::scanner_options opts{self->webhook_verify_, false, false}; + if (!lws::user_data::store(self->disk_, self->zclient_, epee::to_span(blocks), epee::to_span(users), nullptr, opts)) + GET_IO_SERVICE(self->check_timer_).stop(); + }); + } +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/server.h b/src/rpc/scanner/server.h new file mode 100644 index 0000000..6be369c --- /dev/null +++ b/src/rpc/scanner/server.h @@ -0,0 +1,110 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "db/fwd.h" +#include "db/storage.h" +#include "net/net_ssl.h" // monero/contrib/epee/include +#include "rpc/client.h" +#include "rpc/scanner/queue.h" + +namespace lws { namespace rpc { namespace scanner +{ + //! Checking frequency for local user db changes + constexpr const std::chrono::seconds account_poll_interval{10}; + + using ssl_verification_t = epee::net_utils::ssl_verification_t; + struct server_connection; + + /*! + \brief Manages local and remote scanning for the primary daemon. + + \note HTTP and ZMQ were not used because a two-way messaging system were + needed (basically a REST server on either end). */ + class server + { + boost::asio::io_service::strand strand_; + boost::asio::steady_timer check_timer_; + boost::asio::ip::tcp::acceptor acceptor_; + std::set, std::owner_less>> remote_; + std::vector> local_; + std::vector active_; + db::storage disk_; + rpc::client zclient_; + lmdb::suspended_txn read_txn_; + db::cursor::accounts accounts_cur_; + std::size_t next_thread_; + std::array pass_hashed_; + std::array pass_salt_; + const ssl_verification_t webhook_verify_; + + //! Async acceptor routine + class acceptor; + struct check_users; + + //! Reset `local_` and `remote_` scanners. Must be called in `strand_`. + void do_replace_users(); + + public: + static boost::asio::ip::tcp::endpoint get_endpoint(const std::string& address); + + explicit server(boost::asio::io_service& io, db::storage disk, rpc::client zclient, std::vector> local, std::vector active, ssl_verification_t webhook_verify); + + server(const server&) = delete; + server(server&&) = delete; + ~server() noexcept; + server& operator=(const server&) = delete; + server& operator=(server&&) = delete; + + //! \return True if `pass` matches expected + bool check_pass(const std::string& pass) const noexcept; + + void compute_hash(std::array& out, const std::string& pass) const noexcept; + + //! Start listening for incoming connections on `address`. + static void start_acceptor(const std::shared_ptr& self, const std::string& address, std::string pass); + + //! Start timed checks of local DB for change in user state + static void start_user_checking(const std::shared_ptr& self); + + //! Replace users/accounts on all local and remote threads + static void replace_users(const std::shared_ptr& self); + + //! Update `users` information on local DB + static void store(const std::shared_ptr& self, std::vector users, std::vector blocks); + }; +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/write_commands.cpp b/src/rpc/scanner/write_commands.cpp new file mode 100644 index 0000000..7ae6d33 --- /dev/null +++ b/src/rpc/scanner/write_commands.cpp @@ -0,0 +1,55 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "write_commands.h" + +#include +#include + + +namespace lws { namespace rpc { namespace scanner +{ + epee::byte_slice complete_command(const std::uint8_t id, epee::byte_stream sink) + { + if (sink.size() < sizeof(header)) + { + MERROR("Message sink was unexpectedly shrunk on message"); + return nullptr; + } + + using value_type = header::length_type::value_type; + if (std::numeric_limits::max() < sink.size() - sizeof(header)) + { + MERROR("Message to exceeds max size"); + return nullptr; + } + + const header head{0, id, header::length_type{value_type(sink.size() - sizeof(header))}}; + std::memcpy(sink.data(), std::addressof(head), sizeof(head)); + return epee::byte_slice{std::move(sink)}; + } +}}} // lws // rpc // scanner diff --git a/src/rpc/scanner/write_commands.h b/src/rpc/scanner/write_commands.h new file mode 100644 index 0000000..d13384b --- /dev/null +++ b/src/rpc/scanner/write_commands.h @@ -0,0 +1,209 @@ +// Copyright (c) 2024, The Monero Project +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without modification, are +// permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of +// conditions and the following disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list +// of conditions and the following disclaimer in the documentation and/or other +// materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be +// used to endorse or promote products derived from this software without specific +// prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +// MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +// THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "byte_slice.h" // monero/contrib/epee/include +#include "byte_stream.h" // monero/contrib/epee/include +#include "commands.h" +#include "common/expect.h"// monero/src +#include "crypto/hash.h" // monero/src +#include "db/account.h" +#include "misc_log_ex.h" +#include "rpc/scanner/commands.h" +#include "rpc/scanner/connection.h" +#include "wire/msgpack/write.h" + +namespace lws { namespace rpc { namespace scanner +{ + constexpr const std::size_t max_write_buffers = 100; + + /* \brief ASIO handler for write timeouts + + \tparam T concept requirements: + * Must be derived from `lws::rpc::scanner::connection`. + * Must have `cleanup()` function that invokes `base_cleanup()`, and + does any other necessary work given that the socket connection is being + terminated. */ + template + struct timeout + { + static_assert(std::is_base_of{}); + std::shared_ptr self_; + + void operator()(const boost::system::error_code& error) const + { + if (self_ && error != boost::asio::error::operation_aborted) + { + assert(self_->strand_.running_in_this_thread()); + MERROR("Write timeout on socket (" << self_->remote_endpoint() << ")"); + self_->cleanup(); + } + } + }; + + /*! \brief ASIO coroutine for write client OR server commands. + + \tparam T concept requirements: + * Must be derived from `lws::rpc::scanner::connection`. + * Must have `cleanup()` function that invokes `base_cleanup()`, and + does any other necessary work given that the socket connection is being + terminated. */ + template + class write_buffers : public boost::asio::coroutine + { + static_assert(std::is_base_of{}); + std::shared_ptr self_; + public: + explicit write_buffers(std::shared_ptr self) + : boost::asio::coroutine(), self_(std::move(self)) + {} + + write_buffers(write_buffers&&) = default; + write_buffers(const write_buffers&) = default; + + void operator()(const boost::system::error_code& error = {}, std::size_t = 0) + { + if (!self_) + return; + + assert(self_->strand_.running_in_this_thread()); + if (error) + { + if (error != boost::asio::error::operation_aborted) + { + MERROR("Write error on socket (" << self_->remote_endpoint() << "): " << error.message()); + self_->cleanup(); + } + self_->write_timeout_.cancel(); + return; + } + if (self_->cleanup_) + return; // callback queued before cancellation + + BOOST_ASIO_CORO_REENTER(*this) + { + while (!self_->write_bufs_.empty()) + { + self_->write_timeout_.expires_from_now(std::chrono::seconds{10}); + self_->write_timeout_.async_wait(self_->strand_.wrap(timeout{self_})); + BOOST_ASIO_CORO_YIELD boost::asio::async_write(self_->sock_, self_->write_buffer(), self_->strand_.wrap(*this)); + self_->write_timeout_.cancel(); + self_->write_bufs_.pop_front(); + } + } + } + }; + + //! \return Completed message using `sink` as source. + epee::byte_slice complete_command(std::uint8_t id, epee::byte_stream sink); + + + /*! Writes "raw" `header` then `data` as msgpack, and queues for writing to + `self`. Also starts ASIO async writing (via `write_buffers`) if the queue + was empty before queueing `data`. + + \tparam T must meet concept requirements for `T` outlined in + `write_commands`. + \tparam U concept requirements: + * must be serializable to msgpack using `wire` engine. + * must have static function `id` which returns an `std::uint8_t` to + identify the command on the remote side. */ + template + void write_command(const std::shared_ptr& self, const U& data) + { + static_assert(std::is_base_of{}); + if (!self) + MONERO_THROW(common_error::kInvalidArgument, "nullptr self"); + + epee::byte_slice msg = nullptr; + try + { + epee::byte_stream sink{}; + sink.put_n(0, sizeof(header)); + + // use integer keys for speed (default to_bytes uses strings) + wire::msgpack_slice_writer dest{std::move(sink), true}; + wire_write::bytes(dest, data); + + msg = complete_command(U::id(), dest.take_sink()); + } + catch (const wire::exception& e) + { + MERROR("Failed to serialize msgpack for remote (" << self.get() << ") command: " << e.what()); + throw; // this should rarely happen, so just shutdown + } + + if (msg.empty()) + { + self->cleanup(); + return; + } + + class queue_slice + { + std::shared_ptr self_; + epee::byte_slice msg_; + + public: + explicit queue_slice(std::shared_ptr self, epee::byte_slice msg) + : self_(std::move(self)), msg_(std::move(msg)) + {} + + queue_slice(queue_slice&&) = default; + queue_slice(const queue_slice& rhs) + : self_(rhs.self_), msg_(rhs.msg_.clone()) + {} + + void operator()() + { + if (!self_) + return; + + const bool queue = self_->write_bufs_.empty(); + self_->write_bufs_.push_back(std::move(msg_)); + + if (queue) + write_buffers{self_}(); + else if (max_write_buffers <= self_->write_bufs_.size()) + { + MERROR("Exceeded max buffer size for connection: " << self_->remote_endpoint()); + self_->cleanup(); + } + } + }; + + self->strand_.dispatch(queue_slice{self, std::move(msg)}); + } +}}} // lws // rpc // scanner diff --git a/src/scanner.cpp b/src/scanner.cpp index 0b49ce1..ae1100c 100644 --- a/src/scanner.cpp +++ b/src/scanner.cpp @@ -60,6 +60,8 @@ #include "rpc/json.h" #include "rpc/lws_pub.h" #include "rpc/message_data_structs.h" // monero/src +#include "rpc/scanner/queue.h" +#include "rpc/scanner/server.h" #include "rpc/webhook.h" #include "util/blocks.h" #include "util/source_location.h" @@ -72,8 +74,6 @@ namespace lws { - std::atomic scanner::running{true}; - // Not in `rates.h` - defaulting to JSON output seems odd std::ostream& operator<<(std::ostream& out, lws::rates const& src) { @@ -87,54 +87,24 @@ namespace lws { namespace net = epee::net_utils; - constexpr const std::chrono::seconds account_poll_interval{10}; constexpr const std::chrono::minutes block_rpc_timeout{2}; constexpr const std::chrono::seconds send_timeout{30}; constexpr const std::chrono::seconds sync_rpc_timeout{30}; - struct thread_sync - { - boost::mutex sync; - boost::condition_variable user_poll; - std::atomic update; - }; - - struct options - { - net::ssl_verification_t webhook_verify; - bool enable_subaddresses; - bool untrusted_daemon; - }; - struct thread_data { - explicit thread_data(rpc::client client, db::storage disk, std::vector users, options opts) - : client(std::move(client)), disk(std::move(disk)), users(std::move(users)), opts(opts) + explicit thread_data(rpc::client client, db::storage disk, std::vector users, std::shared_ptr queue, scanner_options opts) + : client(std::move(client)), disk(std::move(disk)), users(std::move(users)), queue(std::move(queue)), opts(std::move(opts)) {} rpc::client client; db::storage disk; std::vector users; - options opts; + std::shared_ptr queue; + scanner_options opts; }; - - // until we have a signal-handler safe notification system - void checked_wait(const std::chrono::nanoseconds wait) - { - static constexpr const std::chrono::milliseconds interval{500}; - - const auto start = std::chrono::steady_clock::now(); - while (scanner::is_running()) - { - const auto current = std::chrono::steady_clock::now() - start; - if (wait <= current) - break; - const auto sleep_time = std::min(wait - current, std::chrono::nanoseconds{interval}); - boost::this_thread::sleep_for(boost::chrono::nanoseconds{sleep_time.count()}); - } - } - - bool is_new_block(std::string&& chain_msg, db::storage& disk, const account& user) + + bool is_new_block(std::string&& chain_msg, std::optional& disk, const account& user) { const auto chain = rpc::minimal_chain_pub::from_json(std::move(chain_msg)); if (!chain) @@ -146,7 +116,13 @@ namespace lws if (user.scan_height() < db::block_id(chain->top_block_height)) return true; - auto reader = disk.start_read(); + if (!disk) + { + MWARNING("Assuming new block - no access to local DB"); + return true; + } + + auto reader = disk->start_read(); if (!reader) { MWARNING("Failed to start DB read: " << reader.error()); @@ -179,7 +155,7 @@ namespace lws { rpc::send_webhook(client, events, "json-full-payment_hook:", "msgpack-full-payment_hook:", std::chrono::seconds{5}, verify_mode); } - + std::size_t get_target_time(db::block_id height) { const hardfork_t* fork = nullptr; @@ -221,15 +197,7 @@ namespace lws { rpc::send_webhook(client, events, "json-full-spend_hook:", "msgpack-full-spend_hook:", std::chrono::seconds{5}, verify_mode); } - - struct by_height - { - bool operator()(account const& left, account const& right) const noexcept - { - return left.scan_height() < right.scan_height(); - } - }; - + struct add_spend { void operator()(lws::account& user, const db::spend& spend) const @@ -322,12 +290,12 @@ namespace lws expect reader; db::cursor::subaddress_indexes cur; - subaddress_reader(db::storage const& disk, const bool enable_subaddresses) + subaddress_reader(std::optional const& disk, const bool enable_subaddresses) : reader(common_error::kInvalidArgument), cur(nullptr) { - if (enable_subaddresses) + if (disk && enable_subaddresses) { - reader = disk.start_read(); + reader = disk->start_read(); if (!reader) MERROR("Subadress lookup failure: " << reader.error().message()); } @@ -588,7 +556,7 @@ namespace lws scan_transaction_base(users, height, timestamp, tx_hash, tx, out_ids, reader, add_spend{}, add_output{}); } - void scan_transactions(std::string&& txpool_msg, epee::span users, db::storage const& disk, rpc::client& client, const options& opts) + void scan_transactions(std::string&& txpool_msg, epee::span users, db::storage const& disk, rpc::client& client, const scanner_options& opts) { // uint64::max is for txpool static const std::vector fake_outs( @@ -605,7 +573,7 @@ namespace lws const auto time = boost::numeric_cast(std::chrono::system_clock::to_time_t(std::chrono::system_clock::now())); - subaddress_reader reader{disk, opts.enable_subaddresses}; + subaddress_reader reader{std::optional{disk.clone()}, opts.enable_subaddresses}; send_webhook sender{disk, client, opts.webhook_verify}; for (const auto& tx : parsed->txes) scan_transaction_base(users, db::block_id::txpool, time, crypto::hash{}, tx, fake_outs, reader, null_spend{}, sender); @@ -620,49 +588,106 @@ namespace lws MINFO("Updated exchange rates: " << *(*new_rates)); } - void scan_loop(thread_sync& self, std::shared_ptr data, const bool untrusted_daemon, const bool leader_thread) noexcept + void do_scan_loop(scanner_sync& self, std::shared_ptr data, const bool leader_thread) noexcept { - try + struct stop_ { + scanner_sync& self; + ~stop_() { self.stop(); } + } stop{self}; + + // thread entry point, so wrap everything in `try { } catch (...) {}` + try + { // boost::thread doesn't support move-only types + attributes rpc::client client{std::move(data->client)}; db::storage disk{std::move(data->disk)}; std::vector users{std::move(data->users)}; - const options opts = std::move(data->opts); - - assert(!users.empty()); - assert(std::is_sorted(users.begin(), users.end(), by_height{})); - + const std::shared_ptr queue{std::move(data->queue)}; + const scanner_options opts{std::move(data->opts)}; + data.reset(); - struct stop_ + if (!queue) + return; + + while (self.is_running()) { - thread_sync& self; - ~stop_() noexcept + if (!users.empty()) { - self.update = true; - self.user_poll.notify_one(); + auto new_client = MONERO_UNWRAP(client.clone()); + MONERO_UNWRAP(new_client.watch_scan_signals()); + user_data store_local{disk.clone()}; + if (!scanner::loop(self.stop_, std::move(store_local), disk.clone(), std::move(new_client), std::move(users), *queue, opts, leader_thread)) + return; } - } stop{self}; - // RPC server assumes that `start_height == 0` means use + users.clear(); + auto status = queue->wait_for_accounts(); + if (status.replace) + users = std::move(*status.replace); + users.insert( + users.end(), + std::make_move_iterator(status.push.begin()), + std::make_move_iterator(status.push.end()) + ); + } + } + catch (std::exception const& e) + { + self.shutdown(); + MERROR(e.what()); + } + catch (...) + { + self.shutdown(); + MERROR("Unknown exception"); + } + } + } // anonymous + + scanner::scanner(db::storage disk) + : disk_(std::move(disk)), sync_(), signals_(sync_.io_) + { + signals_.add(SIGINT); + signals_.async_wait([this] (const boost::system::error_code& error, int) + { + if (error != boost::asio::error::operation_aborted) + shutdown(); + }); + } + + scanner::~scanner() + {} + + bool scanner::loop(const std::atomic& stop, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, const bool leader_thread) + { + if (users.empty()) + return true; + + { // previous `try` block; leave to prevent git blame spam + std::sort(users.begin(), users.end(), by_height{}); + + /// RPC server assumes that `start_height == 0` means use // block ids. This technically skips genesis block. cryptonote::rpc::GetBlocksFast::Request req{}; req.start_height = std::uint64_t(users.begin()->scan_height()); req.start_height = std::max(std::uint64_t(1), req.start_height); - req.prune = !untrusted_daemon; + req.prune = !opts.untrusted_daemon; epee::byte_slice block_request = rpc::client::make_message("get_blocks_fast", req); if (!send(client, block_request.clone())) - return; + return false; std::vector blockchain{}; std::vector new_pow{}; db::pow_window pow_window{}; - const db::block_info last_checkpoint = db::storage::get_last_checkpoint(); - const db::block_id last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk.start_read()).get_last_pow_block()).id; - while (!self.update && scanner::is_running()) + db::block_id last_pow{}; + if (opts.untrusted_daemon && disk) + last_pow = MONERO_UNWRAP(MONERO_UNWRAP(disk->start_read()).get_last_pow_block()).id; + + while (!stop) { blockchain.clear(); new_pow.clear(); @@ -674,7 +699,7 @@ namespace lws if (timeout) MWARNING("Block retrieval timeout, resetting scanner"); if (timeout || resp.matches(std::errc::interrupted)) - return; + return false; MONERO_THROW(resp.error(), "Failed to retrieve blocks from daemon"); } @@ -682,7 +707,7 @@ namespace lws if (!fetched) { MERROR("Failed to retrieve next blocks: " << fetched.error().message() << ". Resetting state and trying again"); - return; + return false; } if (fetched->blocks.empty()) @@ -691,35 +716,46 @@ namespace lws if (fetched->start_height != req.start_height) { MWARNING("Daemon sent wrong blocks, resetting state"); - return; + return false; } { - expect> new_accounts = client.pull_accounts(); - if (!new_accounts) + bool resort = false; + auto status = queue.get_accounts(); + if (status.replace && status.replace->empty() && status.push.empty()) + return true; // no work explictly given, leave + + if (status.replace) { - MERROR("Failed to pull new accounts: " << new_accounts.error().message()); - return; // get all active accounts the easy way + MINFO("Received " << status.replace->size() << " replacement account(s) for scanning"); + users = std::move(*status.replace); + resort = true; } - if (!new_accounts->empty()) + if (!status.push.empty()) { - MINFO("Received " << new_accounts->size() << " new account(s) for scanning"); - std::sort(new_accounts->begin(), new_accounts->end(), by_height{}); - const db::block_id oldest = new_accounts->front().scan_height(); + MINFO("Received " << status.push.size() << " new account(s) for scanning"); users.insert( users.end(), - std::make_move_iterator(new_accounts->begin()), - std::make_move_iterator(new_accounts->end()) + std::make_move_iterator(status.push.begin()), + std::make_move_iterator(status.push.end()) ); + resort = true; + } + + if (resort) + { + assert(!users.empty()); // by logic from above + std::sort(users.begin(), users.end(), by_height{}); + const db::block_id oldest = users.front().scan_height(); if (std::uint64_t(oldest) < fetched->start_height) { req.start_height = std::uint64_t(oldest); block_request = rpc::client::make_message("get_blocks_fast", req); if (!send(client, block_request.clone())) - return; + return false; continue; // to next get_blocks_fast read } - // else, the oldest new account is within the newly fetched range + // else, the oldest new account is within the newly fetch range } } @@ -734,7 +770,7 @@ namespace lws { expect>> new_pubs = client.wait_for_block(); if (new_pubs.matches(std::errc::interrupted)) - return; // reset entire state (maybe shutdown) + return false; // reset entire state (maybe shutdown) if (!new_pubs) break; // exit wait for block loop, and try fetching new blocks @@ -747,9 +783,9 @@ namespace lws auto message = new_pubs->begin(); for ( ; message != new_pubs->end(); ++message) { - if (message->first != rpc::client::topic::txpool) + if (!disk || message->first != rpc::client::topic::txpool) break; // inner for loop - scan_transactions(std::move(message->second), epee::to_mut_span(users), disk, client, opts); + scan_transactions(std::move(message->second), epee::to_mut_span(users), *disk, client, opts); } for ( ; message != new_pubs->end(); ++message) @@ -761,19 +797,19 @@ namespace lws // request next chunk of blocks if (!send(client, block_request.clone())) - return; + return false; continue; // to next get_blocks_fast read } // if only one block was fetched // request next chunk of blocks if (!send(client, block_request.clone())) - return; + return false; if (fetched->blocks.size() != fetched->output_indices.size()) throw std::runtime_error{"Bad daemon response - need same number of blocks and indices"}; blockchain.push_back(cryptonote::get_block_hash(fetched->blocks.front().block)); - if (untrusted_daemon) + if (opts.untrusted_daemon) new_pow.push_back(db::pow_sync{fetched->blocks.front().block.timestamp}); auto blocks = epee::to_mut_span(fetched->blocks); @@ -788,10 +824,10 @@ namespace lws else fetched->start_height = 0; - if (untrusted_daemon) + if (disk && opts.untrusted_daemon) { pow_window = MONERO_UNWRAP( - MONERO_UNWRAP(disk.start_read()).get_pow_window(db::block_id(fetched->start_height)) + MONERO_UNWRAP(disk->start_read()).get_pow_window(db::block_id(fetched->start_height)) ); } @@ -826,7 +862,7 @@ namespace lws reader ); - if (untrusted_daemon) + if (opts.untrusted_daemon) { if (block.prev_id != blockchain.back()) MONERO_THROW(error::bad_blockchain, "A blocks prev_id does not match"); @@ -838,19 +874,19 @@ namespace lws pow_window.median_timestamps.erase(pow_window.median_timestamps.begin()); // longhash takes a while, check is_running - if (!scanner::is_running()) - return; + if (stop) + return false; diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(db::block_id(fetched->start_height))); // skip POW hashing if done previously - if (last_pow < db::block_id(fetched->start_height)) + if (disk && last_pow < db::block_id(fetched->start_height)) { if (!verify_timestamp(block.timestamp, pow_window.median_timestamps)) MONERO_THROW(error::bad_blockchain, "Block failed timestamp check - possible chain forgery"); const crypto::hash pow = - get_block_longhash(get_block_hashing_blob(block), db::block_id(fetched->start_height), block.major_version, disk, initial_height, epee::to_span(blockchain)); + get_block_longhash(get_block_hashing_blob(block), db::block_id(fetched->start_height), block.major_version, *disk, initial_height, epee::to_span(blockchain)); if (!cryptonote::check_hash(pow, diff)) MONERO_THROW(error::bad_blockchain, "Block had too low difficulty"); } @@ -862,7 +898,7 @@ namespace lws for (auto tx_data : boost::combine(block.tx_hashes, txes, indices)) { - if (untrusted_daemon) + if (opts.untrusted_daemon) { if (cryptonote::get_transaction_hash(boost::get<1>(tx_data)) != boost::get<0>(tx_data)) MONERO_THROW(error::bad_blockchain, "Hash of transaction does not match hash in block"); @@ -879,7 +915,7 @@ namespace lws ); } - if (untrusted_daemon) + if (opts.untrusted_daemon) { const auto last_difficulty = pow_window.cumulative_diffs.empty() ? @@ -895,255 +931,130 @@ namespace lws } // for each block reader.reader = std::error_code{common_error::kInvalidArgument}; // cleanup reader before next write - auto updated = disk.update( - users.front().scan_height(), epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow) - ); - if (!updated) - { - if (updated == lws::error::blockchain_reorg) - { - MINFO("Blockchain reorg detected, resetting state"); - return; - } - MONERO_THROW(updated.error(), "Failed to update accounts on disk"); - } + if (!store(client, epee::to_span(blockchain), epee::to_span(users), epee::to_span(new_pow), opts)) + return false; - if (untrusted_daemon && leader_thread && fetched->start_height % 4 == 0 && last_pow < db::block_id(fetched->start_height)) + // TODO + if (opts.untrusted_daemon && leader_thread && fetched->start_height % 4 == 0 && last_pow < db::block_id(fetched->start_height)) { MINFO("On chain with hash " << blockchain.back() << " and difficulty " << diff << " at height " << fetched->start_height); } - MINFO("Processed " << blocks.size() << " block(s) against " << users.size() << " account(s)"); - send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify); - send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify); - if (updated->accounts_updated != users.size()) - { - MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting"); - return; - } - for (account& user : users) user.updated(db::block_id(fetched->start_height)); - - // Publish when all scan threads have past this block - if (!blockchain.empty() && client.has_publish()) - rpc::publish_scanned(client, blockchain.back(), epee::to_span(users)); } } - catch (std::exception const& e) - { - scanner::stop(); - MERROR(e.what()); - } - catch (...) - { - scanner::stop(); - MERROR("Unknown exception"); - } - } - - lws::account prep_account(db::storage_reader& reader, const lws::db::account& user) - { - std::vector> receives{}; - std::vector pubs{}; - auto receive_list = MONERO_UNWRAP(reader.get_outputs(user.id)); - const std::size_t elems = receive_list.count(); - receives.reserve(elems); - pubs.reserve(elems); - - for (auto output = receive_list.make_iterator(); !output.is_end(); ++output) - { - auto id = output.get_value(); - auto subaddr = output.get_value(); - receives.emplace_back(std::move(id), std::move(subaddr)); - pubs.emplace_back(output.get_value()); - } - - return lws::account{user, std::move(receives), std::move(pubs)}; - } + return false; + } // end scan_loop + namespace + { /*! Launches `thread_count` threads to run `scan_loop`, and then polls for active account changes in background */ - void check_loop(db::storage disk, rpc::context& ctx, std::size_t thread_count, std::vector users, std::vector active, const options opts) + void check_loop(scanner_sync& self, db::storage disk, rpc::context& ctx, const std::size_t thread_count, const std::string& lws_server_addr, std::string lws_server_pass, std::vector users, std::vector active, const scanner_options& opts) { - assert(0 < thread_count); - assert(0 < users.size()); + assert(users.size() == active.size()); + assert(thread_count || !lws_server_addr.empty()); + assert(!thread_count || !users.empty()); - thread_sync self{}; std::vector threads{}; + threads.reserve(thread_count); + + std::vector> queues; + queues.resize(thread_count); struct join_ { - thread_sync& self; - std::vector& threads; + scanner_sync& self; rpc::context& ctx; + std::vector>& queues; + std::vector& threads; ~join_() noexcept { - self.update = true; - ctx.raise_abort_scan(); + self.stop(); + if (self.has_shutdown()) + ctx.raise_abort_process(); + else + ctx.raise_abort_scan(); + + for (const auto& queue : queues) + { + if (queue) + queue->stop(); + } for (auto& thread : threads) thread.join(); } - } join{self, threads, ctx}; + } join{self, ctx, queues, threads}; /* The algorithm here is extremely basic. Users are divided evenly amongst the configurable thread count, and grouped by scan height. If an old account appears, some accounts (grouped on that thread) will be delayed in processing waiting for that account to catch up. Its not the greatest, - but this "will have to do" for the first cut. - Its not expected that many people will be running - "enterprise level" of nodes where accounts are constantly added. + but this "will have to do" - but we're getting closer to fixing that + too. Another "issue" is that each thread works independently instead of more cooperatively for scanning. This requires a bit more synchronization, so was left for later. Its likely worth doing to reduce the number of transfers from the daemon, and the bottleneck on the writes into LMDB. - - If the active user list changes, all threads are stopped/joined, and - everything is re-started. */ + self.stop_ = false; + boost::thread::attributes attrs; attrs.set_stack_size(THREAD_STACK_SIZE); - threads.reserve(thread_count); std::sort(users.begin(), users.end(), by_height{}); - // enable the new bind point before registering pull accounts - lws::rpc::account_push pusher = MONERO_UNWRAP(ctx.bind_push()); - - MINFO("Starting scan loops on " << std::min(thread_count, users.size()) << " thread(s) with " << users.size() << " account(s)"); + MINFO("Starting scan loops on " << thread_count << " thread(s) with " << users.size() << " account(s)"); - bool leader_thread = true; - bool remaining_threads = true; - while (!users.empty() && --thread_count) + for (std::size_t i = 0; i < queues.size(); ++i) { - const std::size_t per_thread = std::max(std::size_t(1), users.size() / (thread_count + 1)); - const std::size_t count = std::min(per_thread, users.size()); + queues[i] = std::make_shared(); + + // this can create threads with no active accounts, they just wait + const std::size_t count = users.size() / (queues.size() - i); std::vector thread_users{ std::make_move_iterator(users.end() - count), std::make_move_iterator(users.end()) }; users.erase(users.end() - count, users.end()); - rpc::client client = MONERO_UNWRAP(ctx.connect()); - MONERO_UNWRAP(client.watch_scan_signals()); - MONERO_UNWRAP(client.enable_pull_accounts()); - - auto data = std::make_shared( - std::move(client), disk.clone(), std::move(thread_users), opts - ); - threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread)); - leader_thread = false; - } - - if (!users.empty()) - { - rpc::client client = MONERO_UNWRAP(ctx.connect()); - MONERO_UNWRAP(client.watch_scan_signals()); - MONERO_UNWRAP(client.enable_pull_accounts()); - auto data = std::make_shared( - std::move(client), disk.clone(), std::move(users), opts + MONERO_UNWRAP(ctx.connect()), disk.clone(), std::move(thread_users), queues[i], opts ); - threads.emplace_back(attrs, std::bind(&scan_loop, std::ref(self), std::move(data), opts.untrusted_daemon, leader_thread)); - remaining_threads = false; + threads.emplace_back(attrs, std::bind(&do_scan_loop, std::ref(self), std::move(data), i == 0)); } - auto last_check = std::chrono::steady_clock::now(); - - lmdb::suspended_txn read_txn{}; - db::cursor::accounts accounts_cur{}; - boost::unique_lock lock{self.sync}; + users.clear(); + users.shrink_to_fit(); - while (scanner::is_running()) { - update_rates(ctx); - - for (;;) - { - //! \TODO use signalfd + ZMQ? Windows is the difficult case... - self.user_poll.wait_for(lock, boost::chrono::seconds{1}); - if (self.update || !scanner::is_running()) - return; - auto this_check = std::chrono::steady_clock::now(); - if (account_poll_interval <= (this_check - last_check)) - { - last_check = this_check; - break; - } - } - - auto reader = disk.start_read(std::move(read_txn)); - if (!reader) - { - if (reader.matches(std::errc::no_lock_available)) - { - MWARNING("Failed to open DB read handle, retrying later"); - continue; - } - MONERO_THROW(reader.error(), "Failed to open DB read handle"); - } - - auto current_users = MONERO_UNWRAP( - reader->get_accounts(db::account_status::active, std::move(accounts_cur)) + auto server = std::make_shared( + self.io_, + disk.clone(), + MONERO_UNWRAP(ctx.connect()), + queues, + std::move(active), + opts.webhook_verify ); - if (current_users.count() < active.size()) - { - // cannot remove accounts via ZMQ (yet) - MINFO("Decrease in active user accounts detected, stopping scan threads..."); - return; - } - std::vector active_copy = active; - std::vector new_; - for (auto user = current_users.make_iterator(); !user.is_end(); ++user) - { - const db::account_id user_id = user.get_value(); - const auto loc = std::lower_bound(active_copy.begin(), active_copy.end(), user_id); - if (loc == active_copy.end() || *loc != user_id) - { - new_.emplace_back(prep_account(*reader, user.get_value())); - active.insert( - std::lower_bound(active.begin(), active.end(), user_id), user_id - ); - } - else - active_copy.erase(loc); - } - if (!active_copy.empty()) - { - MINFO("Change in active user accounts detected, stopping scan threads..."); - return; - } - if (!new_.empty()) - { - if (remaining_threads) - { - MINFO("Received new account(s), starting more thread(s)"); - return; - } + rpc::scanner::server::start_user_checking(server); + if (!lws_server_addr.empty()) + rpc::scanner::server::start_acceptor(std::move(server), lws_server_addr, std::move(lws_server_pass)); + } - const auto pushed = pusher.push(epee::to_span(new_), std::chrono::seconds{1}); - if (!pushed) - { - MERROR("Failed to push new account to workers: " << pushed.error().message()); - return; // pull in new accounts by resetting state - } - else - MINFO("Pushed " << new_.size() << " new accounts to worker thread(s)"); - } - read_txn = reader->finish_read(); - accounts_cur = current_users.give_cursor(); - } // while scanning + // Blocks until sigint, local scanner issue, or exception + self.io_.run(); } template - expect fetch_chain(rpc::client& client, const char* endpoint, const Q& req) + expect fetch_chain(const scanner_sync& self, rpc::client& client, const char* endpoint, const Q& req) { expect sent{lws::error::daemon_timeout}; @@ -1152,7 +1063,7 @@ namespace lws while (!(sent = client.send(std::move(msg), std::chrono::seconds{1}))) { - if (!scanner::is_running()) + if (self.has_shutdown()) return {lws::error::signal_abort_process}; if (sync_rpc_timeout <= (std::chrono::steady_clock::now() - start)) @@ -1167,7 +1078,7 @@ namespace lws while (!(resp = client.get_message(std::chrono::seconds{1}))) { - if (!scanner::is_running()) + if (self.has_shutdown()) return {lws::error::signal_abort_process}; if (sync_rpc_timeout <= (std::chrono::steady_clock::now() - start)) @@ -1180,7 +1091,7 @@ namespace lws } // does not validate blockchain hashes - expect sync_quick(db::storage disk, rpc::client client) + expect sync_quick(const scanner_sync& self, db::storage disk, rpc::client client) { MINFO("Starting blockchain sync with daemon"); @@ -1193,7 +1104,7 @@ namespace lws if (req.known_hashes.empty()) return {lws::error::bad_blockchain}; - auto resp = fetch_chain(client, "get_hashes_fast", req); + auto resp = fetch_chain(self, client, "get_hashes_fast", req); if (!resp) return resp.error(); @@ -1219,7 +1130,7 @@ namespace lws } // validates blockchain hashes - expect sync_full(db::storage disk, rpc::client client) + expect sync_full(const scanner_sync& self, db::storage disk, rpc::client client) { MINFO("Starting blockchain sync with daemon"); @@ -1235,7 +1146,7 @@ namespace lws if (req.block_ids.empty()) return {lws::error::bad_blockchain}; - auto resp = fetch_chain(client, "get_blocks_fast", req); + auto resp = fetch_chain(self, client, "get_blocks_fast", req); if (!resp) return resp.error(); @@ -1297,7 +1208,7 @@ namespace lws pow_window.median_timestamps.erase(pow_window.median_timestamps.begin()); // longhash takes a while, check is_running - if (!scanner::is_running()) + if (self.has_shutdown()) return {error::signal_abort_process}; diff = cryptonote::next_difficulty(pow_window.pow_timestamps, pow_window.cumulative_diffs, get_target_time(height)); @@ -1342,37 +1253,119 @@ namespace lws } } // anonymous - expect scanner::sync(db::storage disk, rpc::client client, const bool untrusted_daemon) + bool user_data::store(db::storage& disk, rpc::client& client, const epee::span chain, const epee::span users, const epee::span pow, const scanner_options& opts) { + if (users.empty()) + return true; + if (!std::is_sorted(users.begin(), users.end(), by_height{})) + throw std::logic_error{"users must be sorted!"}; + + auto updated = disk.update(users[0].scan_height(), chain, users, pow); + if (!updated) + { + if (updated == lws::error::blockchain_reorg) + { + MINFO("Blockchain reorg detected, resetting state"); + return false; + } + MONERO_THROW(updated.error(), "Failed to update accounts on disk"); + } + + MINFO("Processed " << chain.size() << " block(s) against " << users.size() << " account(s)"); + send_payment_hook(client, epee::to_span(updated->confirm_pubs), opts.webhook_verify); + send_spend_hook(client, epee::to_span(updated->spend_pubs), opts.webhook_verify); + if (updated->accounts_updated != users.size()) + { + MWARNING("Only updated " << updated->accounts_updated << " account(s) out of " << users.size() << ", resetting"); + return false; + } + + // Publish when all scan threads have past this block + // only address is printed from users, so height doesn't need updating + if (!chain.empty() && client.has_publish()) + rpc::publish_scanned(client, chain[chain.size() - 1], epee::to_span(users)); + + return true; + } + + bool user_data::operator()(rpc::client& client, const epee::span chain, const epee::span users, const epee::span pow, const scanner_options& opts) + { + return store(disk_, client, chain, users, pow, opts); + } + + expect scanner::sync(rpc::client client, const bool untrusted_daemon) + { + if (has_shutdown()) + MONERO_THROW(common_error::kInvalidArgument, "this has shutdown"); if (untrusted_daemon) - return sync_full(std::move(disk), std::move(client)); - return sync_quick(std::move(disk), std::move(client)); + return sync_full(sync_, disk_.clone(), std::move(client)); + return sync_quick(sync_, disk_.clone(), std::move(client)); } - void scanner::run(db::storage disk, rpc::context ctx, std::size_t thread_count, const epee::net_utils::ssl_verification_t webhook_verify, const bool enable_subaddresses, const bool untrusted_daemon) + void scanner::run(rpc::context ctx, std::size_t thread_count, const std::string& lws_server_addr, std::string lws_server_pass, const scanner_options& opts) { - thread_count = std::max(std::size_t(1), thread_count); + if (has_shutdown()) + MONERO_THROW(common_error::kInvalidArgument, "this has shutdown"); + if (!lws_server_addr.empty() && (opts.enable_subaddresses || opts.untrusted_daemon)) + MONERO_THROW(error::configuration, "Cannot use remote scanner with subaddresses or untrusted daemon"); + + if (lws_server_addr.empty()) + thread_count = std::max(std::size_t(1), thread_count); + + /*! \NOTE Be careful about references and lifetimes of the callbacks. The + ones below are safe because no `io_service::run()` call is after the + destruction of the references. + + \NOTE That `ctx` will need a strand or lock if multiple + `io_service::run()` calls are used. */ + + boost::asio::steady_timer rate_timer{sync_.io_}; + class rate_updater + { + boost::asio::steady_timer& rate_timer_; + rpc::context& ctx_; + const std::chrono::minutes rate_interval_; + + public: + explicit rate_updater(boost::asio::steady_timer& rate_timer, rpc::context& ctx) + : rate_timer_(rate_timer), ctx_(ctx), rate_interval_(ctx.cache_interval()) + {} + + void operator()(const boost::system::error_code& error = {}) const + { + update_rates(ctx_); + rate_timer_.expires_from_now(rate_interval_); + rate_timer_.async_wait(*this); + } + + std::chrono::minutes rate_interval() const noexcept { return rate_interval_; } + }; + + { + rate_updater updater{rate_timer, ctx}; + if (std::chrono::minutes{0} < updater.rate_interval()) + updater(); + } rpc::client client{}; + for (;;) { - const auto last = std::chrono::steady_clock::now(); - update_rates(ctx); - std::vector active; std::vector users; + if (thread_count) { MINFO("Retrieving current active account list"); - auto reader = MONERO_UNWRAP(disk.start_read()); + auto reader = MONERO_UNWRAP(disk_.start_read()); auto accounts = MONERO_UNWRAP( reader.get_accounts(db::account_status::active) ); for (db::account user : accounts.make_range()) { - users.emplace_back(prep_account(reader, user)); + users.emplace_back(MONERO_UNWRAP(reader.get_full_account(user))); active.insert( std::lower_bound(active.begin(), active.end(), user.id), user.id ); @@ -1381,21 +1374,27 @@ namespace lws reader.finish_read(); } // cleanup DB reader - if (users.empty()) + if (thread_count && users.empty()) { MINFO("No active accounts"); - checked_wait(account_poll_interval - (std::chrono::steady_clock::now() - last)); + + boost::asio::steady_timer poll{sync_.io_}; + poll.expires_from_now(rpc::scanner::account_poll_interval); + poll.async_wait([] (boost::system::error_code) {}); + + sync_.io_.run_one(); } else - check_loop(disk.clone(), ctx, thread_count, std::move(users), std::move(active), options{webhook_verify, enable_subaddresses, untrusted_daemon}); + check_loop(sync_, disk_.clone(), ctx, thread_count, lws_server_addr, lws_server_pass, std::move(users), std::move(active), opts); - if (!scanner::is_running()) + sync_.io_.reset(); + if (has_shutdown()) return; if (!client) client = MONERO_UNWRAP(ctx.connect()); - expect synced = sync(disk.clone(), std::move(client), untrusted_daemon); + expect synced = sync(std::move(client), opts.untrusted_daemon); if (!synced) { if (!synced.matches(std::errc::timed_out)) diff --git a/src/scanner.h b/src/scanner.h index 0694f9f..45774fd 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -27,37 +27,109 @@ #pragma once #include +#include +#include #include #include #include +#include "db/fwd.h" #include "db/storage.h" #include "net/net_ssl.h" // monero/contrib/epee/include #include "rpc/client.h" +#include "rpc/scanner/fwd.h" +#include "span.h" // monero/contrib/epee/include namespace lws { + struct scanner_options + { + epee::net_utils::ssl_verification_t webhook_verify; + bool enable_subaddresses; + bool untrusted_daemon; + }; + + //! Used in `scan_loop` by server + class user_data + { + db::storage disk_; + + public: + user_data(db::storage disk) + : disk_(std::move(disk)) + {} + + user_data(user_data const& rhs) + : disk_(rhs.disk_.clone()) + {} + + user_data(user_data&& rhs) + : disk_(std::move(rhs.disk_)) + {} + + /*! Store updated accounts locally (`disk`), and send ZMQ/RMQ/webhook + events. `users` must be sorted by height (lowest first). */ + static bool store(db::storage& disk, rpc::client& zclient, epee::span chain, epee::span users, epee::span pow, const scanner_options&); + + //! `users` must be sorted by height (lowest first) + bool operator()(rpc::client& zclient, epee::span chain, epee::span users, epee::span pow, const scanner_options&); + }; + + struct scanner_sync + { + boost::asio::io_service io_; + std::atomic stop_; //!< Stop scanning but do not shutdown + std::atomic shutdown_; //!< Exit scanner::run + + explicit scanner_sync() + : io_(), stop_(false), shutdown_(false) + {} + + bool is_running() const noexcept { return !stop_ && !shutdown_; } + bool has_shutdown() const noexcept { return shutdown_; } + void stop() { stop_ = true; io_.stop(); } + void shutdown() { shutdown_ = true; stop(); } + }; + //! Scans all active `db::account`s. Detects if another process changes active list. class scanner { - static std::atomic running; - - scanner() = delete; + db::storage disk_; + scanner_sync sync_; + boost::asio::signal_set signals_; //!< Detect SIGINT requested shutdown public: + + //! Register `SIGINT` handler and keep a copy of `disk` + explicit scanner(db::storage disk); + ~scanner(); + + //! Callback for storing user account (typically local lmdb, but perhaps remote rpc) + using store_func = std::function, epee::span, epee::span, const scanner_options&)>; + + /*! Run _just_ the inner scanner loop while `self.is_running() == true`. + * + \throw std::exception on hard errors (shutdown) conditions + \return True iff `queue` indicates thread now has zero accounts. False + indicates a soft, typically recoverable error. */ + static bool loop(const std::atomic& stop, store_func store, std::optional disk, rpc::client client, std::vector users, rpc::scanner::queue& queue, const scanner_options& opts, bool leader_thread); + //! Use `client` to sync blockchain data, and \return client if successful. - static expect sync(db::storage disk, rpc::client client, const bool untrusted_daemon = false); + expect sync(rpc::client client, const bool untrusted_daemon = false); + + //! Poll daemon until `shutdown()` is called, using `thread_count` threads. + void run(rpc::context ctx, std::size_t thread_count, const std::string& server_addr, std::string server_pass, const scanner_options&); - //! Poll daemon until `stop()` is called, using `thread_count` threads. - static void run(db::storage disk, rpc::context ctx, std::size_t thread_count, epee::net_utils::ssl_verification_t webhook_verify, bool enable_subaddresses, bool untrusted_daemon = false); + //! \return True iff `stop()` and `shutdown()` has never been called + bool is_running() const noexcept { return sync_.is_running(); } - //! \return True if `stop()` has never been called. - static bool is_running() noexcept { return running; } + //! \return True if `shutdown()` has been been called. + bool has_shutdown() const noexcept { return sync_.has_shutdown(); } - //! Stops all scanner instances globally. - static void stop() noexcept { running = false; } + //! Stop scan threads, but do not shutdown scanner. + void stop() { sync_.stop(); } - //! For testing, \post is_running() == true - static void reset() noexcept { running = true; } + // Stop scan threads AND shutdown scanner. + void shutdown() { sync_.shutdown(); } }; } // lws diff --git a/src/server_main.cpp b/src/server_main.cpp index 1b99539..c8c1bda 100644 --- a/src/server_main.cpp +++ b/src/server_main.cpp @@ -66,6 +66,8 @@ namespace #endif const command_line::arg_descriptor> rest_servers; const command_line::arg_descriptor> admin_rest_servers; + const command_line::arg_descriptor lws_server_addr; + const command_line::arg_descriptor lws_server_pass; const command_line::arg_descriptor rest_ssl_key; const command_line::arg_descriptor rest_ssl_cert; const command_line::arg_descriptor rest_threads; @@ -111,6 +113,8 @@ namespace #endif , rest_servers{"rest-server", "[(https|http)://
:][/] for incoming connections, multiple declarations allowed"} , admin_rest_servers{"admin-rest-server", "[(https|http])://
:][/] for incoming admin connections, multiple declarations allowed"} + , lws_server_addr{"lws-server-addr", "[:] to listen for lws-clients", ""} + , lws_server_pass{"lws-server-pass", "Password for lws-clients connecting to server", ""} , rest_ssl_key{"rest-ssl-key", " to PEM formatted SSL key for https REST server", ""} , rest_ssl_cert{"rest-ssl-certificate", " to PEM formatted SSL certificate (chains supported) for https REST server", ""} , rest_threads{"rest-threads", "Number of threads to process REST connections", 1} @@ -135,6 +139,8 @@ namespace lws::options::prepare(description); command_line::add_arg(description, daemon_rpc); command_line::add_arg(description, daemon_sub); + command_line::add_arg(description, lws_server_addr); + command_line::add_arg(description, lws_server_pass); command_line::add_arg(description, zmq_pub); #ifdef MLWS_RMQ_ENABLED command_line::add_arg(description, rmq_address); @@ -167,6 +173,8 @@ namespace std::string db_path; std::vector rest_servers; std::vector admin_rest_servers; + std::string lws_server_addr; + std::string lws_server_pass; lws::rest_server::configuration rest_config; std::string daemon_rpc; std::string daemon_sub; @@ -236,11 +244,13 @@ namespace command_line::get_arg(args, opts.db_path), command_line::get_arg(args, opts.rest_servers), command_line::get_arg(args, opts.admin_rest_servers), + command_line::get_arg(args, opts.lws_server_addr), + command_line::get_arg(args, opts.lws_server_pass), lws::rest_server::configuration{ {command_line::get_arg(args, opts.rest_ssl_key), command_line::get_arg(args, opts.rest_ssl_cert)}, command_line::get_arg(args, opts.access_controls), command_line::get_arg(args, opts.rest_threads), - command_line::get_arg(args, opts.max_subaddresses), + command_line::get_arg(args, opts.max_subaddresses), webhook_verify, command_line::get_arg(args, opts.external_bind), command_line::get_arg(args, opts.disable_admin_auth), @@ -266,8 +276,12 @@ namespace command_line::get_arg(args, opts.untrusted_daemon) }; + if (!prog.lws_server_addr.empty() && (prog.rest_config.max_subaddresses || prog.untrusted_daemon)) + MONERO_THROW(lws::error::configuration, "Remote scanning cannot be used with subaddresses or untrusted daemon"); + prog.rest_config.threads = std::max(std::size_t(1), prog.rest_config.threads); - prog.scan_threads = std::max(std::size_t(1), prog.scan_threads); + if (prog.lws_server_addr.empty()) + prog.scan_threads = std::max(std::size_t(1), prog.scan_threads); if (command_line::is_arg_defaulted(args, opts.daemon_rpc)) prog.daemon_rpc = options::get_default_zmq(); @@ -277,19 +291,20 @@ namespace void run(program prog) { - std::signal(SIGINT, [] (int) { lws::scanner::stop(); }); - boost::filesystem::create_directories(prog.db_path); auto disk = lws::db::storage::open(prog.db_path.c_str(), prog.create_queue_max); auto ctx = lws::rpc::context::make(std::move(prog.daemon_rpc), std::move(prog.daemon_sub), std::move(prog.zmq_pub), std::move(prog.rmq), prog.rates_interval, prog.untrusted_daemon); + //! SIGINT handle registered by `scanner` constructor + lws::scanner scanner{disk.clone()}; + MINFO("Using monerod ZMQ RPC at " << ctx.daemon_address()); - auto client = lws::scanner::sync(disk.clone(), ctx.connect().value(), prog.untrusted_daemon).value(); + auto client = scanner.sync(ctx.connect().value(), prog.untrusted_daemon).value(); const auto enable_subaddresses = bool(prog.rest_config.max_subaddresses); const auto webhook_verify = prog.rest_config.webhook_verify; lws::rest_server server{ - epee::to_span(prog.rest_servers), prog.admin_rest_servers, disk.clone(), std::move(client), std::move(prog.rest_config) + epee::to_span(prog.rest_servers), prog.admin_rest_servers, std::move(disk), std::move(client), std::move(prog.rest_config) }; for (const std::string& address : prog.rest_servers) MINFO("Listening for REST clients at " << address); @@ -297,7 +312,13 @@ namespace MINFO("Listening for REST admin clients at " << address); // blocks until SIGINT - lws::scanner::run(std::move(disk), std::move(ctx), prog.scan_threads, webhook_verify, enable_subaddresses, prog.untrusted_daemon); + scanner.run( + std::move(ctx), + prog.scan_threads, + std::move(prog.lws_server_addr), + std::move(prog.lws_server_pass), + lws::scanner_options{webhook_verify, enable_subaddresses, prog.untrusted_daemon} + ); } } // anonymous diff --git a/tests/unit/scanner.test.cpp b/tests/unit/scanner.test.cpp index a2cf8c7..724ac19 100644 --- a/tests/unit/scanner.test.cpp +++ b/tests/unit/scanner.test.cpp @@ -223,17 +223,23 @@ namespace } return out; } + + void scanner_thread(lws::scanner& scanner, void* ctx, const std::vector& reply) + { + struct stop_ + { + lws::scanner& scanner; + ~stop_() { scanner.shutdown(); }; + } stop{scanner}; + + lws_test::rpc_thread(ctx, reply); + } } // anonymous namespace lws_test { void rpc_thread(void* ctx, const std::vector& reply) { - struct stop_ - { - ~stop_() noexcept { lws::scanner::stop(); }; - } stop{}; - try { net::zmq::socket server{}; @@ -321,7 +327,6 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") SETUP("lws::rpc::context, ZMQ_REP Server, and lws::db::storage") { - lws::scanner::reset(); auto rpc = lws::rpc::context::make(lws_test::rpc_rendevous, {}, {}, {}, std::chrono::minutes{0}, false); @@ -345,9 +350,11 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") std::vector messages{}; messages.push_back(to_json_rpc(1)); - boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages)); + lws::scanner scanner{db.clone()}; + + boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; - EXPECT(!lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect()))); + EXPECT(!scanner.sync(MONERO_UNWRAP(rpc.connect()))); lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, hashes); } @@ -377,9 +384,10 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, {hashes.data(), 1}); { - boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages)); + lws::scanner scanner{db.clone()}; + boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; - EXPECT(lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect()))); + EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect()))); lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, epee::to_span(hashes)); } @@ -400,9 +408,10 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") message.hashes.resize(1); messages.push_back(daemon_response(message)); - boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages)); + lws::scanner scanner{db.clone()}; + boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; - EXPECT(lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect()))); + EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect()))); lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, epee::to_span(hashes)); } } @@ -507,15 +516,14 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") messages.push_back(daemon_response(hmessage)); { - boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages)); + lws::scanner scanner{db.clone()}; + boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; - EXPECT(lws::scanner::sync(db.clone(), MONERO_UNWRAP(rpc.connect()))); + EXPECT(scanner.sync(MONERO_UNWRAP(rpc.connect()))); lws_test::test_chain(lest_env, MONERO_UNWRAP(db.start_read()), last_block.id, epee::to_span(hashes)); } } - lws::scanner::reset(); - EXPECT(db.add_account(account, keys.m_view_secret_key)); EXPECT(db.add_account(account2, keys2.m_view_secret_key)); @@ -526,9 +534,13 @@ LWS_CASE("lws::scanner::sync and lws::scanner::run") bmessage.output_indices.resize(1); messages.push_back(daemon_response(bmessage)); { - boost::thread server_thread(&lws_test::rpc_thread, rpc.zmq_context(), std::cref(messages)); + static constexpr const lws::scanner_options opts{ + epee::net_utils::ssl_verification_t::none, true, false + }; + lws::scanner scanner{db.clone()}; + boost::thread server_thread(&scanner_thread, std::ref(scanner), rpc.zmq_context(), std::cref(messages)); const join on_scope_exit{server_thread}; - lws::scanner::run(db.clone(), std::move(rpc), 1, epee::net_utils::ssl_verification_t::none, true); + scanner.run(std::move(rpc), 1, {}, {}, opts); } hashes.push_back(cryptonote::get_block_hash(bmessage.blocks.back().block)); diff --git a/tests/unit/scanner.test.h b/tests/unit/scanner.test.h index e7611ec..067de88 100644 --- a/tests/unit/scanner.test.h +++ b/tests/unit/scanner.test.h @@ -27,6 +27,7 @@ #include #include "byte_slice.h" // monero/contrib/epee/include +#include "fwd.h" namespace lws_test {