Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boost fiber #2076

Draft
wants to merge 14 commits into
base: develop
Choose a base branch
from
20 changes: 9 additions & 11 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
#include <fc/crypto/base64.hpp>
#include <fc/crypto/hex.hpp>
#include <fc/rpc/api_connection.hpp>
#include <fc/thread/future.hpp>
#include <fc/thread/async.hpp>

#include <boost/fiber/future.hpp>

template class fc::api<graphene::app::block_api>;
template class fc::api<graphene::app::network_broadcast_api>;
Expand Down Expand Up @@ -187,12 +189,13 @@ namespace graphene { namespace app {

fc::variant network_broadcast_api::broadcast_transaction_synchronous(const precomputable_transaction& trx)
{
fc::promise<fc::variant>::ptr prom = fc::promise<fc::variant>::create();
broadcast_transaction_with_callback( [prom]( const fc::variant& v ){
prom->set_value(v);
boost::fibers::promise<fc::variant> prom;
boost::fibers::future<fc::variant> result = prom.get_future();
broadcast_transaction_with_callback( [&prom]( const fc::variant& v ) {
prom.set_value(v);
}, trx );

return fc::future<fc::variant>(prom).wait();
return result.get();
}

void network_broadcast_api::broadcast_block( const signed_block& b )
Expand Down Expand Up @@ -356,12 +359,7 @@ namespace graphene { namespace app {
if(_app.is_plugin_enabled("elasticsearch")) {
auto es = _app.get_plugin<elasticsearch::elasticsearch_plugin>("elasticsearch");
if(es.get()->get_running_mode() != elasticsearch::mode::only_save) {
if(!_app.elasticsearch_thread)
_app.elasticsearch_thread= std::make_shared<fc::thread>("elasticsearch");

return _app.elasticsearch_thread->async([&es, &account, &stop, &limit, &start]() {
return es->get_account_history(account, stop, limit, start);
}, "thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait();
return es->get_account_history(account, stop, limit, start);
}
}

Expand Down
1 change: 1 addition & 0 deletions libraries/app/database_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include <fc/crypto/hex.hpp>
#include <fc/rpc/api_connection.hpp>
#include <fc/thread/async.hpp>

#include <boost/range/iterator_range.hpp>

Expand Down
2 changes: 0 additions & 2 deletions libraries/app/include/graphene/app/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ namespace graphene { namespace app {

bool is_plugin_enabled(const string& name) const;

std::shared_ptr<fc::thread> elasticsearch_thread;

private:
void add_available_plugin( std::shared_ptr<abstract_plugin> p );
std::shared_ptr<detail::application_impl> my;
Expand Down
15 changes: 10 additions & 5 deletions libraries/chain/db_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include <graphene/protocol/fee_schedule.hpp>

#include <fc/asio.hpp>
#include <fc/io/raw.hpp>
#include <fc/thread/parallel.hpp>

Expand Down Expand Up @@ -822,9 +823,9 @@ void database::_precompute_parallel( const Trx* trx, const size_t count, const u
}
}

fc::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
boost::fibers::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
{ try {
std::vector<fc::future<void>> workers;
std::vector<boost::fibers::future<void>> workers;
if( !block.transactions.empty() )
{
if( (skip & skip_expensive) == skip_expensive )
Expand All @@ -850,16 +851,20 @@ fc::future<void> database::precompute_parallel( const signed_block& block, const
block.id();

if( workers.empty() )
return fc::future< void >( fc::promise< void >::create( true ) );
{
boost::fibers::promise< void > done;
done.set_value();
return done.get_future();
}

auto first = workers.begin();
auto worker = first;
while( ++worker != workers.end() )
worker->wait();
return *first;
return std::move( *first );
} FC_LOG_AND_RETHROW() }

fc::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
boost::fibers::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
{
return fc::do_parallel([this,&trx] () {
_precompute_parallel( &trx, 1, skip_nothing );
Expand Down
4 changes: 2 additions & 2 deletions libraries/chain/db_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void database::reindex( fc::path data_dir )

size_t total_block_size = _block_id_to_block.total_block_size();
const auto& gpo = get_global_properties();
std::queue< std::tuple< size_t, signed_block, fc::future< void > > > blocks;
std::queue< std::tuple< size_t, signed_block, boost::fibers::future< void > > > blocks;
uint32_t next_block_num = head_block_num() + 1;
uint32_t i = next_block_num;
while( next_block_num <= last_block_num || !blocks.empty() )
Expand All @@ -93,7 +93,7 @@ void database::reindex( fc::path data_dir )
{
if( block->timestamp >= last_block->timestamp - gpo.parameters.maximum_time_until_expiration )
skip &= ~skip_transaction_dupe_check;
blocks.emplace( processed_block_size, std::move(*block), fc::future<void>() );
blocks.emplace( processed_block_size, std::move(*block), boost::fibers::future<void>() );
std::get<2>(blocks.back()) = precompute_parallel( std::get<1>(blocks.back()), skip );
}
else
Expand Down
9 changes: 6 additions & 3 deletions libraries/chain/include/graphene/chain/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
#include <graphene/db/object_database.hpp>
#include <graphene/db/object.hpp>
#include <graphene/db/simple_index.hpp>
#include <fc/signals.hpp>

#include <fc/signals.hpp>
#include <fc/log/logger.hpp>

#include <boost/fiber/future.hpp>

#include <map>

namespace graphene { namespace chain {
Expand Down Expand Up @@ -453,7 +455,8 @@ namespace graphene { namespace chain {
* @return a future that will resolve to the input block with
* precomputations applied
*/
fc::future<void> precompute_parallel( const signed_block& block, const uint32_t skip = skip_nothing )const;
boost::fibers::future<void> precompute_parallel( const signed_block& block,
const uint32_t skip = skip_nothing )const;

/** Precomputes digests, signatures and operation validations.
* "Expensive" computations may be done in a parallel thread.
Expand All @@ -462,7 +465,7 @@ namespace graphene { namespace chain {
* @return a future that will resolve to the input transaction with
* precomputations applied
*/
fc::future<void> precompute_parallel( const precomputable_transaction& trx )const;
boost::fibers::future<void> precompute_parallel( const precomputable_transaction& trx )const;
private:
template<typename Trx>
void _precompute_parallel( const Trx* trx, const size_t count, const uint32_t skip )const;
Expand Down
4 changes: 2 additions & 2 deletions libraries/db/object_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void object_database::flush()
{
// ilog("Save object_database in ${d}", ("d", _data_dir));
fc::create_directories( _data_dir / "object_database.tmp" / "lock" );
std::vector<fc::future<void>> tasks;
std::vector<boost::fibers::future<void>> tasks;
tasks.reserve(200);
for( uint32_t space = 0; space < _index.size(); ++space )
{
Expand Down Expand Up @@ -109,7 +109,7 @@ void object_database::open(const fc::path& data_dir)
wlog("Ignoring locked object_database");
return;
}
std::vector<fc::future<void>> tasks;
std::vector<boost::fibers::future<void>> tasks;
tasks.reserve(200);
ilog("Opening object database from ${d} ...", ("d", data_dir));
for( uint32_t space = 0; space < _index.size(); ++space )
Expand Down
2 changes: 1 addition & 1 deletion libraries/fc
Submodule fc updated 64 files
+6 −9 CMakeLists.txt
+4 −1 include/fc/api.hpp
+65 −177 include/fc/asio.hpp
+0 −1 include/fc/io/stdio.hpp
+0 −10 include/fc/network/http/websocket.hpp
+1 −5 include/fc/network/tcp_socket.hpp
+1 −0 include/fc/rpc/api_connection.hpp
+9 −11 include/fc/rpc/cli.hpp
+11 −8 include/fc/rpc/state.hpp
+0 −15 include/fc/signals.hpp
+74 −0 include/fc/thread/async.hpp
+100 −0 include/fc/thread/fibers.hpp
+0 −344 include/fc/thread/future.hpp
+0 −109 include/fc/thread/mutex.hpp
+0 −35 include/fc/thread/non_preemptable_scope_check.hpp
+20 −17 include/fc/thread/parallel.hpp
+0 −22 include/fc/thread/priority.hpp
+0 −11 include/fc/thread/scoped_lock.hpp
+0 −35 include/fc/thread/spin_lock.hpp
+0 −35 include/fc/thread/spin_yield_lock.hpp
+0 −171 include/fc/thread/task.hpp
+0 −275 include/fc/thread/thread.hpp
+0 −84 include/fc/thread/thread_specific.hpp
+0 −42 include/fc/thread/unique_lock.hpp
+26 −89 src/asio.cpp
+12 −23 src/crypto/aes.cpp
+3 −160 src/io/iostream.cpp
+167 −0 src/io/stdio.cpp
+5 −7 src/log/appender.cpp
+4 −6 src/log/console_appender.cpp
+53 −45 src/log/file_appender.cpp
+1 −1 src/log/gelf_appender.cpp
+3 −6 src/log/log_message.cpp
+7 −8 src/log/logger.cpp
+173 −164 src/network/http/websocket.cpp
+120 −83 src/network/rate_limiting.cpp
+34 −26 src/network/tcp_socket.cpp
+37 −22 src/network/udp_socket.cpp
+59 −47 src/rpc/cli.cpp
+10 −16 src/rpc/state.cpp
+2 −2 src/rpc/websocket_api.cpp
+0 −256 src/thread/context.hpp
+214 −0 src/thread/fibers.cpp
+0 −140 src/thread/future.cpp
+0 −215 src/thread/mutex.cpp
+0 −20 src/thread/non_preemptable_scope_check.cpp
+121 −106 src/thread/parallel.cpp
+0 −46 src/thread/spin_lock.cpp
+0 −51 src/thread/spin_yield_lock.cpp
+0 −113 src/thread/task.cpp
+0 −530 src/thread/thread.cpp
+0 −835 src/thread/thread_d.hpp
+0 −65 src/thread/thread_specific.cpp
+0 −12 tests/CMakeLists.txt
+31 −18 tests/api_tests.cpp
+3 −5 tests/logging_tests.cpp
+41 −12 tests/network/http/websocket_test.cpp
+7 −5 tests/stacktrace_test.cpp
+108 −82 tests/thread/parallel_tests.cpp
+0 −249 tests/thread/task_cancel.cpp
+22 −27 tests/thread/thread_tests.cpp
+54 −0 tests/thread/worker_thread.hxx
+328 −0 vendor/boost/fiber/asio/detail/yield.hpp
+63 −0 vendor/boost/fiber/asio/yield.hpp
2 changes: 1 addition & 1 deletion libraries/net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ set(SOURCES node.cpp
add_library( graphene_net ${SOURCES} ${HEADERS} )

target_link_libraries( graphene_net
PUBLIC fc graphene_db graphene_protocol )
PUBLIC fc graphene_db graphene_protocol graphene_utilities )
target_include_directories( graphene_net
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
PRIVATE "${CMAKE_SOURCE_DIR}/libraries/chain/include"
Expand Down
28 changes: 2 additions & 26 deletions libraries/net/include/graphene/net/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <graphene/protocol/types.hpp>

#include <list>
#include <thread>

namespace graphene { namespace net {

Expand Down Expand Up @@ -293,35 +294,10 @@ namespace graphene { namespace net {
void disable_peer_advertising();
fc::variant_object get_call_statistics() const;
private:
std::unique_ptr<detail::node_impl, detail::node_impl_deleter> my;
std::unique_ptr<detail::node_impl> my;
};

class simulated_network : public node
{
public:
~simulated_network();
simulated_network(const std::string& user_agent) : node(user_agent) {}
void listen_to_p2p_network() override {}
void connect_to_p2p_network() override {}
void connect_to_endpoint(const fc::ip::endpoint& ep) override {}

fc::ip::endpoint get_actual_listening_endpoint() const override { return fc::ip::endpoint(); }

void sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers) override {}
void broadcast(const message& item_to_broadcast) override;
void add_node_delegate(node_delegate* node_delegate_to_add);

virtual uint32_t get_connection_count() const override { return 8; }
private:
struct node_info;
void message_sender(node_info* destination_node);
std::list<node_info*> network_nodes;
};


typedef std::shared_ptr<node> node_ptr;
typedef std::shared_ptr<simulated_network> simulated_network_ptr;

} } // graphene::net

FC_REFLECT(graphene::net::message_propagation_data, (received_time)(validated_time)(originating_peer));
Expand Down
29 changes: 21 additions & 8 deletions libraries/net/include/graphene/net/peer_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <graphene/net/peer_database.hpp>
#include <graphene/net/message_oriented_connection.hpp>
#include <graphene/net/config.hpp>
#include <graphene/utilities/recurring_task.hpp>

#include <boost/tuple/tuple.hpp>

Expand All @@ -39,10 +40,25 @@

#include <queue>
#include <boost/container/deque.hpp>
#include <fc/thread/future.hpp>

namespace graphene { namespace net
{
namespace graphene { namespace net {
class peer_connection;

namespace detail {

class send_queued_messages_task : public graphene::utilities::recurring_task {
public:
send_queued_messages_task() : _conn(nullptr) {}
explicit send_queued_messages_task( peer_connection& conn ) : _conn(&conn) {}

protected:
virtual void run();

peer_connection* _conn;
};

} // detail

struct firewall_check_state_data
{
node_id_t expected_node_id;
Expand All @@ -58,7 +74,6 @@ namespace graphene { namespace net
node_id_t requesting_peer;
};

class peer_connection;
class peer_connection_delegate
{
public:
Expand All @@ -69,7 +84,6 @@ namespace graphene { namespace net
virtual message get_message_for_item(const item_id& item) = 0;
};

class peer_connection;
typedef std::shared_ptr<peer_connection> peer_connection_ptr;
class peer_connection : public message_oriented_connection_delegate,
public std::enable_shared_from_this<peer_connection>
Expand Down Expand Up @@ -166,7 +180,7 @@ namespace graphene { namespace net

size_t _total_queued_messages_size = 0;
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
fc::future<void> _send_queued_messages_done;
detail::send_queued_messages_task _send_queued_messages;
public:
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
Expand Down Expand Up @@ -260,8 +274,6 @@ namespace graphene { namespace net

uint32_t last_known_fork_block_number = 0;

fc::future<void> accept_or_connect_task_done;

firewall_check_state_data *firewall_check_state = nullptr;
private:
#ifndef NDEBUG
Expand Down Expand Up @@ -313,6 +325,7 @@ namespace graphene { namespace net
void send_queued_messages_task();
void accept_connection_task();
void connect_to_task(const fc::ip::endpoint& remote_endpoint);
friend class detail::send_queued_messages_task;
};
typedef std::shared_ptr<peer_connection> peer_connection_ptr;

Expand Down
Loading