diff --git a/irohad/ametsuchi/CMakeLists.txt b/irohad/ametsuchi/CMakeLists.txt index cd9860493f..a0740ac49e 100644 --- a/irohad/ametsuchi/CMakeLists.txt +++ b/irohad/ametsuchi/CMakeLists.txt @@ -22,6 +22,8 @@ add_library(ametsuchi impl/in_memory_block_storage_factory.cpp impl/flat_file_block_storage.cpp impl/flat_file_block_storage_factory.cpp + reconnection/impl/storage_connection_wrapper.cpp + reconnection/impl/k_times_reconnection_strategy.cpp ) target_link_libraries(ametsuchi @@ -34,6 +36,7 @@ target_link_libraries(ametsuchi shared_model_stateless_validation SOCI::core SOCI::postgresql + tbb ) target_compile_definitions(ametsuchi diff --git a/irohad/ametsuchi/impl/storage_impl.cpp b/irohad/ametsuchi/impl/storage_impl.cpp index 9812260bea..d0f21dfe2f 100644 --- a/irohad/ametsuchi/impl/storage_impl.cpp +++ b/irohad/ametsuchi/impl/storage_impl.cpp @@ -157,7 +157,11 @@ namespace iroha { if (block_is_prepared) { rollbackPrepared(*sql); } - auto block_result = getBlockQuery()->getTopBlock(); + auto block_query = getBlockQuery(); + if (not block_query) { + return expected::makeError("Can't create blockQuery"); + } + auto block_result = (*block_query)->getTopBlock(); return expected::makeValue>( std::make_unique( block_result.match( @@ -181,17 +185,13 @@ namespace iroha { if (not wsv) { return boost::none; } - return boost::make_optional>( - std::make_shared(wsv)); + auto peer_query = std::make_shared(*wsv); + return boost::make_optional>(peer_query); } boost::optional> StorageImpl::createBlockQuery() const { - auto block_query = getBlockQuery(); - if (not block_query) { - return boost::none; - } - return boost::make_optional(block_query); + return getBlockQuery(); } boost::optional> @@ -215,7 +215,7 @@ namespace iroha { log_manager_->getChild("QueryExecutor"))); } - bool StorageImpl::insertBlock( + ReturnWrapperType StorageImpl::insertBlock( std::shared_ptr block) { log_->info("create mutable storage"); auto storageResult = createMutableStorage(); @@ -234,7 +234,7 @@ namespace iroha { return inserted; } - bool StorageImpl::insertBlocks( + ReturnWrapperType StorageImpl::insertBlocks( const std::vector> &blocks) { log_->info("create mutable storage"); @@ -518,29 +518,33 @@ namespace iroha { } } - std::shared_ptr StorageImpl::getWsvQuery() const { + ReturnWrapperType> StorageImpl::getWsvQuery() + const { std::shared_lock lock(drop_mutex); if (not connection_) { log_->info("connection to database is not initialised"); - return nullptr; + return DefaultError; } - return std::make_shared( + auto wsv = std::make_shared( std::make_unique(*connection_), factory_, log_manager_->getChild("WsvQuery")->getLogger()); + return wrapValue>(wsv); } - std::shared_ptr StorageImpl::getBlockQuery() const { + ReturnWrapperType> StorageImpl::getBlockQuery() + const { std::shared_lock lock(drop_mutex); if (not connection_) { log_->info("connection to database is not initialised"); - return nullptr; + return DefaultError; } - return std::make_shared( + auto block_query = std::make_shared( std::make_unique(*connection_), *block_store_, converter_, log_manager_->getChild("PostgresBlockQuery")->getLogger()); + return wrapValue>(block_query); } rxcpp::observable> diff --git a/irohad/ametsuchi/impl/storage_impl.hpp b/irohad/ametsuchi/impl/storage_impl.hpp index 12b9d0170d..37007b7760 100644 --- a/irohad/ametsuchi/impl/storage_impl.hpp +++ b/irohad/ametsuchi/impl/storage_impl.hpp @@ -6,7 +6,7 @@ #ifndef IROHA_STORAGE_IMPL_HPP #define IROHA_STORAGE_IMPL_HPP -#include "ametsuchi/storage.hpp" +#include "ametsuchi/reconnection/unsafe_storage.hpp" #include #include @@ -34,7 +34,7 @@ namespace iroha { std::unique_ptr block_store; }; - class StorageImpl : public Storage { + class StorageImpl : public UnsafeStorage { protected: static expected::Result createDatabaseIfNotExist( const std::string &dbname, @@ -83,7 +83,7 @@ namespace iroha { * @param blocks - block for insertion * @return true if all blocks are inserted */ - bool insertBlock( + ReturnWrapperType insertBlock( std::shared_ptr block) override; /** @@ -91,7 +91,7 @@ namespace iroha { * @param blocks - collection of blocks for insertion * @return true if inserted */ - bool insertBlocks( + ReturnWrapperType insertBlocks( const std::vector> &blocks) override; @@ -107,9 +107,10 @@ namespace iroha { boost::optional> commitPrepared( std::shared_ptr block) override; - std::shared_ptr getWsvQuery() const override; + ReturnWrapperType> getWsvQuery() const override; - std::shared_ptr getBlockQuery() const override; + ReturnWrapperType> getBlockQuery() + const override; rxcpp::observable> on_commit() override; diff --git a/irohad/ametsuchi/reconnection/impl/k_times_reconnection_strategy.cpp b/irohad/ametsuchi/reconnection/impl/k_times_reconnection_strategy.cpp new file mode 100644 index 0000000000..86f0c5f1d8 --- /dev/null +++ b/irohad/ametsuchi/reconnection/impl/k_times_reconnection_strategy.cpp @@ -0,0 +1,43 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp" + +using namespace iroha::ametsuchi; + +KTimesReconnectionStorageStrategy::KTimesReconnectionStorageStrategy( + size_t number_of_recalls) + : number_of_recalls_(number_of_recalls) {} + +bool KTimesReconnectionStorageStrategy::canInvoke(const Tag &tag) { + CollectionType::accessor accessor; + while (true) { + bool is_found = invokes_.find(accessor, tag); + if (is_found) { + // assume the tag is unique and doesn't share between threads + accessor->second++; + } else { + bool is_inserted = invokes_.insert(accessor, tag); + if (is_inserted) { + accessor->second = 1; + } else { + continue; + } + } + break; + } + return accessor->second <= number_of_recalls_; +} + +void KTimesReconnectionStorageStrategy::reset(const Tag &tag) { + // intentionally don't care about status of erasing + invokes_.erase(tag); +} + +KTimesReconnectionStorageStrategy::Tag +KTimesReconnectionStorageStrategy::makeTag(const Tag &prefix) { + uint64_t gotten_value = tag_counter++; + return prefix + std::to_string(gotten_value); +} diff --git a/irohad/ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp b/irohad/ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp new file mode 100644 index 0000000000..9971e769b3 --- /dev/null +++ b/irohad/ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp @@ -0,0 +1,52 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_K_TIMES_RECONNECTION_STRATEGY_HPP +#define IROHA_K_TIMES_RECONNECTION_STRATEGY_HPP + +#include "ametsuchi/reconnection/reconnection_strategy.hpp" + +#include + +#include "tbb/concurrent_hash_map.h" + +namespace iroha { + namespace ametsuchi { + + /** + * Class provides implementation of reconnection strategy based on number of + * attempts. + * + * Note: all methods are thread-safe + */ + class KTimesReconnectionStorageStrategy + : public ReconnectionStorageStrategy { + public: + /** + * @param number_of_recalls - number of failure attempts before + * reconnection + */ + KTimesReconnectionStorageStrategy(size_t number_of_attempts); + + bool canInvoke(const Tag &) override; + + void reset(const Tag &) override; + + Tag makeTag(const Tag &prefix) override; + + private: + /// shortcut for tag collection type + using CollectionType = + tbb::concurrent_hash_map; + CollectionType invokes_; + size_t number_of_recalls_; + + /// counter for making uniqueness tags + std::atomic tag_counter{0u}; + }; + } // namespace ametsuchi +} // namespace iroha + +#endif // IROHA_K_TIMES_RECONNECTION_STRATEGY_HPP diff --git a/irohad/ametsuchi/reconnection/impl/storage_connection_wrapper.cpp b/irohad/ametsuchi/reconnection/impl/storage_connection_wrapper.cpp new file mode 100644 index 0000000000..02f7ce6956 --- /dev/null +++ b/irohad/ametsuchi/reconnection/impl/storage_connection_wrapper.cpp @@ -0,0 +1,168 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "ametsuchi/reconnection/storage_connection_wrapper.hpp" + +#include +#include + +#include "ametsuchi/mutable_storage.hpp" +#include "ametsuchi/temporary_wsv.hpp" + +using namespace iroha::ametsuchi; + +template +ReturnValue StorageConnectionWrapper::reconnectionLoop( + Invoker function_call, + ReturnValue failure_value, + const std::string &tag) const { + while (recall_strategy_->canInvoke(tag)) { + { + auto local_ref_to_storage = unsafe_storage_; + if (local_ref_to_storage) { + should_initialize_.store(true); + auto returned_value = function_call(local_ref_to_storage); + if (returned_value) { + recall_strategy_->reset(tag); + return *returned_value; + } + } + } + std::unique_lock guard(lock_); + // if statement prevents double initialization + if (not unsafe_storage_ or should_initialize_.load()) { + unsafe_storage_ = storage_creator_(); + } + recall_strategy_->reset(tag); + should_initialize_.store(false); + } + recall_strategy_->reset(tag); + return failure_value; +} + +StorageConnectionWrapper::StorageConnectionWrapper( + std::function()> storage_creator, + std::shared_ptr recall_strategy) + : storage_creator_(std::move(storage_creator)), + unsafe_storage_(storage_creator_()), + recall_strategy_(std::move(recall_strategy)) {} + +// ------------------------- | Storage | --------------------------------- + +std::shared_ptr StorageConnectionWrapper::getWsvQuery() const { + auto call = [](auto &storage) { return storage->getWsvQuery(); }; + using ReturnType = std::remove_reference_t; + return reconnectionLoop( + call, nullptr, recall_strategy_->makeTag("getWsvQuery")); +} + +std::shared_ptr StorageConnectionWrapper::getBlockQuery() const { + auto call = [](auto &storage) { return storage->getBlockQuery(); }; + using ReturnType = std::remove_reference_t; + return reconnectionLoop( + call, nullptr, recall_strategy_->makeTag("getBlockQuery")); +} + +bool StorageConnectionWrapper::insertBlock( + std::shared_ptr block) { + auto call = [&block](auto &storage) { return storage->insertBlock(block); }; + return reconnectionLoop( + call, false, recall_strategy_->makeTag("getBlockQuery")); +} + +bool StorageConnectionWrapper::insertBlocks( + const std::vector> + &blocks) { + auto call = [&blocks](auto &storage) { + return storage->insertBlocks(blocks); + }; + return reconnectionLoop( + call, false, recall_strategy_->makeTag("insertBlocks")); +} + +rxcpp::observable> +StorageConnectionWrapper::on_commit() { + return unsafe_storage_->on_commit(); +} + +void StorageConnectionWrapper::reset() { + auto storage_copy = unsafe_storage_; + storage_copy->reset(); +} + +void StorageConnectionWrapper::dropStorage() { + auto storage_copy = unsafe_storage_; + storage_copy->dropStorage(); +} + +void StorageConnectionWrapper::freeConnections() { + auto storage_copy = unsafe_storage_; + storage_copy->freeConnections(); +} + +// ------------------------- | TemporaryFactory | ------------------------ + +iroha::expected::Result, std::string> +StorageConnectionWrapper::createTemporaryWsv() { + // // TODO: 2019-04-15 @muratovv this and further methods are not safe. + // Methods must be reworked with reconnectionLoop. + auto storage_copy = unsafe_storage_; + return storage_copy->createTemporaryWsv(); +} + +void StorageConnectionWrapper::prepareBlock(std::unique_ptr wsv) { + auto storage_copy = unsafe_storage_; + return storage_copy->prepareBlock(std::move(wsv)); +} + +// ------------------------- | MutableFactory | -------------------------- + +iroha::expected::Result, std::string> +StorageConnectionWrapper::createMutableStorage() { + auto storage_copy = unsafe_storage_; + return storage_copy->createMutableStorage(); +} + +boost::optional> +StorageConnectionWrapper::commit( + std::unique_ptr mutable_storage) { + auto storage_copy = unsafe_storage_; + return storage_copy->commit(std::move(mutable_storage)); +} + +boost::optional> +StorageConnectionWrapper::commitPrepared( + std::shared_ptr block) { + auto storage_copy = unsafe_storage_; + return storage_copy->commitPrepared(block); +} + +// ------------------------- | PeerQueryFactory | ------------------------ + +boost::optional> +StorageConnectionWrapper::createPeerQuery() const { + auto storage_copy = unsafe_storage_; + return storage_copy->createPeerQuery(); +} + +// ------------------------- | BlockQueryFactory | ----------------------- + +boost::optional> +StorageConnectionWrapper::createBlockQuery() const { + auto storage_copy = unsafe_storage_; + return storage_copy->createBlockQuery(); +} + +// ------------------------- | QueryExecutorFactory | -------------------- + +boost::optional> +StorageConnectionWrapper::createQueryExecutor( + std::shared_ptr pending_txs_storage, + std::shared_ptr + response_factory) const { + auto storage_copy = unsafe_storage_; + return storage_copy->createQueryExecutor(pending_txs_storage, + response_factory); +} diff --git a/irohad/ametsuchi/reconnection/reconnection_strategy.hpp b/irohad/ametsuchi/reconnection/reconnection_strategy.hpp new file mode 100644 index 0000000000..e364d5c406 --- /dev/null +++ b/irohad/ametsuchi/reconnection/reconnection_strategy.hpp @@ -0,0 +1,42 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_RECONNECTION_STRATEGY_HPP +#define IROHA_RECONNECTION_STRATEGY_HPP + +#include + +namespace iroha { + namespace ametsuchi { + /** + * The interface provides a strategy for reconnection to data base. + */ + class ReconnectionStorageStrategy { + public: + /// Identifier type of unique invocation + using Tag = std::string; + + /** + * check that invocation by tag will be appropriate + */ + virtual bool canInvoke(const Tag &) = 0; + + /** + * Reset invocations by tag + */ + virtual void reset(const Tag &) = 0; + + /** + * Create a tag based on passed + * @param passed - initial tag + */ + virtual Tag makeTag(const Tag &passed) = 0; + + virtual ~ReconnectionStorageStrategy() = default; + }; + } // namespace ametsuchi +} // namespace iroha + +#endif //IROHA_RECONNECTION_STRATEGY_HPP diff --git a/irohad/ametsuchi/reconnection/return_wrapper.hpp b/irohad/ametsuchi/reconnection/return_wrapper.hpp new file mode 100644 index 0000000000..e98791c89f --- /dev/null +++ b/irohad/ametsuchi/reconnection/return_wrapper.hpp @@ -0,0 +1,30 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_RETURN_WRAPPER_HPP +#define IROHA_RETURN_WRAPPER_HPP + +#include + +namespace iroha { + namespace ametsuchi { + + /** + * Common wrapper type for raw storage methods + */ + template + using ReturnWrapperType = boost::optional; + + /// default value of wrapper + static boost::none_t DefaultError = boost::none; + + template + inline boost::optional wrapValue(RV &&rv) { + return boost::make_optional(rv); + } + + } // namespace ametsuchi +} // namespace iroha +#endif // IROHA_RETURN_WRAPPER_HPP diff --git a/irohad/ametsuchi/reconnection/storage_connection_wrapper.hpp b/irohad/ametsuchi/reconnection/storage_connection_wrapper.hpp new file mode 100644 index 0000000000..6527d29637 --- /dev/null +++ b/irohad/ametsuchi/reconnection/storage_connection_wrapper.hpp @@ -0,0 +1,107 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_STORAGE_CONNECTION_WRAPPER_HPP +#define IROHA_STORAGE_CONNECTION_WRAPPER_HPP + +#include +#include +#include + +#include "ametsuchi/reconnection/reconnection_strategy.hpp" +#include "ametsuchi/reconnection/unsafe_storage.hpp" +#include "ametsuchi/storage.hpp" + +namespace iroha { + namespace ametsuchi { + class StorageConnectionWrapper : public Storage { + public: + StorageConnectionWrapper( + std::function()> storage_creator, + std::shared_ptr recall_strategy); + + // ------------------------- | Storage | --------------------------------- + + std::shared_ptr getWsvQuery() const override; + + std::shared_ptr getBlockQuery() const override; + + bool insertBlock( + std::shared_ptr block) override; + + bool insertBlocks( + const std::vector> + &blocks) override; + + rxcpp::observable> + on_commit() override; + + void reset() override; + + void dropStorage() override; + + void freeConnections() override; + + // ------------------------- | TemporaryFactory | ------------------------ + + expected::Result, std::string> + createTemporaryWsv() override; + + void prepareBlock(std::unique_ptr wsv) override; + + // ------------------------- | MutableFactory | -------------------------- + + expected::Result, std::string> + createMutableStorage() override; + + boost::optional> commit( + std::unique_ptr mutable_storage) override; + + boost::optional> commitPrepared( + std::shared_ptr block) override; + + // ------------------------- | PeerQueryFactory | ------------------------ + + boost::optional> createPeerQuery() + const override; + + // ------------------------- | BlockQueryFactory | ----------------------- + + boost::optional> createBlockQuery() + const override; + + // ------------------------- | QueryExecutorFactory | -------------------- + + boost::optional> createQueryExecutor( + std::shared_ptr pending_txs_storage, + std::shared_ptr + response_factory) const override; + + private: + /** + * Method provides a reconnection wrapper for every storage invocation + * @tparam ReturnValue - return type of original call + * @tparam Invoker - type of lambda which will invoke db method + * @param function_call - function which performs invocation of the target + * db method + * @param failure_value - value which return on fail of invocation + * @param tag - identifier of call + * @return original value from storage + */ + template + ReturnValue reconnectionLoop(Invoker function_call, + ReturnValue failure_value, + const std::string &tag) const; + + std::function()> storage_creator_; + mutable std::shared_ptr unsafe_storage_; + mutable std::shared_ptr recall_strategy_; + mutable std::shared_timed_mutex lock_; + mutable std::atomic should_initialize_{false}; + }; + } // namespace ametsuchi +} // namespace iroha + +#endif // IROHA_STORAGE_CONNECTION_WRAPPER_HPP diff --git a/irohad/ametsuchi/reconnection/unsafe_storage.hpp b/irohad/ametsuchi/reconnection/unsafe_storage.hpp new file mode 100644 index 0000000000..4b6f8a8566 --- /dev/null +++ b/irohad/ametsuchi/reconnection/unsafe_storage.hpp @@ -0,0 +1,96 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_UNSAFE_STORAGE_HPP +#define IROHA_UNSAFE_STORAGE_HPP + +#include + +#include +#include "ametsuchi/block_query_factory.hpp" +#include "ametsuchi/mutable_factory.hpp" +#include "ametsuchi/peer_query_factory.hpp" +#include "ametsuchi/query_executor_factory.hpp" +#include "ametsuchi/reconnection/return_wrapper.hpp" +#include "ametsuchi/temporary_factory.hpp" +#include "common/result.hpp" + +namespace shared_model { + namespace interface { + class Block; + } +} // namespace shared_model + +namespace iroha { + + namespace ametsuchi { + + class BlockQuery; + class WsvQuery; + + /** + * Storage interface, which allows queries on current committed state, and + * creation of state which can be mutated with blocks and transactions. + * Note: every call uses only one try to connect to data base. If the + * connection lost method will return error option of ReturnTypeWrapper + */ + class UnsafeStorage : public TemporaryFactory, + public MutableFactory, + public PeerQueryFactory, + public BlockQueryFactory, + public QueryExecutorFactory { + public: + virtual ReturnWrapperType> getWsvQuery() + const = 0; + + virtual ReturnWrapperType> getBlockQuery() + const = 0; + + /** + * Raw insertion of blocks without validation + * @param block - block for insertion + * @return true if inserted + */ + virtual ReturnWrapperType insertBlock( + std::shared_ptr block) = 0; + + /** + * Raw insertion of blocks without validation + * @param blocks - collection of blocks for insertion + * @return true if inserted + */ + virtual ReturnWrapperType insertBlocks( + const std::vector> + &blocks) = 0; + + /** + * method called when block is written to the storage + * @return observable with the Block committed + */ + virtual rxcpp::observable< + std::shared_ptr> + on_commit() = 0; + + /** + * Remove all records from the tables and remove all the blocks + */ + virtual void reset() = 0; + + /** + * Remove all information from ledger + * Tables and the database will be removed too + */ + virtual void dropStorage() = 0; + + virtual void freeConnections() = 0; + + virtual ~UnsafeStorage() = default; + }; + + } // namespace ametsuchi + +} // namespace iroha + +#endif // IROHA_UNSAFE_STORAGE_HPP diff --git a/irohad/ametsuchi/temporary_factory.hpp b/irohad/ametsuchi/temporary_factory.hpp index 1b91f3bfd8..7f44d3fba4 100644 --- a/irohad/ametsuchi/temporary_factory.hpp +++ b/irohad/ametsuchi/temporary_factory.hpp @@ -28,7 +28,7 @@ namespace iroha { /** * Prepare state which was accumulated in temporary WSV. - * After preparation, this state is not visible until commited. + * After preparation, this state is not visible until it isn't committed. * * @param wsv - state which will be prepared. */ diff --git a/irohad/main/application.cpp b/irohad/main/application.cpp index 2cc6ee7ab9..ff06e573e4 100644 --- a/irohad/main/application.cpp +++ b/irohad/main/application.cpp @@ -11,6 +11,9 @@ #include "ametsuchi/impl/storage_impl.hpp" #include "ametsuchi/impl/tx_presence_cache_impl.hpp" #include "ametsuchi/impl/wsv_restorer_impl.hpp" +#include "ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp" +#include "ametsuchi/reconnection/storage_connection_wrapper.hpp" +#include "ametsuchi/reconnection/unsafe_storage.hpp" #include "backend/protobuf/common_objects/proto_common_objects_factory.hpp" #include "backend/protobuf/proto_block_json_converter.hpp" #include "backend/protobuf/proto_permission_to_string.hpp" @@ -75,6 +78,7 @@ static constexpr iroha::consensus::yac::ConsistencyModel */ Irohad::Irohad(const std::string &block_store_dir, const std::string &pg_conn, + size_t number_of_attempts_to_reconnect, const std::string &listen_ip, size_t torii_port, size_t internal_port, @@ -97,10 +101,11 @@ Irohad::Irohad(const std::string &block_store_dir, proposal_delay_(proposal_delay), vote_delay_(vote_delay), is_mst_supported_(opt_mst_gossip_params), - mst_expiration_time_(mst_expiration_time), + number_of_attempts_to_reconnect_(number_of_attempts_to_reconnect), max_rounds_delay_(max_rounds_delay), stale_stream_max_rounds_(stale_stream_max_rounds), opt_mst_gossip_params_(opt_mst_gossip_params), + mst_expiration_time_(mst_expiration_time), keypair(keypair), ordering_init(logger_manager->getLogger()), log_manager_(std::move(logger_manager)), @@ -156,35 +161,51 @@ void Irohad::dropStorage() { * Initializing iroha daemon storage */ void Irohad::initStorage() { - common_objects_factory_ = - std::make_shared>(); - auto perm_converter = - std::make_shared(); - auto block_converter = - std::make_shared(); - auto block_storage_factory = std::make_unique( - []() { - return (boost::filesystem::temp_directory_path() - / boost::filesystem::unique_path()) - .string(); - }, - block_converter, - log_manager_); - auto storageResult = StorageImpl::create(block_store_dir_, - pg_conn_, - common_objects_factory_, - std::move(block_converter), - perm_converter, - std::move(block_storage_factory), - log_manager_->getChild("Storage")); - storageResult.match( - [&](expected::Value> &_storage) { - storage = _storage.value; - }, - [&](expected::Error &error) { log_->error(error.error); }); - - log_->info("[Init] => storage ({})", logger::logBool(storage)); + using UnsafeStorageType = std::shared_ptr; + auto storage_creation = [this]() -> UnsafeStorageType { + common_objects_factory_ = + std::make_shared>(); + auto perm_converter = + std::make_shared(); + auto block_converter = + std::make_shared(); + auto block_storage_factory = std::make_unique( + []() { + return (boost::filesystem::temp_directory_path() + / boost::filesystem::unique_path()) + .string(); + }, + block_converter, + log_manager_); + auto storageResult = StorageImpl::create(block_store_dir_, + pg_conn_, + common_objects_factory_, + std::move(block_converter), + perm_converter, + std::move(block_storage_factory), + log_manager_->getChild("Storage")); + return storageResult.match( + [&](expected::Value> &_storage) + -> UnsafeStorageType { + log_->info("[Init] => storage initialized"); + return _storage.value; + }, + [&](expected::Error &error) -> UnsafeStorageType { + log_->error( + " [Init] => storage initialization has failed. The reason: {}", + error.error); + return nullptr; + }); + }; + + std::shared_ptr + reconnection_strategy_ = + std::make_shared( + number_of_attempts_to_reconnect_); + + storage = std::make_shared( + storage_creation, reconnection_strategy_); } bool Irohad::restoreWsv() { diff --git a/irohad/main/application.hpp b/irohad/main/application.hpp index 77c5bf3cb7..fc517ac473 100644 --- a/irohad/main/application.hpp +++ b/irohad/main/application.hpp @@ -72,6 +72,8 @@ class Irohad { * Constructor that initializes common iroha pipeline * @param block_store_dir - folder where blocks will be stored * @param pg_conn - initialization string for postgre + * @param number_of_attempts_to_reconnect - number of tries for reconnection + * to data base * @param listen_ip - ip address for opening ports (internal & torii) * @param torii_port - port for torii binding * @param internal_port - port for internal communication - ordering service, @@ -94,6 +96,7 @@ class Irohad { */ Irohad(const std::string &block_store_dir, const std::string &pg_conn, + size_t number_of_attempts_to_reconnect, const std::string &listen_ip, size_t torii_port, size_t internal_port, @@ -193,6 +196,7 @@ class Irohad { size_t stale_stream_max_rounds_; boost::optional opt_mst_gossip_params_; + size_t number_of_attempts_to_reconnect_; // ------------------------| internal dependencies |------------------------- public: diff --git a/irohad/main/irohad.cpp b/irohad/main/irohad.cpp index fd71352c21..4c119a6993 100644 --- a/irohad/main/irohad.cpp +++ b/irohad/main/irohad.cpp @@ -158,6 +158,7 @@ int main(int argc, char *argv[]) { Irohad irohad( config.block_store_path, config.pg_opt, + 3, kListenIp, // TODO(mboldyrev) 17/10/2018: add a parameter in // config file and/or command-line arguments? config.torii_port, diff --git a/test/framework/integration_framework/test_irohad.hpp b/test/framework/integration_framework/test_irohad.hpp index 5c16307793..869f9ed4fe 100644 --- a/test/framework/integration_framework/test_irohad.hpp +++ b/test/framework/integration_framework/test_irohad.hpp @@ -34,6 +34,7 @@ namespace integration_framework { &opt_mst_gossip_params = boost::none) : Irohad(block_store_dir, pg_conn, + 1, listen_ip, torii_port, internal_port, diff --git a/test/integration/acceptance/query_permission_common_tests.cpp b/test/integration/acceptance/query_permission_common_tests.cpp index 4758a20b6d..3f6b08e81d 100644 --- a/test/integration/acceptance/query_permission_common_tests.cpp +++ b/test/integration/acceptance/query_permission_common_tests.cpp @@ -118,7 +118,7 @@ TYPED_TEST(QueryPermissionFixture, OwnWithPermissionForDomain) { TYPED_TEST(QueryPermissionFixture, OwnWithPermissionForAll) { this->impl_.prepareState(*this, {this->impl_.kPermissionToQueryEveryone}) .sendQuery(this->impl_.makeQuery(*this, kUserId, kUserId, kUserKeypair), - this->impl_.getGeneralResponseChecker()); + this->impl_.getGeneralResponseChecker()).done(); } /** diff --git a/test/module/irohad/ametsuchi/CMakeLists.txt b/test/module/irohad/ametsuchi/CMakeLists.txt index 63e12c49fc..2880b8ded6 100644 --- a/test/module/irohad/ametsuchi/CMakeLists.txt +++ b/test/module/irohad/ametsuchi/CMakeLists.txt @@ -89,3 +89,8 @@ target_link_libraries(ametsuchi_fixture INTERFACE SOCI::postgresql test_logger ) + +addtest(k_times_reconnection_strategy_test k_times_reconnection_strategy_test.cpp) +target_link_libraries(k_times_reconnection_strategy_test + ametsuchi + ) diff --git a/test/module/irohad/ametsuchi/ametsuchi_fixture.hpp b/test/module/irohad/ametsuchi/ametsuchi_fixture.hpp index dc84f15687..1bc658a468 100644 --- a/test/module/irohad/ametsuchi/ametsuchi_fixture.hpp +++ b/test/module/irohad/ametsuchi/ametsuchi_fixture.hpp @@ -14,6 +14,9 @@ #include #include "ametsuchi/impl/in_memory_block_storage_factory.hpp" #include "ametsuchi/impl/storage_impl.hpp" +#include "ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp" +#include "ametsuchi/reconnection/storage_connection_wrapper.hpp" +#include "ametsuchi/storage.hpp" #include "backend/protobuf/common_objects/proto_common_objects_factory.hpp" #include "backend/protobuf/proto_block_json_converter.hpp" #include "backend/protobuf/proto_permission_to_string.hpp" @@ -52,11 +55,28 @@ namespace iroha { perm_converter_, std::move(block_storage_factory), getTestLoggerManager()->getChild("Storage")) - .match([&](iroha::expected::Value> - &_storage) { storage = _storage.value; }, - [](iroha::expected::Error &error) { - FAIL() << "StorageImpl: " << error.error; - }); + .match( + [&](iroha::expected::Value> + &_storage) { + auto storage_creation = [s = _storage.value]() { + getTestLoggerManager() + ->getChild("Storage") + ->getLogger() + ->info("Initialized"); + return s; + }; + std::shared_ptr + recall_strategy_ = + std::make_shared( + 1); + + storage = std::make_shared< + iroha::ametsuchi::StorageConnectionWrapper>( + storage_creation, recall_strategy_); + }, + [](iroha::expected::Error &error) { + FAIL() << "StorageImpl: " << error.error; + }); sql = std::make_shared(*soci::factory_postgresql(), pgopt_); sql_query = @@ -87,7 +107,7 @@ namespace iroha { * static storage */ static logger::LoggerPtr storage_logger_; - static std::shared_ptr storage; + static std::shared_ptr storage; static std::unique_ptr sql_query; static std::shared_ptr @@ -213,7 +233,7 @@ CREATE TABLE IF NOT EXISTS index_by_id_height_asset ( // hold the storage static logger while the static storage is alive logger::LoggerPtr AmetsuchiTest::storage_logger_ = getTestLoggerManager()->getChild("Storage")->getLogger(); - std::shared_ptr AmetsuchiTest::storage = nullptr; + std::shared_ptr AmetsuchiTest::storage = nullptr; std::unique_ptr AmetsuchiTest::sql_query = nullptr; } // namespace ametsuchi diff --git a/test/module/irohad/ametsuchi/k_times_reconnection_strategy_test.cpp b/test/module/irohad/ametsuchi/k_times_reconnection_strategy_test.cpp new file mode 100644 index 0000000000..6c7e691566 --- /dev/null +++ b/test/module/irohad/ametsuchi/k_times_reconnection_strategy_test.cpp @@ -0,0 +1,61 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include + +#include "ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp" + +using namespace iroha::ametsuchi; + +class KTimesReconnectionStrategyTest : public testing::Test { + public: + void SetUp() override { + strategy_ = std::make_shared(N_); + } + + std::shared_ptr strategy_; + const size_t N_{10}; + const KTimesReconnectionStorageStrategy::Tag tag = "tag"; +}; + +/** + * @given initialized reconnection strategy with N limit + * @when call canInvoke N times + * @and one more time + * @then check first N times returns true + * @and last time return false + */ +TEST_F(KTimesReconnectionStrategyTest, NormalCase) { + for (size_t i = 0; i < N_; i++) { + ASSERT_TRUE(strategy_->canInvoke(tag)); + } + ASSERT_FALSE(strategy_->canInvoke(tag)); +} + +/** + * @given initialized reconnection strategy with N limit + * @when call canInvoke one time, + * @and call reset + * @and call canInvoke N times + * @then all canInvoke return true + */ +TEST_F(KTimesReconnectionStrategyTest, ResetCheck) { + strategy_->canInvoke(tag); + strategy_->reset(tag); + for (size_t i = 0; i < N_; i++) { + ASSERT_TRUE(strategy_->canInvoke(tag)); + } +} + +/** + * @given initialized reconnection strategy + * @when call makeTag twice + * @then returned tags are different + */ +TEST_F(KTimesReconnectionStrategyTest, TagConsistency) { + ASSERT_TRUE(strategy_->makeTag(tag) != strategy_->makeTag(tag)); +}