Skip to content

Commit

Permalink
Merge pull request libbitcoin#391 from evoskuil/master
Browse files Browse the repository at this point in the history
Add service suspension, tests.
  • Loading branch information
evoskuil authored May 6, 2024
2 parents 9b0a0e8 + 243444e commit 11891bd
Show file tree
Hide file tree
Showing 20 changed files with 180 additions and 26 deletions.
1 change: 1 addition & 0 deletions include/bitcoin/network/error.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ enum error_t : uint8_t
channel_inactive,
channel_stopped,
service_stopped,
service_suspended,
subscriber_exists,
subscriber_stopped,
desubscribed
Expand Down
5 changes: 4 additions & 1 deletion include/bitcoin/network/net/acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef LIBBITCOIN_NETWORK_NET_ACCEPTOR_HPP
#define LIBBITCOIN_NETWORK_NET_ACCEPTOR_HPP

#include <atomic>
#include <functional>
#include <memory>
#include <bitcoin/system.hpp>
Expand Down Expand Up @@ -49,7 +50,8 @@ class BCT_API acceptor

/// Construct an instance.
acceptor(const logger& log, asio::strand& strand,
asio::io_context& service, const settings& settings) NOEXCEPT;
asio::io_context& service, const settings& settings,
std::atomic_bool& suspended) NOEXCEPT;

/// Asserts/logs stopped.
virtual ~acceptor() NOEXCEPT;
Expand Down Expand Up @@ -88,6 +90,7 @@ class BCT_API acceptor
const settings& settings_;
asio::io_context& service_;
asio::strand& strand_;
std::atomic_bool& suspended_;

// These are protected by strand.
asio::acceptor acceptor_;
Expand Down
5 changes: 4 additions & 1 deletion include/bitcoin/network/net/connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef LIBBITCOIN_NETWORK_NET_CONNECTOR_HPP
#define LIBBITCOIN_NETWORK_NET_CONNECTOR_HPP

#include <atomic>
#include <memory>
#include <bitcoin/system.hpp>
#include <bitcoin/network/async/async.hpp>
Expand Down Expand Up @@ -51,7 +52,8 @@ class BCT_API connector

/// Construct an instance.
connector(const logger& log, asio::strand& strand,
asio::io_context& service, const settings& settings) NOEXCEPT;
asio::io_context& service, const settings& settings,
std::atomic_bool& suspended) NOEXCEPT;

/// Asserts/logs stopped.
virtual ~connector() NOEXCEPT;
Expand Down Expand Up @@ -90,6 +92,7 @@ class BCT_API connector
const settings& settings_;
asio::io_context& service_;
asio::strand& strand_;
std::atomic_bool& suspended_;

// These are protected by strand.
asio::resolver resolver_;
Expand Down
15 changes: 15 additions & 0 deletions include/bitcoin/network/p2p.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,13 @@ class BCT_API p2p
virtual void connect(const config::endpoint& endpoint,
channel_notifier&& handler) NOEXCEPT;

/// Suspensions.
/// -----------------------------------------------------------------------

/// Suspend/resume all connections.
virtual void suspend() NOEXCEPT;
virtual void resume() NOEXCEPT;

/// Properties.
/// -----------------------------------------------------------------------

Expand Down Expand Up @@ -198,6 +205,12 @@ class BCT_API p2p
virtual acceptor::ptr create_acceptor() NOEXCEPT;
virtual connector::ptr create_connector() NOEXCEPT;

/// Suspend/resume inbound/outbound connections.
virtual void suspend_acceptors() NOEXCEPT;
virtual void resume_acceptors() NOEXCEPT;
virtual void suspend_connectors() NOEXCEPT;
virtual void resume_connectors() NOEXCEPT;

/// Register nonces for loopback (true implies found), require strand.
virtual bool store_nonce(const channel& channel) NOEXCEPT;
virtual bool unstore_nonce(const channel& channel) NOEXCEPT;
Expand Down Expand Up @@ -252,6 +265,8 @@ class BCT_API p2p
// These are thread safe.
const settings& settings_;
std::atomic_bool closed_{ false };
std::atomic_bool accept_suspended_{ false };
std::atomic_bool connect_suspended_{ false };
std::atomic<size_t> total_channel_count_{};
std::atomic<size_t> inbound_channel_count_{};

Expand Down
3 changes: 2 additions & 1 deletion include/bitcoin/network/sessions/session_outbound.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ class BCT_API session_outbound
void do_one(const code& ec, const config::address& peer, object_key key,
const race::ptr& racer, const connector::ptr& connector) NOEXCEPT;
void handle_one(const code& ec, const socket::ptr& socket,
object_key key, const race::ptr& racer) NOEXCEPT;
object_key key, const config::address& peer,
const race::ptr& racer) NOEXCEPT;
void handle_connect(const code& ec, const socket::ptr& socket,
object_key key) NOEXCEPT;

Expand Down
1 change: 1 addition & 0 deletions src/error.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ DEFINE_ERROR_T_MESSAGE_MAP(error)
{ channel_inactive, "channel inactive" },
{ channel_stopped, "channel stopped" },
{ service_stopped, "service stopped" },
{ service_suspended, "service suspended" },
{ subscriber_exists, "subscriber exists" },
{ subscriber_stopped, "subscriber stopped" },
{ desubscribed, "subscriber desubscribed" }
Expand Down
18 changes: 17 additions & 1 deletion src/net/acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <bitcoin/network/net/acceptor.hpp>

#include <atomic>
#include <functional>
#include <memory>
#include <utility>
Expand Down Expand Up @@ -49,10 +50,12 @@ inline asio::endpoint make_endpoint(bool enable_ipv6, uint16_t port) NOEXCEPT
// Calls are stranded to protect the acceptor member.

acceptor::acceptor(const logger& log, asio::strand& strand,
asio::io_context& service, const settings& settings) NOEXCEPT
asio::io_context& service, const settings& settings,
std::atomic_bool& suspended) NOEXCEPT
: settings_(settings),
service_(service),
strand_(strand),
suspended_(suspended),
acceptor_(strand_),
reporter(log),
tracker<acceptor>(log)
Expand Down Expand Up @@ -150,6 +153,12 @@ void acceptor::accept(socket_handler&& handler) NOEXCEPT
return;
}

if (suspended_.load())
{
handler(error::service_suspended, nullptr);
return;
}

// Create the socket.
const auto socket = std::make_shared<network::socket>(log, service_);

Expand All @@ -173,6 +182,13 @@ void acceptor::handle_accept(const code& ec, const socket::ptr& socket,
return;
}

if (suspended_.load())
{
socket->stop();
handler(error::service_suspended, nullptr);
return;
}

if (stopped_)
{
socket->stop();
Expand Down
27 changes: 26 additions & 1 deletion src/net/connector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
#include <bitcoin/network/net/connector.hpp>

#include <atomic>
#include <functional>
#include <memory>
#include <utility>
Expand All @@ -43,10 +44,12 @@ using namespace std::placeholders;
// ----------------------------------------------------------------------------

connector::connector(const logger& log, asio::strand& strand,
asio::io_context& service, const settings& settings) NOEXCEPT
asio::io_context& service, const settings& settings,
std::atomic_bool& suspended) NOEXCEPT
: settings_(settings),
service_(service),
strand_(strand),
suspended_(suspended),
resolver_(strand),
timer_(std::make_shared<deadline>(log, strand, settings.connect_timeout())),
reporter(log),
Expand Down Expand Up @@ -106,6 +109,12 @@ void connector::start(const std::string& hostname, uint16_t port,
return;
}

if (suspended_.load())
{
handler(error::service_suspended, nullptr);
return;
}

// Capture the handler.
racer_.start(std::move(handler));

Expand Down Expand Up @@ -139,6 +148,14 @@ void connector::handle_resolve(const error::boost_code& ec,
return;
}

if (suspended_.load())
{
timer_->stop();
socket->stop();
racer_.finish(error::service_suspended, nullptr);
return;
}

// Failure in resolve, it wins (with resolve failure).
if (ec)
{
Expand Down Expand Up @@ -179,6 +196,14 @@ void connector::handle_connect(const code& ec, const finish_ptr& finish,
return;
}

if (suspended_.load())
{
socket->stop();
timer_->stop();
racer_.finish(error::service_suspended, nullptr);
return;
}

// Failure in connect, connector wins (with connect failure).
if (ec)
{
Expand Down
39 changes: 37 additions & 2 deletions src/p2p.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,13 @@ p2p::~p2p() NOEXCEPT
acceptor::ptr p2p::create_acceptor() NOEXCEPT
{
return std::make_shared<acceptor>(log, strand(), service(),
network_settings());
network_settings(), accept_suspended_);
}

connector::ptr p2p::create_connector() NOEXCEPT
{
return std::make_shared<connector>(log, strand(), service(),
network_settings());
network_settings(), connect_suspended_);
}

connectors_ptr p2p::create_connectors(size_t count) NOEXCEPT
Expand Down Expand Up @@ -372,6 +372,41 @@ void p2p::do_connect_handled(const config::endpoint& endpoint,
handler(error::service_stopped, nullptr);
}

// Suspensions.
// ----------------------------------------------------------------------------

void p2p::suspend_acceptors() NOEXCEPT
{
accept_suspended_.store(true);
}

void p2p::resume_acceptors() NOEXCEPT
{
accept_suspended_.store(false);
}

void p2p::suspend_connectors() NOEXCEPT
{
connect_suspended_.store(true);
}

void p2p::resume_connectors() NOEXCEPT
{
connect_suspended_.store(false);
}

void p2p::suspend() NOEXCEPT
{
suspend_acceptors();
suspend_connectors();
}

void p2p::resume() NOEXCEPT
{
resume_acceptors();
resume_connectors();
}

// Properties.
// ----------------------------------------------------------------------------

Expand Down
7 changes: 7 additions & 0 deletions src/sessions/session_inbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@ void session_inbound::handle_accept(const code& ec,
return;
}

if (ec == error::service_suspended)
{
////LOGS("Suspended inbound channel start.");
defer(BIND(start_accept, _1, acceptor));
return;
}

// There was an error accepting the channel, so try again after delay.
if (ec)
{
Expand Down
7 changes: 7 additions & 0 deletions src/sessions/session_manual.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,13 @@ void session_manual::handle_connect(const code& ec, const socket::ptr& socket,
return;
}

if (ec == error::service_suspended)
{
////LOGS("Suspended manual channel start [" << peer << "].");
defer(BIND(start_connect, _1, peer, connector, handler));
return;
}

// There was an error connecting the channel, so try again after delay.
if (ec)
{
Expand Down
16 changes: 14 additions & 2 deletions src/sessions/session_outbound.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ void session_outbound::handle_started(const code& ec,
LOG_ONLY(const auto batch = settings().connect_batch_size;)
LOGN("Create " << peers << " connections " << batch << " at a time.");

// There is currently no way to vary the number of connections at runtime.
for (size_t peer = 0; peer < peers; ++peer)
start_connect(error::success);

Expand Down Expand Up @@ -164,16 +165,20 @@ void session_outbound::do_one(const code& ec, const config::address& peer,
return;
}

connector->connect(peer, BIND(handle_one, _1, _2, key, racer));
connector->connect(peer, BIND(handle_one, _1, _2, key, peer, racer));
}

// Handle each do_one connection attempt, stopping on first success.
void session_outbound::handle_one(const code& ec, const socket::ptr& socket,
object_key key, const race::ptr& racer) NOEXCEPT
object_key key, const config::address& peer,
const race::ptr& racer) NOEXCEPT
{
BC_ASSERT_MSG(stranded(), "strand");
////COUNT(events::outbound2, key);

if (ec == error::service_suspended)
restore(peer, BIND(handle_reclaim, _1));

// Winner in quality race is first to pass success.
if (racer->finish(ec, socket))
{
Expand Down Expand Up @@ -210,6 +215,13 @@ void session_outbound::handle_connect(const code& ec,
return;
}

if (ec == error::service_suspended)
{
////LOGS("Suspended outbound channel start.");
defer(BIND(start_connect, _1));
return;
}

// There was an error connecting a channel, so try again after delay.
if (ec)
{
Expand Down
9 changes: 9 additions & 0 deletions test/error.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,15 @@ BOOST_AUTO_TEST_CASE(error_t__code__service_stopped__true_exected_message)
BOOST_REQUIRE_EQUAL(ec.message(), "service stopped");
}

BOOST_AUTO_TEST_CASE(error_t__code__service_suspended__true_exected_message)
{
constexpr auto value = error::service_suspended;
const auto ec = code(value);
BOOST_REQUIRE(ec);
BOOST_REQUIRE(ec == value);
BOOST_REQUIRE_EQUAL(ec.message(), "service suspended");
}

BOOST_AUTO_TEST_CASE(error_t__code__subscriber_exists__true_exected_message)
{
constexpr auto value = error::subscriber_exists;
Expand Down
Loading

0 comments on commit 11891bd

Please sign in to comment.