From 358f392992bab913fcfc8f195999c0751008c197 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 2 Jan 2025 13:52:13 +0100 Subject: [PATCH 1/9] Extract `rpc_callbacks` class --- nano/node/CMakeLists.txt | 2 + nano/node/fwd.hpp | 1 + nano/node/node.cpp | 125 +------------------------------- nano/node/node.hpp | 3 +- nano/node/rpc_callbacks.cpp | 139 ++++++++++++++++++++++++++++++++++++ nano/node/rpc_callbacks.hpp | 24 +++++++ 6 files changed, 171 insertions(+), 123 deletions(-) create mode 100644 nano/node/rpc_callbacks.cpp create mode 100644 nano/node/rpc_callbacks.hpp diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 53241a3c9d..77eb01e82a 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -135,6 +135,8 @@ add_library( rep_tiers.cpp request_aggregator.hpp request_aggregator.cpp + rpc_callbacks.hpp + rpc_callbacks.cpp scheduler/bucket.cpp scheduler/bucket.hpp scheduler/component.hpp diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 3a8b9a162b..9227249275 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -33,6 +33,7 @@ class recently_cemented_cache; class recently_confirmed_cache; class rep_crawler; class rep_tiers; +class rpc_callbacks; class telemetry; class unchecked_map; class stats; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index b6838515fa..6c50715b44 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -193,6 +194,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy peer_history{ *peer_history_impl }, monitor_impl{ std::make_unique (config.monitor, *this) }, monitor{ *monitor_impl }, + rpc_callbacks_impl{ std::make_unique (*this) }, + rpc_callbacks{ *rpc_callbacks_impl }, startup_time{ std::chrono::steady_clock::now () }, node_seq{ seq } { @@ -250,66 +253,6 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy network.disconnect_observer = [this] () { observers.disconnect.notify (); }; - if (!config.callback_address.empty ()) - { - observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { - auto block_a (status_a.winner); - if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) - { - auto node_l (shared_from_this ()); - io_ctx.post ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { - boost::property_tree::ptree event; - event.add ("account", account_a.to_account ()); - event.add ("hash", block_a->hash ().to_string ()); - std::string block_text; - block_a->serialize_json (block_text); - event.add ("block", block_text); - event.add ("amount", amount_a.to_string_dec ()); - if (is_state_send_a) - { - event.add ("is_send", is_state_send_a); - event.add ("subtype", "send"); - } - // Subtype field - else if (block_a->type () == nano::block_type::state) - { - if (block_a->is_change ()) - { - event.add ("subtype", "change"); - } - else if (is_state_epoch_a) - { - debug_assert (amount_a == 0 && node_l->ledger.is_epoch_link (block_a->link_field ().value ())); - event.add ("subtype", "epoch"); - } - else - { - event.add ("subtype", "receive"); - } - } - std::stringstream ostream; - boost::property_tree::write_json (ostream, event); - ostream.flush (); - auto body (std::make_shared (ostream.str ())); - auto address (node_l->config.callback_address); - auto port (node_l->config.callback_port); - auto target (std::make_shared (node_l->config.callback_target)); - auto resolver (std::make_shared (node_l->io_ctx)); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { - if (!ec) - { - node_l->do_rpc_callback (i_a, address, port, target, body, resolver); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - } - }); - }); - } - }); - } observers.channel_connected.add ([this] (std::shared_ptr const & channel) { network.send_keepalive_self (channel); @@ -472,68 +415,6 @@ nano::node::~node () stop (); } -// TODO: Move to a separate class -void nano::node::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr const & target, std::shared_ptr const & body, std::shared_ptr const & resolver) -{ - if (i_a != boost::asio::ip::tcp::resolver::iterator{}) - { - auto node_l (shared_from_this ()); - auto sock (std::make_shared (node_l->io_ctx)); - sock->async_connect (i_a->endpoint (), [node_l, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable { - if (!ec) - { - auto req (std::make_shared> ()); - req->method (boost::beast::http::verb::post); - req->target (*target); - req->version (11); - req->insert (boost::beast::http::field::host, address); - req->insert (boost::beast::http::field::content_type, "application/json"); - req->body () = *body; - req->prepare_payload (); - boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { - if (!ec) - { - auto sb (std::make_shared ()); - auto resp (std::make_shared> ()); - boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { - if (!ec) - { - if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful) - { - node_l->stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ())); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - } - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - }; - }); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - } - }); - } - else - { - node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); - node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - ++i_a; - - node_l->do_rpc_callback (i_a, address, port, target, body, resolver); - } - }); - } -} - bool nano::node::copy_with_compaction (std::filesystem::path const & destination) { return store.copy_db (destination); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index fc6581829a..5105d60409 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -82,7 +82,6 @@ class node final : public std::enable_shared_from_this bool block_confirmed_or_being_confirmed (nano::secure::transaction const &, nano::block_hash const &); bool block_confirmed_or_being_confirmed (nano::block_hash const &); - void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); bool online () const; bool init_error () const; std::pair> get_bootstrap_weights () const; @@ -201,6 +200,8 @@ class node final : public std::enable_shared_from_this nano::peer_history & peer_history; std::unique_ptr monitor_impl; nano::monitor & monitor; + std::unique_ptr rpc_callbacks_impl; + nano::rpc_callbacks & rpc_callbacks; public: std::chrono::steady_clock::time_point const startup_time; diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp new file mode 100644 index 0000000000..1fa8ad0a2b --- /dev/null +++ b/nano/node/rpc_callbacks.cpp @@ -0,0 +1,139 @@ +#include +#include +#include +#include + +nano::rpc_callbacks::rpc_callbacks (nano::node & node_a) : + node{ node_a }, + config{ node_a.config }, + observers{ node_a.observers }, + ledger{ node_a.ledger }, + logger{ node_a.logger }, + stats{ node_a.stats } +{ + if (!config.callback_address.empty ()) + { + logger.info (nano::log::type::rpc_callbacks, "RPC callbacks enabled on {}:{}", config.callback_address, config.callback_port); + setup_callbacks (); + } +} + +void nano::rpc_callbacks::setup_callbacks () +{ + observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { + auto block_a (status_a.winner); + if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) + { + node.workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { + boost::property_tree::ptree event; + event.add ("account", account_a.to_account ()); + event.add ("hash", block_a->hash ().to_string ()); + std::string block_text; + block_a->serialize_json (block_text); + event.add ("block", block_text); + event.add ("amount", amount_a.to_string_dec ()); + if (is_state_send_a) + { + event.add ("is_send", is_state_send_a); + event.add ("subtype", "send"); + } + // Subtype field + else if (block_a->type () == nano::block_type::state) + { + if (block_a->is_change ()) + { + event.add ("subtype", "change"); + } + else if (is_state_epoch_a) + { + debug_assert (amount_a == 0 && ledger.is_epoch_link (block_a->link_field ().value ())); + event.add ("subtype", "epoch"); + } + else + { + event.add ("subtype", "receive"); + } + } + std::stringstream ostream; + boost::property_tree::write_json (ostream, event); + ostream.flush (); + auto body (std::make_shared (ostream.str ())); + auto address (config.callback_address); + auto port (config.callback_port); + auto target (std::make_shared (config.callback_target)); + auto resolver (std::make_shared (node.io_ctx)); + resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [this, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { + if (!ec) + { + do_rpc_callback (i_a, address, port, target, body, resolver); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + } + }); + }); + } + }); +} + +void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr const & target, std::shared_ptr const & body, std::shared_ptr const & resolver) +{ + if (i_a != boost::asio::ip::tcp::resolver::iterator{}) + { + auto sock (std::make_shared (node.io_ctx)); + sock->async_connect (i_a->endpoint (), [this, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable { + if (!ec) + { + auto req (std::make_shared> ()); + req->method (boost::beast::http::verb::post); + req->target (*target); + req->version (11); + req->insert (boost::beast::http::field::host, address); + req->insert (boost::beast::http::field::content_type, "application/json"); + req->body () = *body; + req->prepare_payload (); + boost::beast::http::async_write (*sock, *req, [this, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + if (!ec) + { + auto sb (std::make_shared ()); + auto resp (std::make_shared> ()); + boost::beast::http::async_read (*sock, *sb, *resp, [this, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + if (!ec) + { + if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful) + { + stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ())); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + } + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + }; + }); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + } + }); + } + else + { + logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); + stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); + ++i_a; + + do_rpc_callback (i_a, address, port, target, body, resolver); + } + }); + } +} \ No newline at end of file diff --git a/nano/node/rpc_callbacks.hpp b/nano/node/rpc_callbacks.hpp new file mode 100644 index 0000000000..1461a3d365 --- /dev/null +++ b/nano/node/rpc_callbacks.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace nano +{ +class rpc_callbacks +{ +public: + explicit rpc_callbacks (nano::node &); + +private: // Dependencies + nano::node_config const & config; + nano::node & node; + nano::node_observers & observers; + nano::ledger & ledger; + nano::logger & logger; + nano::stats & stats; + +private: + void setup_callbacks (); + void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); +}; +} \ No newline at end of file From 38a0dd6d97201139280dc583ae28ee3073374f36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 2 Jan 2025 13:53:48 +0100 Subject: [PATCH 2/9] Comments --- nano/node/rpc_callbacks.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index 1fa8ad0a2b..6dcc08075a 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -24,6 +24,7 @@ void nano::rpc_callbacks::setup_callbacks () auto block_a (status_a.winner); if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) { + // It's OK to capture this by reference since workers are stopped before node destruction node.workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { boost::property_tree::ptree event; event.add ("account", account_a.to_account ()); @@ -62,6 +63,8 @@ void nano::rpc_callbacks::setup_callbacks () auto port (config.callback_port); auto target (std::make_shared (config.callback_target)); auto resolver (std::make_shared (node.io_ctx)); + + // It's OK to capture this by reference since io_context is stopped before node destruction resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [this, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { if (!ec) { From c87f94bde71c6fcac48a5af0dd80e2fb8f3f5b60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 2 Jan 2025 14:04:16 +0100 Subject: [PATCH 3/9] Cleanup --- nano/node/rpc_callbacks.cpp | 43 +++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index 6dcc08075a..b6b519a4b5 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -21,7 +21,7 @@ nano::rpc_callbacks::rpc_callbacks (nano::node & node_a) : void nano::rpc_callbacks::setup_callbacks () { observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { - auto block_a (status_a.winner); + auto block_a = status_a.winner; if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) { // It's OK to capture this by reference since workers are stopped before node destruction @@ -58,14 +58,15 @@ void nano::rpc_callbacks::setup_callbacks () std::stringstream ostream; boost::property_tree::write_json (ostream, event); ostream.flush (); - auto body (std::make_shared (ostream.str ())); - auto address (config.callback_address); - auto port (config.callback_port); - auto target (std::make_shared (config.callback_target)); - auto resolver (std::make_shared (node.io_ctx)); + auto body = std::make_shared (ostream.str ()); + auto address = config.callback_address; + auto port = config.callback_port; + auto target = std::make_shared (config.callback_target); + auto resolver = std::make_shared (node.io_ctx); // It's OK to capture this by reference since io_context is stopped before node destruction - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [this, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { + resolver->async_resolve (boost::asio::ip::tcp::resolver::query{ address, std::to_string (port) }, + [this, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { if (!ec) { do_rpc_callback (i_a, address, port, target, body, resolver); @@ -81,15 +82,16 @@ void nano::rpc_callbacks::setup_callbacks () }); } -void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr const & target, std::shared_ptr const & body, std::shared_ptr const & resolver) +void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, +std::shared_ptr const & target, std::shared_ptr const & body, std::shared_ptr const & resolver) { if (i_a != boost::asio::ip::tcp::resolver::iterator{}) { - auto sock (std::make_shared (node.io_ctx)); + auto sock = std::make_shared (node.io_ctx); sock->async_connect (i_a->endpoint (), [this, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable { if (!ec) { - auto req (std::make_shared> ()); + auto req = std::make_shared> (); req->method (boost::beast::http::verb::post); req->target (*target); req->version (11); @@ -97,12 +99,14 @@ void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::itera req->insert (boost::beast::http::field::content_type, "application/json"); req->body () = *body; req->prepare_payload (); - boost::beast::http::async_write (*sock, *req, [this, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + boost::beast::http::async_write (*sock, *req, + [this, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { if (!ec) { - auto sb (std::make_shared ()); - auto resp (std::make_shared> ()); - boost::beast::http::async_read (*sock, *sb, *resp, [this, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + auto sb = std::make_shared (); + auto resp = std::make_shared> (); + boost::beast::http::async_read (*sock, *sb, *resp, + [this, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { if (!ec) { if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful) @@ -111,15 +115,17 @@ void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::itera } else { - logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ())); + logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", + address, port, nano::util::to_str (resp->result ())); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } } else { - logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ()); + logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", + address, port, ec.message ()); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - }; + } }); } else @@ -131,7 +137,8 @@ void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::itera } else { - logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); + logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", + address, i_a->endpoint ().address ().to_string (), port, ec.message ()); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); ++i_a; From 0a5d18158248a3ec698b0e5915f2de4577b7f3ba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Thu, 2 Jan 2025 14:08:55 +0100 Subject: [PATCH 4/9] Cleanup --- nano/node/rpc_callbacks.cpp | 64 ++++++++++++++++++++++++++++++------- 1 file changed, 53 insertions(+), 11 deletions(-) diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index b6b519a4b5..58f9752f8c 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -11,6 +11,7 @@ nano::rpc_callbacks::rpc_callbacks (nano::node & node_a) : logger{ node_a.logger }, stats{ node_a.stats } { + // Only set up callbacks if a callback address is configured if (!config.callback_address.empty ()) { logger.info (nano::log::type::rpc_callbacks, "RPC callbacks enabled on {}:{}", config.callback_address, config.callback_port); @@ -20,12 +21,22 @@ nano::rpc_callbacks::rpc_callbacks (nano::node & node_a) : void nano::rpc_callbacks::setup_callbacks () { - observers.blocks.add ([this] (nano::election_status const & status_a, std::vector const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) { + // Add observer for block confirmations + observers.blocks.add ([this] (nano::election_status const & status_a, + std::vector const & votes_a, + nano::account const & account_a, + nano::amount const & amount_a, + bool is_state_send_a, + bool is_state_epoch_a) { auto block_a = status_a.winner; + + // Only process blocks that have achieved quorum or confirmation height if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) { - // It's OK to capture this by reference since workers are stopped before node destruction + // Post callback processing to worker thread + // Safe to capture 'this' by reference as workers are stopped before node destruction node.workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { + // Construct the callback payload as a property tree boost::property_tree::ptree event; event.add ("account", account_a.to_account ()); event.add ("hash", block_a->hash ().to_string ()); @@ -33,12 +44,15 @@ void nano::rpc_callbacks::setup_callbacks () block_a->serialize_json (block_text); event.add ("block", block_text); event.add ("amount", amount_a.to_string_dec ()); + + // Add transaction type information if (is_state_send_a) { event.add ("is_send", is_state_send_a); event.add ("subtype", "send"); } - // Subtype field + + // Handle different state block subtypes else if (block_a->type () == nano::block_type::state) { if (block_a->is_change ()) @@ -55,18 +69,24 @@ void nano::rpc_callbacks::setup_callbacks () event.add ("subtype", "receive"); } } + + // Serialize the event to JSON std::stringstream ostream; boost::property_tree::write_json (ostream, event); ostream.flush (); + + // Prepare callback request parameters auto body = std::make_shared (ostream.str ()); auto address = config.callback_address; auto port = config.callback_port; auto target = std::make_shared (config.callback_target); auto resolver = std::make_shared (node.io_ctx); - // It's OK to capture this by reference since io_context is stopped before node destruction + // Resolve the callback address + // Safe to capture 'this' as io_context is stopped before node destruction resolver->async_resolve (boost::asio::ip::tcp::resolver::query{ address, std::to_string (port) }, - [this, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { + [this, address, port, target, body, resolver] (boost::system::error_code const & ec, + boost::asio::ip::tcp::resolver::iterator i_a) { if (!ec) { do_rpc_callback (i_a, address, port, target, body, resolver); @@ -82,15 +102,28 @@ void nano::rpc_callbacks::setup_callbacks () }); } -void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, -std::shared_ptr const & target, std::shared_ptr const & body, std::shared_ptr const & resolver) +/** + * Performs the actual RPC callback HTTP request + * Handles connection establishment, request sending, and response processing + * Includes retry logic for failed connection attempts + */ +void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, +std::string const & address, +uint16_t port, +std::shared_ptr const & target, +std::shared_ptr const & body, +std::shared_ptr const & resolver) { + // Check if we have more endpoints to try if (i_a != boost::asio::ip::tcp::resolver::iterator{}) { + // Create socket and attempt connection auto sock = std::make_shared (node.io_ctx); - sock->async_connect (i_a->endpoint (), [this, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable { + sock->async_connect (i_a->endpoint (), + [this, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable { if (!ec) { + // Connection successful, prepare and send HTTP request auto req = std::make_shared> (); req->method (boost::beast::http::verb::post); req->target (*target); @@ -99,16 +132,24 @@ std::shared_ptr const & target, std::shared_ptr const req->insert (boost::beast::http::field::content_type, "application/json"); req->body () = *body; req->prepare_payload (); + + // Send the HTTP request boost::beast::http::async_write (*sock, *req, - [this, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + [this, sock, address, port, req, i_a, target, body, resolver] ( + boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { if (!ec) { + // Request sent successfully, prepare to receive response auto sb = std::make_shared (); auto resp = std::make_shared> (); + + // Read the HTTP response boost::beast::http::async_read (*sock, *sb, *resp, - [this, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { + [this, sb, resp, sock, address, port, i_a, target, body, resolver] ( + boost::system::error_code const & ec, std::size_t bytes_transferred) mutable { if (!ec) { + // Check response status if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful) { stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out); @@ -137,11 +178,12 @@ std::shared_ptr const & target, std::shared_ptr const } else { + // Connection failed, try next endpoint if available logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); - ++i_a; + ++i_a; do_rpc_callback (i_a, address, port, target, body, resolver); } }); From b7d782f3a85bef96e46cd031dea7a60e255d1763 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 4 Jan 2025 13:18:16 +0100 Subject: [PATCH 5/9] Rename to `http_callbacks` --- nano/lib/logging_enums.hpp | 2 +- nano/node/fwd.hpp | 2 +- nano/node/node.cpp | 4 ++-- nano/node/node.hpp | 4 ++-- nano/node/rpc_callbacks.cpp | 18 +++++++++--------- nano/node/rpc_callbacks.hpp | 4 ++-- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/nano/lib/logging_enums.hpp b/nano/lib/logging_enums.hpp index 64f520bc62..9ffab9fa71 100644 --- a/nano/lib/logging_enums.hpp +++ b/nano/lib/logging_enums.hpp @@ -39,7 +39,7 @@ enum class type qt, rpc, rpc_connection, - rpc_callbacks, + http_callbacks, rpc_request, ipc, ipc_server, diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 9227249275..8aa4674085 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -33,7 +33,7 @@ class recently_cemented_cache; class recently_confirmed_cache; class rep_crawler; class rep_tiers; -class rpc_callbacks; +class http_callbacks; class telemetry; class unchecked_map; class stats; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 6c50715b44..9fbec5eb60 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -194,8 +194,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy peer_history{ *peer_history_impl }, monitor_impl{ std::make_unique (config.monitor, *this) }, monitor{ *monitor_impl }, - rpc_callbacks_impl{ std::make_unique (*this) }, - rpc_callbacks{ *rpc_callbacks_impl }, + http_callbacks_impl{ std::make_unique (*this) }, + http_callbacks{ *http_callbacks_impl }, startup_time{ std::chrono::steady_clock::now () }, node_seq{ seq } { diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 5105d60409..f25098be8e 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -200,8 +200,8 @@ class node final : public std::enable_shared_from_this nano::peer_history & peer_history; std::unique_ptr monitor_impl; nano::monitor & monitor; - std::unique_ptr rpc_callbacks_impl; - nano::rpc_callbacks & rpc_callbacks; + std::unique_ptr http_callbacks_impl; + nano::http_callbacks & http_callbacks; public: std::chrono::steady_clock::time_point const startup_time; diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index 58f9752f8c..08b95b1c95 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -3,7 +3,7 @@ #include #include -nano::rpc_callbacks::rpc_callbacks (nano::node & node_a) : +nano::http_callbacks::http_callbacks (nano::node & node_a) : node{ node_a }, config{ node_a.config }, observers{ node_a.observers }, @@ -14,12 +14,12 @@ nano::rpc_callbacks::rpc_callbacks (nano::node & node_a) : // Only set up callbacks if a callback address is configured if (!config.callback_address.empty ()) { - logger.info (nano::log::type::rpc_callbacks, "RPC callbacks enabled on {}:{}", config.callback_address, config.callback_port); + logger.info (nano::log::type::http_callbacks, "Callbacks enabled on {}:{}", config.callback_address, config.callback_port); setup_callbacks (); } } -void nano::rpc_callbacks::setup_callbacks () +void nano::http_callbacks::setup_callbacks () { // Add observer for block confirmations observers.blocks.add ([this] (nano::election_status const & status_a, @@ -93,7 +93,7 @@ void nano::rpc_callbacks::setup_callbacks () } else { - logger.error (nano::log::type::rpc_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ()); + logger.error (nano::log::type::http_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ()); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } }); @@ -107,7 +107,7 @@ void nano::rpc_callbacks::setup_callbacks () * Handles connection establishment, request sending, and response processing * Includes retry logic for failed connection attempts */ -void nano::rpc_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, +void nano::http_callbacks::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr const & target, @@ -156,14 +156,14 @@ std::shared_ptr const & resolver) } else { - logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", + logger.error (nano::log::type::http_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ())); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } } else { - logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", + logger.error (nano::log::type::http_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ()); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } @@ -171,7 +171,7 @@ std::shared_ptr const & resolver) } else { - logger.error (nano::log::type::rpc_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ()); + logger.error (nano::log::type::http_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ()); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } }); @@ -179,7 +179,7 @@ std::shared_ptr const & resolver) else { // Connection failed, try next endpoint if available - logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", + logger.error (nano::log::type::http_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); diff --git a/nano/node/rpc_callbacks.hpp b/nano/node/rpc_callbacks.hpp index 1461a3d365..cd565cc655 100644 --- a/nano/node/rpc_callbacks.hpp +++ b/nano/node/rpc_callbacks.hpp @@ -4,10 +4,10 @@ namespace nano { -class rpc_callbacks +class http_callbacks { public: - explicit rpc_callbacks (nano::node &); + explicit http_callbacks (nano::node &); private: // Dependencies nano::node_config const & config; From 5e47b802b364b5ec13e7597bd43464064f9622a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 4 Jan 2025 13:29:41 +0100 Subject: [PATCH 6/9] Stats --- nano/lib/stats_enums.hpp | 16 +++++++++++++++- nano/node/rpc_callbacks.cpp | 28 ++++++++++++++++++++-------- nano/rpc_test/rpc.cpp | 4 ++-- 3 files changed, 37 insertions(+), 11 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 429741d2f3..630e0d1ea0 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -26,7 +26,9 @@ enum class type election, election_cleanup, election_vote, - http_callback, + http_callbacks, + http_callbacks_notified, + http_callbacks_ec, ipc, tcp, tcp_server, @@ -166,6 +168,8 @@ enum class detail other, drop, queued, + error, + failed, // processing queue queue, @@ -625,6 +629,16 @@ enum class detail host_unreachable, not_supported, + // http + error_resolving, + error_connecting, + error_sending, + error_completing, + bad_status, + + // http_callbacks + block_confirmed, + _last // Must be the last enum }; diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index 08b95b1c95..ea5ab4c27a 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -33,6 +33,8 @@ void nano::http_callbacks::setup_callbacks () // Only process blocks that have achieved quorum or confirmation height if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height)) { + stats.inc (nano::stat::type::http_callbacks_notified, nano::stat::detail::block_confirmed); + // Post callback processing to worker thread // Safe to capture 'this' by reference as workers are stopped before node destruction node.workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { @@ -93,8 +95,10 @@ void nano::http_callbacks::setup_callbacks () } else { + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::error_resolving); + stats.inc (nano::stat::type::http_callbacks_ec, to_stat_detail (ec)); + logger.error (nano::log::type::http_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ()); - stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } }); }); @@ -117,6 +121,8 @@ std::shared_ptr const & resolver) // Check if we have more endpoints to try if (i_a != boost::asio::ip::tcp::resolver::iterator{}) { + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::initiate); + // Create socket and attempt connection auto sock = std::make_shared (node.io_ctx); sock->async_connect (i_a->endpoint (), @@ -152,36 +158,42 @@ std::shared_ptr const & resolver) // Check response status if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful) { - stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out); + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::success); } else { + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::bad_status); + logger.error (nano::log::type::http_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ())); - stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } } else { + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::error_completing); + stats.inc (nano::stat::type::http_callbacks_ec, to_stat_detail (ec)); + logger.error (nano::log::type::http_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ()); - stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } }); } else { + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::error_sending); + stats.inc (nano::stat::type::http_callbacks_ec, to_stat_detail (ec)); + logger.error (nano::log::type::http_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ()); - stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); } }); } - else + else // Connection failed, try next endpoint if available { - // Connection failed, try next endpoint if available + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::error_connecting); + stats.inc (nano::stat::type::http_callbacks_ec, to_stat_detail (ec)); + logger.error (nano::log::type::http_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ()); - stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out); ++i_a; do_rpc_callback (i_a, address, port, target, body, resolver); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 82c9837d5b..eecaf6430c 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -5252,7 +5252,7 @@ TEST (rpc, block_confirm_confirmed) auto transaction = node->ledger.tx_begin_read (); ASSERT_TRUE (node->ledger.confirmed.block_exists_or_pruned (transaction, nano::dev::genesis->hash ())); } - ASSERT_EQ (0, node->stats.count (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::http_callbacks_ec)); auto const rpc_ctx = add_rpc (system, node); boost::property_tree::ptree request; request.put ("action", "block_confirm"); @@ -5266,7 +5266,7 @@ TEST (rpc, block_confirm_confirmed) // Check callback // Callback result is error because callback target port isn't listening // Check for error count greater than zero as the address goes through DNS resolution and may make multiple attempts for multiple IPs per DNS - ASSERT_TIMELY (5s, node->stats.count (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out) != 0); + ASSERT_TIMELY (5s, node->stats.count (nano::stat::type::http_callbacks_ec) != 0); } TEST (rpc, node_id) From 1b989270b1d91e460e2d84e2d5eff5879f951a9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 4 Jan 2025 13:42:07 +0100 Subject: [PATCH 7/9] Use dedicated thread pool --- nano/lib/thread_roles.cpp | 3 +++ nano/lib/thread_roles.hpp | 1 + nano/node/node.cpp | 3 +++ nano/node/rpc_callbacks.cpp | 22 +++++++++++++++++++--- nano/node/rpc_callbacks.hpp | 8 ++++++++ 5 files changed, 34 insertions(+), 3 deletions(-) diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index ccb34be3bb..8b4c0314a9 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -190,6 +190,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::monitor: thread_role_name_string = "Monitor"; break; + case nano::thread_role::name::http_callbacks: + thread_role_name_string = "HTTP callbacks"; + break; default: debug_assert (false && "nano::thread_role::get_string unhandled thread role"); } diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 66f7455d3e..349c02f1bd 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -68,6 +68,7 @@ enum class name vote_router, online_reps, monitor, + http_callbacks, }; std::string_view to_string (name); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 9fbec5eb60..fbdf7c7285 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -558,6 +558,7 @@ void nano::node::start () vote_router.start (); online_reps.start (); monitor.start (); + http_callbacks.start (); add_initial_peers (); } @@ -605,6 +606,7 @@ void nano::node::stop () message_processor.stop (); network.stop (); monitor.stop (); + http_callbacks.stop (); bootstrap_workers.stop (); wallet_workers.stop (); @@ -1075,6 +1077,7 @@ nano::container_info nano::node::container_info () const info.add ("bandwidth", outbound_limiter.container_info ()); info.add ("backlog_scan", backlog_scan.container_info ()); info.add ("bounded_backlog", backlog.container_info ()); + info.add ("http_callbacks", http_callbacks.container_info ()); return info; } diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index ea5ab4c27a..cb1a0be4f2 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -9,7 +9,8 @@ nano::http_callbacks::http_callbacks (nano::node & node_a) : observers{ node_a.observers }, ledger{ node_a.ledger }, logger{ node_a.logger }, - stats{ node_a.stats } + stats{ node_a.stats }, + workers{ 1, nano::thread_role::name::http_callbacks } { // Only set up callbacks if a callback address is configured if (!config.callback_address.empty ()) @@ -19,6 +20,21 @@ nano::http_callbacks::http_callbacks (nano::node & node_a) : } } +void nano::http_callbacks::start () +{ + workers.start (); +} + +void nano::http_callbacks::stop () +{ + workers.stop (); +} + +nano::container_info nano::http_callbacks::container_info () const +{ + return workers.container_info (); +} + void nano::http_callbacks::setup_callbacks () { // Add observer for block confirmations @@ -36,8 +52,8 @@ void nano::http_callbacks::setup_callbacks () stats.inc (nano::stat::type::http_callbacks_notified, nano::stat::detail::block_confirmed); // Post callback processing to worker thread - // Safe to capture 'this' by reference as workers are stopped before node destruction - node.workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { + // Safe to capture 'this' by reference as workers are stopped before this component destruction + workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { // Construct the callback payload as a property tree boost::property_tree::ptree event; event.add ("account", account_a.to_account ()); diff --git a/nano/node/rpc_callbacks.hpp b/nano/node/rpc_callbacks.hpp index cd565cc655..5e6359322c 100644 --- a/nano/node/rpc_callbacks.hpp +++ b/nano/node/rpc_callbacks.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include namespace nano @@ -9,6 +10,11 @@ class http_callbacks public: explicit http_callbacks (nano::node &); + void start (); + void stop (); + + nano::container_info container_info () const; + private: // Dependencies nano::node_config const & config; nano::node & node; @@ -20,5 +26,7 @@ class http_callbacks private: void setup_callbacks (); void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); + + nano::thread_pool workers; }; } \ No newline at end of file From 1785fcf02896b37e40cc0ff476d14916251e841b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 4 Jan 2025 13:48:55 +0100 Subject: [PATCH 8/9] Warn about large backlog of notifications --- nano/lib/stats_enums.hpp | 1 + nano/node/rpc_callbacks.cpp | 10 ++++++++++ 2 files changed, 11 insertions(+) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index 630e0d1ea0..752b49081a 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -638,6 +638,7 @@ enum class detail // http_callbacks block_confirmed, + large_backlog, _last // Must be the last enum }; diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index cb1a0be4f2..9fa05802c5 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -51,6 +52,15 @@ void nano::http_callbacks::setup_callbacks () { stats.inc (nano::stat::type::http_callbacks_notified, nano::stat::detail::block_confirmed); + constexpr size_t warning_threshold = 10000; + static nano::interval warning_interval; + + if (workers.queued_tasks () > warning_threshold && warning_interval.elapsed (15s)) + { + stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::large_backlog); + logger.warn (nano::log::type::http_callbacks, "Backlog of {} http callback notifications to process", workers.queued_tasks ()); + } + // Post callback processing to worker thread // Safe to capture 'this' by reference as workers are stopped before this component destruction workers.post ([this, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () { From c833b0298906d9dc132f0a6bb3b9012ad07471f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Sat, 4 Jan 2025 14:26:51 +0100 Subject: [PATCH 9/9] Multithreaded interval utility --- nano/lib/interval.hpp | 23 ++++++++++++++++++++++- nano/lib/uniquer.hpp | 2 +- nano/node/block_processor.cpp | 2 +- nano/node/bootstrap/bootstrap_service.cpp | 2 +- nano/node/local_block_broadcaster.cpp | 2 +- nano/node/rpc_callbacks.cpp | 3 +-- nano/node/rpc_callbacks.hpp | 1 + nano/node/transport/tcp_listener.cpp | 2 +- nano/node/vote_cache.cpp | 2 +- 9 files changed, 30 insertions(+), 9 deletions(-) diff --git a/nano/lib/interval.hpp b/nano/lib/interval.hpp index a4b500fa64..776c6ce963 100644 --- a/nano/lib/interval.hpp +++ b/nano/lib/interval.hpp @@ -1,13 +1,14 @@ #pragma once #include +#include namespace nano { class interval { public: - bool elapsed (auto target) + bool elapse (auto target) { auto const now = std::chrono::steady_clock::now (); if (now - last >= target) @@ -21,4 +22,24 @@ class interval private: std::chrono::steady_clock::time_point last{ std::chrono::steady_clock::now () }; }; + +class interval_mt +{ +public: + bool elapse (auto target) + { + std::lock_guard guard{ mutex }; + auto const now = std::chrono::steady_clock::now (); + if (now - last >= target) + { + last = now; + return true; + } + return false; + } + +private: + std::mutex mutex; + std::chrono::steady_clock::time_point last{ std::chrono::steady_clock::now () }; +}; } \ No newline at end of file diff --git a/nano/lib/uniquer.hpp b/nano/lib/uniquer.hpp index 0a3a160e4c..6f3dc32e94 100644 --- a/nano/lib/uniquer.hpp +++ b/nano/lib/uniquer.hpp @@ -27,7 +27,7 @@ class uniquer final nano::lock_guard guard{ mutex }; - if (cleanup_interval.elapsed (cleanup_cutoff)) + if (cleanup_interval.elapse (cleanup_cutoff)) { cleanup (); } diff --git a/nano/node/block_processor.cpp b/nano/node/block_processor.cpp index dd72521c8c..be75b4be34 100644 --- a/nano/node/block_processor.cpp +++ b/nano/node/block_processor.cpp @@ -223,7 +223,7 @@ void nano::block_processor::run () } } - if (log_interval.elapsed (15s)) + if (log_interval.elapse (15s)) { logger.info (nano::log::type::block_processor, "{} blocks (+ {} forced) in processing queue", queue.size (), diff --git a/nano/node/bootstrap/bootstrap_service.cpp b/nano/node/bootstrap/bootstrap_service.cpp index 8f60f9924f..250afead5d 100644 --- a/nano/node/bootstrap/bootstrap_service.cpp +++ b/nano/node/bootstrap/bootstrap_service.cpp @@ -727,7 +727,7 @@ void nano::bootstrap_service::cleanup_and_sync () tags_by_order.pop_front (); } - if (sync_dependencies_interval.elapsed (60s)) + if (sync_dependencies_interval.elapse (60s)) { stats.inc (nano::stat::type::bootstrap, nano::stat::detail::sync_dependencies); accounts.sync_dependencies (); diff --git a/nano/node/local_block_broadcaster.cpp b/nano/node/local_block_broadcaster.cpp index 6248de0042..607fb9d257 100644 --- a/nano/node/local_block_broadcaster.cpp +++ b/nano/node/local_block_broadcaster.cpp @@ -127,7 +127,7 @@ void nano::local_block_broadcaster::run () { stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop); - if (cleanup_interval.elapsed (config.cleanup_interval)) + if (cleanup_interval.elapse (config.cleanup_interval)) { cleanup (lock); debug_assert (lock.owns_lock ()); diff --git a/nano/node/rpc_callbacks.cpp b/nano/node/rpc_callbacks.cpp index 9fa05802c5..499a816cdc 100644 --- a/nano/node/rpc_callbacks.cpp +++ b/nano/node/rpc_callbacks.cpp @@ -53,9 +53,8 @@ void nano::http_callbacks::setup_callbacks () stats.inc (nano::stat::type::http_callbacks_notified, nano::stat::detail::block_confirmed); constexpr size_t warning_threshold = 10000; - static nano::interval warning_interval; - if (workers.queued_tasks () > warning_threshold && warning_interval.elapsed (15s)) + if (workers.queued_tasks () > warning_threshold && warning_interval.elapse (15s)) { stats.inc (nano::stat::type::http_callbacks, nano::stat::detail::large_backlog); logger.warn (nano::log::type::http_callbacks, "Backlog of {} http callback notifications to process", workers.queued_tasks ()); diff --git a/nano/node/rpc_callbacks.hpp b/nano/node/rpc_callbacks.hpp index 5e6359322c..01093435e2 100644 --- a/nano/node/rpc_callbacks.hpp +++ b/nano/node/rpc_callbacks.hpp @@ -28,5 +28,6 @@ class http_callbacks void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr const &, std::shared_ptr const &, std::shared_ptr const &); nano::thread_pool workers; + nano::interval_mt warning_interval; }; } \ No newline at end of file diff --git a/nano/node/transport/tcp_listener.cpp b/nano/node/transport/tcp_listener.cpp index 372d9bbbc6..e666b2dd6d 100644 --- a/nano/node/transport/tcp_listener.cpp +++ b/nano/node/transport/tcp_listener.cpp @@ -362,7 +362,7 @@ asio::awaitable nano::transport::tcp_listener::wait_available_slots () con nano::interval log_interval; while (connection_count () >= config.max_inbound_connections && !stopped) { - if (log_interval.elapsed (node.network_params.network.is_dev_network () ? 1s : 15s)) + if (log_interval.elapse (node.network_params.network.is_dev_network () ? 1s : 15s)) { logger.warn (nano::log::type::tcp_listener, "Waiting for available slots to accept new connections (current: {} / max: {})", connection_count (), config.max_inbound_connections); diff --git a/nano/node/vote_cache.cpp b/nano/node/vote_cache.cpp index 89ddd8e650..a2f264f7d7 100644 --- a/nano/node/vote_cache.cpp +++ b/nano/node/vote_cache.cpp @@ -238,7 +238,7 @@ std::deque nano::vote_cache::top (const nano::uint1 { nano::lock_guard lock{ mutex }; - if (cleanup_interval.elapsed (config.age_cutoff / 2)) + if (cleanup_interval.elapse (config.age_cutoff / 2)) { cleanup (); }