Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move http callbacks logic out of node class #4816

Merged
merged 9 commits into from
Jan 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion nano/lib/interval.hpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
#pragma once

#include <chrono>
#include <mutex>

namespace nano
{
class interval
{
public:
bool elapsed (auto target)
bool elapse (auto target)
{
auto const now = std::chrono::steady_clock::now ();
if (now - last >= target)
Expand All @@ -21,4 +22,24 @@ class interval
private:
std::chrono::steady_clock::time_point last{ std::chrono::steady_clock::now () };
};

class interval_mt
{
public:
bool elapse (auto target)
{
std::lock_guard guard{ mutex };
auto const now = std::chrono::steady_clock::now ();
if (now - last >= target)
{
last = now;
return true;
}
return false;
}

private:
std::mutex mutex;
std::chrono::steady_clock::time_point last{ std::chrono::steady_clock::now () };
};
}
2 changes: 1 addition & 1 deletion nano/lib/logging_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ enum class type
qt,
rpc,
rpc_connection,
rpc_callbacks,
http_callbacks,
rpc_request,
ipc,
ipc_server,
Expand Down
17 changes: 16 additions & 1 deletion nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ enum class type
election,
election_cleanup,
election_vote,
http_callback,
http_callbacks,
http_callbacks_notified,
http_callbacks_ec,
ipc,
tcp,
tcp_server,
Expand Down Expand Up @@ -166,6 +168,8 @@ enum class detail
other,
drop,
queued,
error,
failed,

// processing queue
queue,
Expand Down Expand Up @@ -625,6 +629,17 @@ enum class detail
host_unreachable,
not_supported,

// http
error_resolving,
error_connecting,
error_sending,
error_completing,
bad_status,

// http_callbacks
block_confirmed,
large_backlog,

_last // Must be the last enum
};

Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::monitor:
thread_role_name_string = "Monitor";
break;
case nano::thread_role::name::http_callbacks:
thread_role_name_string = "HTTP callbacks";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ enum class name
vote_router,
online_reps,
monitor,
http_callbacks,
};

std::string_view to_string (name);
Expand Down
2 changes: 1 addition & 1 deletion nano/lib/uniquer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class uniquer final

nano::lock_guard<nano::mutex> guard{ mutex };

if (cleanup_interval.elapsed (cleanup_cutoff))
if (cleanup_interval.elapse (cleanup_cutoff))
{
cleanup ();
}
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ add_library(
rep_tiers.cpp
request_aggregator.hpp
request_aggregator.cpp
rpc_callbacks.hpp
rpc_callbacks.cpp
scheduler/bucket.cpp
scheduler/bucket.hpp
scheduler/component.hpp
Expand Down
2 changes: 1 addition & 1 deletion nano/node/block_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ void nano::block_processor::run ()
}
}

if (log_interval.elapsed (15s))
if (log_interval.elapse (15s))
{
logger.info (nano::log::type::block_processor, "{} blocks (+ {} forced) in processing queue",
queue.size (),
Expand Down
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ void nano::bootstrap_service::cleanup_and_sync ()
tags_by_order.pop_front ();
}

if (sync_dependencies_interval.elapsed (60s))
if (sync_dependencies_interval.elapse (60s))
{
stats.inc (nano::stat::type::bootstrap, nano::stat::detail::sync_dependencies);
accounts.sync_dependencies ();
Expand Down
1 change: 1 addition & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class recently_cemented_cache;
class recently_confirmed_cache;
class rep_crawler;
class rep_tiers;
class http_callbacks;
class telemetry;
class unchecked_map;
class stats;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/local_block_broadcaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ void nano::local_block_broadcaster::run ()
{
stats.inc (nano::stat::type::local_block_broadcaster, nano::stat::detail::loop);

if (cleanup_interval.elapsed (config.cleanup_interval))
if (cleanup_interval.elapse (config.cleanup_interval))
{
cleanup (lock);
debug_assert (lock.owns_lock ());
Expand Down
128 changes: 6 additions & 122 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <nano/node/peer_history.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/rpc_callbacks.hpp>
#include <nano/node/scheduler/component.hpp>
#include <nano/node/scheduler/hinted.hpp>
#include <nano/node/scheduler/manual.hpp>
Expand Down Expand Up @@ -193,6 +194,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
peer_history{ *peer_history_impl },
monitor_impl{ std::make_unique<nano::monitor> (config.monitor, *this) },
monitor{ *monitor_impl },
http_callbacks_impl{ std::make_unique<nano::http_callbacks> (*this) },
http_callbacks{ *http_callbacks_impl },
startup_time{ std::chrono::steady_clock::now () },
node_seq{ seq }
{
Expand Down Expand Up @@ -250,66 +253,6 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
network.disconnect_observer = [this] () {
observers.disconnect.notify ();
};
if (!config.callback_address.empty ())
{
observers.blocks.add ([this] (nano::election_status const & status_a, std::vector<nano::vote_with_weight_info> const & votes_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a, bool is_state_epoch_a) {
auto block_a (status_a.winner);
if ((status_a.type == nano::election_status_type::active_confirmed_quorum || status_a.type == nano::election_status_type::active_confirmation_height))
{
auto node_l (shared_from_this ());
io_ctx.post ([node_l, block_a, account_a, amount_a, is_state_send_a, is_state_epoch_a] () {
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
std::string block_text;
block_a->serialize_json (block_text);
event.add ("block", block_text);
event.add ("amount", amount_a.to_string_dec ());
if (is_state_send_a)
{
event.add ("is_send", is_state_send_a);
event.add ("subtype", "send");
}
// Subtype field
else if (block_a->type () == nano::block_type::state)
{
if (block_a->is_change ())
{
event.add ("subtype", "change");
}
else if (is_state_epoch_a)
{
debug_assert (amount_a == 0 && node_l->ledger.is_epoch_link (block_a->link_field ().value ()));
event.add ("subtype", "epoch");
}
else
{
event.add ("subtype", "receive");
}
}
std::stringstream ostream;
boost::property_tree::write_json (ostream, event);
ostream.flush ();
auto body (std::make_shared<std::string> (ostream.str ()));
auto address (node_l->config.callback_address);
auto port (node_l->config.callback_port);
auto target (std::make_shared<std::string> (node_l->config.callback_target));
auto resolver (std::make_shared<boost::asio::ip::tcp::resolver> (node_l->io_ctx));
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) {
if (!ec)
{
node_l->do_rpc_callback (i_a, address, port, target, body, resolver);
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Error resolving callback: {}:{} ({})", address, port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
}
});
});
}
});
}

observers.channel_connected.add ([this] (std::shared_ptr<nano::transport::channel> const & channel) {
network.send_keepalive_self (channel);
Expand Down Expand Up @@ -472,68 +415,6 @@ nano::node::~node ()
stop ();
}

// TODO: Move to a separate class
void nano::node::do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const & address, uint16_t port, std::shared_ptr<std::string> const & target, std::shared_ptr<std::string> const & body, std::shared_ptr<boost::asio::ip::tcp::resolver> const & resolver)
{
if (i_a != boost::asio::ip::tcp::resolver::iterator{})
{
auto node_l (shared_from_this ());
auto sock (std::make_shared<boost::asio::ip::tcp::socket> (node_l->io_ctx));
sock->async_connect (i_a->endpoint (), [node_l, target, body, sock, address, port, i_a, resolver] (boost::system::error_code const & ec) mutable {
if (!ec)
{
auto req (std::make_shared<boost::beast::http::request<boost::beast::http::string_body>> ());
req->method (boost::beast::http::verb::post);
req->target (*target);
req->version (11);
req->insert (boost::beast::http::field::host, address);
req->insert (boost::beast::http::field::content_type, "application/json");
req->body () = *body;
req->prepare_payload ();
boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable {
if (!ec)
{
auto sb (std::make_shared<boost::beast::flat_buffer> ());
auto resp (std::make_shared<boost::beast::http::response<boost::beast::http::string_body>> ());
boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port, i_a, target, body, resolver] (boost::system::error_code const & ec, std::size_t bytes_transferred) mutable {
if (!ec)
{
if (boost::beast::http::to_status_class (resp->result ()) == boost::beast::http::status_class::successful)
{
node_l->stats.inc (nano::stat::type::http_callback, nano::stat::detail::initiate, nano::stat::dir::out);
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Callback to {}:{} failed [status: {}]", address, port, nano::util::to_str (resp->result ()));
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
}
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to complete callback: {}:{} ({})", address, port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
};
});
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to send callback: {}:{} ({})", address, port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
}
});
}
else
{
node_l->logger.error (nano::log::type::rpc_callbacks, "Unable to connect to callback address({}): {}:{} ({})", address, i_a->endpoint ().address ().to_string (), port, ec.message ());
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::http_callback, nano::stat::dir::out);
++i_a;

node_l->do_rpc_callback (i_a, address, port, target, body, resolver);
}
});
}
}

bool nano::node::copy_with_compaction (std::filesystem::path const & destination)
{
return store.copy_db (destination);
Expand Down Expand Up @@ -677,6 +558,7 @@ void nano::node::start ()
vote_router.start ();
online_reps.start ();
monitor.start ();
http_callbacks.start ();

add_initial_peers ();
}
Expand Down Expand Up @@ -724,6 +606,7 @@ void nano::node::stop ()
message_processor.stop ();
network.stop ();
monitor.stop ();
http_callbacks.stop ();

bootstrap_workers.stop ();
wallet_workers.stop ();
Expand Down Expand Up @@ -1194,6 +1077,7 @@ nano::container_info nano::node::container_info () const
info.add ("bandwidth", outbound_limiter.container_info ());
info.add ("backlog_scan", backlog_scan.container_info ());
info.add ("bounded_backlog", backlog.container_info ());
info.add ("http_callbacks", http_callbacks.container_info ());
return info;
}

Expand Down
3 changes: 2 additions & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class node final : public std::enable_shared_from_this<node>
bool block_confirmed_or_being_confirmed (nano::secure::transaction const &, nano::block_hash const &);
bool block_confirmed_or_being_confirmed (nano::block_hash const &);

void do_rpc_callback (boost::asio::ip::tcp::resolver::iterator i_a, std::string const &, uint16_t, std::shared_ptr<std::string> const &, std::shared_ptr<std::string> const &, std::shared_ptr<boost::asio::ip::tcp::resolver> const &);
bool online () const;
bool init_error () const;
std::pair<uint64_t, std::unordered_map<nano::account, nano::uint128_t>> get_bootstrap_weights () const;
Expand Down Expand Up @@ -201,6 +200,8 @@ class node final : public std::enable_shared_from_this<node>
nano::peer_history & peer_history;
std::unique_ptr<nano::monitor> monitor_impl;
nano::monitor & monitor;
std::unique_ptr<nano::http_callbacks> http_callbacks_impl;
nano::http_callbacks & http_callbacks;

public:
std::chrono::steady_clock::time_point const startup_time;
Expand Down
Loading
Loading