diff --git a/irohad/main/CMakeLists.txt b/irohad/main/CMakeLists.txt index 62e5b2dbba..82d0e8deeb 100644 --- a/irohad/main/CMakeLists.txt +++ b/irohad/main/CMakeLists.txt @@ -53,6 +53,7 @@ target_link_libraries(application torii_service pending_txs_storage common + mst_propagation_to_pcs ) add_executable(irohad irohad.cpp) diff --git a/irohad/main/application.cpp b/irohad/main/application.cpp index a63cbd9265..95db8277a0 100644 --- a/irohad/main/application.cpp +++ b/irohad/main/application.cpp @@ -30,6 +30,7 @@ #include "multi_sig_transactions/mst_processor_impl.hpp" #include "multi_sig_transactions/mst_propagation_strategy_stub.hpp" #include "multi_sig_transactions/mst_time_provider_impl.hpp" +#include "multi_sig_transactions/propagation_to_pcs.hpp" #include "multi_sig_transactions/storage/mst_storage_impl.hpp" #include "multi_sig_transactions/transport/mst_transport_grpc.hpp" #include "multi_sig_transactions/transport/mst_transport_stub.hpp" @@ -39,6 +40,7 @@ #include "ordering/impl/on_demand_ordering_gate.hpp" #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp" #include "simulator/impl/simulator.hpp" +#include "storage_shared_limit/batch_storage_limit_by_txs.hpp" #include "synchronizer/impl/synchronizer_impl.hpp" #include "torii/impl/command_service_impl.hpp" #include "torii/impl/command_service_transport_grpc.hpp" @@ -115,6 +117,7 @@ Irohad::Irohad(const std::string &block_store_dir, Irohad::~Irohad() { consensus_gate_events_subscription.unsubscribe(); + mst_to_pcs_propagation_subscription.unsubscribe(); } /** @@ -522,9 +525,11 @@ void Irohad::initMstProcessor() { log_manager_->getChild("MultiSignatureTransactions"); auto mst_state_logger = mst_logger_manager->getChild("State")->getLogger(); auto mst_completer = std::make_shared(mst_expiration_time_); + auto mst_storage_limit = + std::make_shared(mst_state_txs_limit_); auto mst_storage = std::make_shared( mst_completer, - mst_state_txs_limit_, + mst_storage_limit, mst_state_logger, mst_logger_manager->getChild("Storage")->getLogger()); std::shared_ptr mst_propagation; @@ -536,7 +541,7 @@ void Irohad::initMstProcessor() { transaction_batch_factory_, persistent_cache, mst_completer, - mst_state_txs_limit_, + mst_storage_limit, keypair.publicKey(), std::move(mst_state_logger), mst_logger_manager->getChild("Transport")->getLogger()); @@ -556,6 +561,17 @@ void Irohad::initMstProcessor() { mst_logger_manager->getChild("Processor")->getLogger()); mst_processor = fair_mst_processor; mst_transport->subscribe(fair_mst_processor); + + mst_to_pcs_propagation_subscription = + mst_processor->onPreparedBatches().subscribe( + [propagator = std::make_shared( + pcs, + mst_storage_limit, + ordering_gate->onReadyToAcceptTxs(), + mst_logger_manager->getChild("PropagationToPcs")->getLogger())]( + auto batch) { + propagator->notifyCompletedBatch(std::move(batch)); + }); log_->info("[Init] => MST processor"); } diff --git a/irohad/main/application.hpp b/irohad/main/application.hpp index 30031631cf..44a9b28207 100644 --- a/irohad/main/application.hpp +++ b/irohad/main/application.hpp @@ -297,6 +297,7 @@ class Irohad { // mst std::shared_ptr mst_transport; std::shared_ptr mst_processor; + rxcpp::composite_subscription mst_to_pcs_propagation_subscription; // pending transactions storage std::shared_ptr pending_txs_storage_; diff --git a/irohad/multi_sig_transactions/CMakeLists.txt b/irohad/multi_sig_transactions/CMakeLists.txt index 6a3442ce60..d4ee207829 100644 --- a/irohad/multi_sig_transactions/CMakeLists.txt +++ b/irohad/multi_sig_transactions/CMakeLists.txt @@ -28,3 +28,12 @@ add_library(mst_hash target_link_libraries(mst_hash shared_model_interfaces ) + +add_library(mst_propagation_to_pcs + impl/propagation_to_pcs.cpp + ) + +target_link_libraries(mst_propagation_to_pcs + rxcpp + logger + ) diff --git a/irohad/multi_sig_transactions/impl/mst_processor.cpp b/irohad/multi_sig_transactions/impl/mst_processor.cpp index 89ef876dba..551eea20df 100644 --- a/irohad/multi_sig_transactions/impl/mst_processor.cpp +++ b/irohad/multi_sig_transactions/impl/mst_processor.cpp @@ -13,12 +13,13 @@ namespace iroha { return this->propagateBatchImpl(batch); } - rxcpp::observable> MstProcessor::onStateUpdate() - const { + rxcpp::observable> + MstProcessor::onStateUpdate() const { return this->onStateUpdateImpl(); } - rxcpp::observable MstProcessor::onPreparedBatches() const { + rxcpp::observable> + MstProcessor::onPreparedBatches() const { return this->onPreparedBatchesImpl(); } diff --git a/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp b/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp index 8214e35067..b5e8749c31 100644 --- a/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp +++ b/irohad/multi_sig_transactions/impl/mst_processor_impl.cpp @@ -35,8 +35,8 @@ namespace iroha { auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch) -> decltype(propagateBatch(batch)) { auto state_update = storage_->updateOwnState(batch); - completedBatchesNotify(*state_update.completed_state_); - updatedBatchesNotify(*state_update.updated_state_); + completedBatchesNotify(state_update.completed_state_); + updatedBatchesNotify(state_update.updated_state_); expiredBatchesNotify( storage_->extractExpiredTransactions(time_provider_->getCurrentTime())); return state_update.updated_state_->contains(batch); @@ -58,18 +58,18 @@ namespace iroha { } // TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one - void FairMstProcessor::completedBatchesNotify(ConstRefState state) const { - if (not state.isEmpty()) { - state.iterateBatches([this](const auto &batch) { - batches_subject_.get_subscriber().on_next(batch); - }); - } + void FairMstProcessor::completedBatchesNotify( + std::vector> completed) const { + std::for_each( + completed.begin(), completed.end(), [this](const auto &batch) { + batches_subject_.get_subscriber().on_next(batch); + }); } - void FairMstProcessor::updatedBatchesNotify(ConstRefState state) const { - if (not state.isEmpty()) { - state_subject_.get_subscriber().on_next( - std::make_shared(state)); + void FairMstProcessor::updatedBatchesNotify( + std::shared_ptr state) const { + if (not state->isEmpty()) { + state_subject_.get_subscriber().on_next(state); } } @@ -88,20 +88,20 @@ namespace iroha { // -------------------| MstTransportNotification override |------------------- void FairMstProcessor::onNewState(const shared_model::crypto::PublicKey &from, - ConstRefState new_state) { + MstState new_state) { log_->info("Applying new state"); auto current_time = time_provider_->getCurrentTime(); auto state_update = storage_->apply(from, new_state); // updated batches - updatedBatchesNotify(*state_update.updated_state_); + updatedBatchesNotify(state_update.updated_state_); log_->info("New state has {} batches and {} transactions.", state_update.updated_state_->batchesQuantity(), state_update.updated_state_->transactionsQuantity()); // completed batches - completedBatchesNotify(*state_update.completed_state_); + completedBatchesNotify(state_update.completed_state_); // expired batches expiredBatchesNotify(storage_->getDiffState(from, current_time)); diff --git a/irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp b/irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp new file mode 100644 index 0000000000..569236cbf9 --- /dev/null +++ b/irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp @@ -0,0 +1,69 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "multi_sig_transactions/propagation_to_pcs.hpp" + +#include + +#include "interfaces/iroha_internal/transaction_batch.hpp" +#include "logger/logger.hpp" + +using namespace iroha; + +MstToPcsPropagation::MstToPcsPropagation( + std::shared_ptr pcs, + std::shared_ptr> storage_limit, + rxcpp::observable propagation_available, + logger::LoggerPtr log) + : log_(std::move(log)), + pcs_(pcs), + pending_batches_(std::move(storage_limit), + std::make_unique()), + propagation_available_subscription_( + propagation_available.subscribe([this, pcs](size_t available_txs) { + pending_batches_.extract( + [pcs, &available_txs](InternalStorage &storage) { + std::vector extracted; + extracted.reserve(storage.pending_batches.size()); + auto it = storage.pending_batches.begin(); + while (it != storage.pending_batches.end()) { + const auto txs_in_batch = (*it)->transactions().size(); + if (txs_in_batch <= available_txs + and pcs->propagate_batch(*it)) { + available_txs -= txs_in_batch; + extracted.emplace_back(std::move(*it)); + it = storage.pending_batches.erase(it); + } else { + ++it; + } + } + return extracted; + }); + })) {} + +MstToPcsPropagation::~MstToPcsPropagation() { + propagation_available_subscription_.unsubscribe(); +} + +void MstToPcsPropagation::notifyCompletedBatch( + std::shared_ptr moved_batch) { + if (not pcs_->propagate_batch(moved_batch->get())) { + if (not pending_batches_.insert(std::move(moved_batch))) { + log_->critical( + "Dropped a completed MST batch because no place left in storage: {}", + moved_batch->get()); + assert(false); + } + } +} + +size_t MstToPcsPropagation::pendingBatchesQuantity() const { + return pending_batches_.itemsQuantity(); +} + +bool MstToPcsPropagation::InternalStorage::insert(BatchPtr batch) { + pending_batches.emplace_back(std::move(batch)); + return true; +} diff --git a/irohad/multi_sig_transactions/mst_processor.hpp b/irohad/multi_sig_transactions/mst_processor.hpp index 3f3b6223cb..1a49f92484 100644 --- a/irohad/multi_sig_transactions/mst_processor.hpp +++ b/irohad/multi_sig_transactions/mst_processor.hpp @@ -8,6 +8,7 @@ #include #include + #include #include "logger/logger_fwd.hpp" #include "multi_sig_transactions/mst_types.hpp" @@ -41,19 +42,22 @@ namespace iroha { /** * Prove updating of state for handling status of signing */ - rxcpp::observable> onStateUpdate() const; + rxcpp::observable> onStateUpdate() const; /** * Observable emit batches which are prepared for further processing in * system */ - rxcpp::observable onPreparedBatches() const; + rxcpp::observable> onPreparedBatches() const; /** * Observable emit expired by time transactions */ rxcpp::observable onExpiredBatches() const; + /// Get the next completed batch with at most max_txs transactions. + boost::optional getCompletedBatch(const size_t max_txs); + virtual ~MstProcessor() = default; protected: diff --git a/irohad/multi_sig_transactions/mst_processor_impl.hpp b/irohad/multi_sig_transactions/mst_processor_impl.hpp index 3d656c5478..548fd098e2 100644 --- a/irohad/multi_sig_transactions/mst_processor_impl.hpp +++ b/irohad/multi_sig_transactions/mst_processor_impl.hpp @@ -6,11 +6,12 @@ #ifndef IROHA_MST_PROCESSOR_IMPL_HPP #define IROHA_MST_PROCESSOR_IMPL_HPP +#include "multi_sig_transactions/mst_processor.hpp" + #include #include "cryptography/public_key.hpp" #include "logger/logger_fwd.hpp" -#include "multi_sig_transactions/mst_processor.hpp" #include "multi_sig_transactions/mst_propagation_strategy.hpp" #include "multi_sig_transactions/mst_time_provider.hpp" #include "multi_sig_transactions/storage/mst_storage.hpp" @@ -56,7 +57,7 @@ namespace iroha { // ------------------| MstTransportNotification override |------------------ void onNewState(const shared_model::crypto::PublicKey &from, - ConstRefState new_state) override; + MstState new_state) override; // ----------------------------| end override |----------------------------- @@ -74,14 +75,15 @@ namespace iroha { * signatures and ready to move forward * @param state with those batches */ - void completedBatchesNotify(ConstRefState state) const; + void completedBatchesNotify( + std::vector> completed) const; /** * Notify subscribers when some of the batches received new signatures, but * still are not completed * @param state with those batches */ - void updatedBatchesNotify(ConstRefState state) const; + void updatedBatchesNotify(std::shared_ptr state) const; /** * Notify subscribers when some of the bathes get expired @@ -98,10 +100,10 @@ namespace iroha { // rx subjects /// use for share new states from other peers - rxcpp::subjects::subject> state_subject_; + rxcpp::subjects::subject> state_subject_; /// use for share completed batches - rxcpp::subjects::subject batches_subject_; + rxcpp::subjects::subject> batches_subject_; /// use for share expired batches rxcpp::subjects::subject expired_subject_; diff --git a/irohad/multi_sig_transactions/mst_types.hpp b/irohad/multi_sig_transactions/mst_types.hpp index 4179323448..f4565f3c46 100644 --- a/irohad/multi_sig_transactions/mst_types.hpp +++ b/irohad/multi_sig_transactions/mst_types.hpp @@ -9,6 +9,7 @@ #include #include "interfaces/common_objects/types.hpp" +#include "storage_shared_limit/moved_item.hpp" namespace shared_model { namespace interface { @@ -21,6 +22,7 @@ namespace shared_model { namespace iroha { using BatchPtr = std::shared_ptr; + using MovedBatch = MovedItem; using ConstPeer = const shared_model::interface::Peer; using TimeType = shared_model::interface::types::TimestampType; using TxResponse = @@ -38,20 +40,6 @@ namespace iroha { using ConstRefState = ConstRefT; using DataType = BatchPtr; - - /** - * Contains result of updating local state: - * - state with completed batches - * - state with updated (still not enough signatures) batches - */ - struct StateUpdateResult { - StateUpdateResult(std::shared_ptr completed_state, - std::shared_ptr updated_state) - : completed_state_{std::move(completed_state)}, - updated_state_{std::move(updated_state)} {} - std::shared_ptr completed_state_; - std::shared_ptr updated_state_; - }; } // namespace iroha #endif // IROHA_MST_TYPES_HPP diff --git a/irohad/multi_sig_transactions/propagation_to_pcs.hpp b/irohad/multi_sig_transactions/propagation_to_pcs.hpp new file mode 100644 index 0000000000..21acee398a --- /dev/null +++ b/irohad/multi_sig_transactions/propagation_to_pcs.hpp @@ -0,0 +1,53 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef IROHA_MST_PROPAGATION_TO_PCS +#define IROHA_MST_PROPAGATION_TO_PCS + +#include + +#include +#include "logger/logger_fwd.hpp" +#include "multi_sig_transactions/mst_types.hpp" +#include "network/peer_communication_service.hpp" +#include "storage_shared_limit/limitable_storage.hpp" +#include "storage_shared_limit/limited_storage.hpp" +#include "storage_shared_limit/storage_limit.hpp" + +namespace iroha { + + class MstToPcsPropagation { + public: + MstToPcsPropagation( + std::shared_ptr pcs, + std::shared_ptr> storage_limit, + rxcpp::observable propagation_available, + logger::LoggerPtr log); + + virtual ~MstToPcsPropagation(); + + void notifyCompletedBatch(std::shared_ptr batch); + + size_t pendingBatchesQuantity() const; + + private: + logger::LoggerPtr log_; + std::shared_ptr pcs_; + + struct InternalStorage : public LimitableStorage { + bool insert(BatchPtr batch) override; + + // Batches not yet accepted by PCS in the order they were added. + std::list pending_batches; + }; + + LimitedStorage pending_batches_; + + rxcpp::composite_subscription propagation_available_subscription_; + }; + +} // namespace iroha + +#endif // IROHA_MST_PROPAGATION_TO_PCS diff --git a/irohad/multi_sig_transactions/state/impl/mst_state.cpp b/irohad/multi_sig_transactions/state/impl/mst_state.cpp index de5a9e9386..710b2f8adc 100644 --- a/irohad/multi_sig_transactions/state/impl/mst_state.cpp +++ b/irohad/multi_sig_transactions/state/impl/mst_state.cpp @@ -72,60 +72,73 @@ namespace iroha { // ------------------------------| public api |------------------------------- + MstState::MstState(MstState &&other) + : batches_(std::move(other.batches_)), + txs_quantity_(other.txs_quantity_), + log_(std::move(other.log_)) {} + MstState MstState::empty(const CompleterType &completer, - size_t transaction_limit, + std::shared_ptr storage_limit, logger::LoggerPtr log) { - return MstState(completer, transaction_limit, std::move(log)); + return MstState(completer, std::move(storage_limit), std::move(log)); } StateUpdateResult MstState::operator+=(const DataType &rhs) { - auto state_update = - StateUpdateResult{std::make_shared( - MstState::empty(completer_, txs_limit_, log_)), - std::make_shared( - MstState::empty(completer_, txs_limit_, log_))}; + auto state_update = StateUpdateResult{completer_, log_}; insertOne(state_update, rhs); return state_update; } StateUpdateResult MstState::operator+=(const MstState &rhs) { - auto state_update = - StateUpdateResult{std::make_shared( - MstState::empty(completer_, txs_limit_, log_)), - std::make_shared( - MstState::empty(completer_, txs_limit_, log_))}; - for (auto &&rhs_tx : rhs.batches_.right | boost::adaptors::map_keys) { - insertOne(state_update, rhs_tx); - } - return state_update; + auto state_update = StateUpdateResult{completer_, log_}; + return rhs.batches_.access([this, &state_update](const auto &storage) { + for (auto &&rhs_tx : storage.batches.right | boost::adaptors::map_keys) { + this->insertOne(state_update, rhs_tx); + } + return state_update; + }); } MstState MstState::operator-(const MstState &rhs) const { - const auto &my_batches = batches_.right | boost::adaptors::map_keys; std::vector difference; - difference.reserve(boost::size(batches_)); - for (const auto &batch : my_batches) { - if (rhs.batches_.right.find(batch) == rhs.batches_.right.end()) { - difference.push_back(batch); - } - } - return MstState(this->completer_, txs_limit_, difference, log_); + batches_.access([&difference, &rhs](const auto &storage) { + difference.reserve(boost::size(storage.batches)); + rhs.batches_.access( + [&difference, + my_batches = storage.batches.right + | boost::adaptors::map_keys](const auto &storage) { + for (const auto &batch : my_batches) { + if (storage.batches.right.find(batch) + == storage.batches.right.end()) { + difference.push_back(batch); + } + } + }); + }); + return MstState(this->completer_, batches_.sharedLimit(), difference, log_); } bool MstState::isEmpty() const { - return batches_.empty(); + return batches_.access( + [](const auto &storage) { return storage.batches.empty(); }); } std::unordered_set MstState::getBatches() const { - const auto batches_range = batches_.right | boost::adaptors::map_keys; - return {batches_range.begin(), batches_range.end()}; + return batches_.access([](const auto &storage) { + const auto batches_range = + storage.batches.right | boost::adaptors::map_keys; + return std::unordered_set{batches_range.begin(), + batches_range.end()}; + }); } MstState MstState::extractExpired(const TimeType ¤t_time) { - MstState out = MstState::empty(completer_, txs_limit_, log_); + MstState out = empty(completer_, batches_.sharedLimit(), log_); extractExpiredImpl(current_time, out); return out; } @@ -135,13 +148,15 @@ namespace iroha { } size_t MstState::transactionsQuantity() const { - assert(txs_quantity_ - == countTxsInBatches(batches_.right | boost::adaptors::map_keys)); + assert(txs_quantity_ == batches_.access([](const auto &storage) { + return countTxsInBatches(storage.batches.right + | boost::adaptors::map_keys); + })); return txs_quantity_; } size_t MstState::batchesQuantity() const { - return batches_.right.size(); + return batches_.itemsQuantity(); } // ------------------------------| private api |------------------------------ @@ -172,23 +187,23 @@ namespace iroha { } MstState::MstState(const CompleterType &completer, - size_t transaction_limit, + std::shared_ptr storage_limit, logger::LoggerPtr log) : MstState(completer, - transaction_limit, + std::move(storage_limit), std::vector{}, std::move(log)) {} MstState::MstState(const CompleterType &completer, - size_t transaction_limit, + std::shared_ptr storage_limit, const BatchesForwardCollectionType &batches, logger::LoggerPtr log) : completer_(completer), - txs_limit_(transaction_limit), + batches_(std::move(storage_limit), std::make_unique()), txs_quantity_(0), log_(std::move(log)) { for (const auto &batch : batches) { - batches_.insert({oldestTimestamp(batch), batch}); + batches_.insert(batch); txs_quantity_ += batch->transactions().size(); } } @@ -196,68 +211,86 @@ namespace iroha { void MstState::insertOne(StateUpdateResult &state_update, const DataType &rhs_batch) { log_->info("batch: {}", *rhs_batch); - auto corresponding = batches_.right.find(rhs_batch); - if (corresponding == batches_.right.end()) { - // when state does not contain transaction - if (transactionsQuantity() + boost::size(rhs_batch->transactions()) - <= txs_limit_) { - // there is enough room for the new batch - rawInsert(rhs_batch); - state_update.updated_state_->rawInsert(rhs_batch); - } else { - // there is not enough room for the new batch - log_->info( - "Dropped a batch because it would exceed the transaction limit " - "(currently have {} out of {} transactions in state): {}", - transactionsQuantity(), - txs_limit_, - *rhs_batch); + auto completed_batches = batches_.move([this, &state_update, &rhs_batch]( + auto &storage) + -> std::vector { + auto corresponding = storage.batches.right.find(rhs_batch); + if (corresponding == storage.batches.right.end()) { + // when state does not contain transaction + if (this->rawInsert(rhs_batch)) { + // there is enough room for the new batch + BOOST_VERIFY_MSG(state_update.updated_state_->rawInsert(rhs_batch), + "Could not insert new MST batch to state update."); + } else { + // there is not enough room for the new batch + log_->info("Dropped a batch because it did not fit into storage: {}", + *rhs_batch); + } + return {}; } - return; - } - DataType found = corresponding->first; - // Append new signatures to the existing state - auto inserted_new_signatures = mergeSignaturesInBatch(found, rhs_batch); - - if (completer_->isCompleted(found)) { - // state already has completed transaction, - // remove from state and return it - assert(txs_quantity_ >= boost::size(found->transactions())); - txs_quantity_ -= boost::size(found->transactions()); - batches_.right.erase(found); - state_update.completed_state_->rawInsert(found); - return; - } + DataType found = corresponding->first; + // Append new signatures to the existing state + auto inserted_new_signatures = mergeSignaturesInBatch(found, rhs_batch); + + if (completer_->isCompleted(found)) { + // state already has completed transaction, + // remove from state and return it + assert(txs_quantity_ >= boost::size(found->transactions())); + txs_quantity_ -= boost::size(found->transactions()); + storage.batches.right.erase(found); + return {found}; + } - // if batch still isn't completed, return it, if new signatures were - // inserted - if (inserted_new_signatures) { - state_update.updated_state_->rawInsert(found); + // if batch still isn't completed, return it, if new signatures + // were inserted + if (inserted_new_signatures) { + state_update.updated_state_->rawInsert(found); + } + return {}; + }); + for (auto &batch : completed_batches) { + state_update.completed_state_.emplace_back(std::move(batch)); } } - void MstState::rawInsert(const DataType &rhs_batch) { - txs_quantity_ += boost::size(rhs_batch->transactions()); - batches_.insert({oldestTimestamp(rhs_batch), rhs_batch}); + bool MstState::rawInsert(const DataType &rhs_batch) { + if (batches_.insert(rhs_batch)) { + txs_quantity_ += boost::size(rhs_batch->transactions()); + return true; + } + return false; } bool MstState::contains(const DataType &element) const { - return batches_.right.find(element) != batches_.right.end(); + return batches_.access([&element](const auto &storage) { + return storage.batches.right.find(element) != storage.batches.right.end(); + }); } void MstState::extractExpiredImpl(const TimeType ¤t_time, - boost::optional extracted) { - for (auto it = batches_.left.begin(); it != batches_.left.end() - and completer_->isExpired(it->second, current_time);) { - if (extracted) { - *extracted += it->second; + boost::optional opt_extracted) { + auto extracted = batches_.extract([this, ¤t_time, &opt_extracted]( + auto &storage) { + std::vector extracted; + for (auto it = storage.batches.left.begin(); + it != storage.batches.left.end() + and completer_->isExpired(it->second, current_time);) { + assert(txs_quantity_ >= boost::size(it->second->transactions())); + txs_quantity_ -= boost::size(it->second->transactions()); + if (opt_extracted) { + *opt_extracted += it->second; + } + extracted.emplace_back(it->second); + it = storage.batches.left.erase(it); + assert(it == storage.batches.left.begin()); } - assert(txs_quantity_ >= boost::size(it->second->transactions())); - txs_quantity_ -= boost::size(it->second->transactions()); - it = batches_.left.erase(it); - assert(it == batches_.left.begin()); - } + return extracted; + }); + } + + bool MstState::InternalStorage::insert(BatchPtr batch) { + return batches.insert({oldestTimestamp(batch), batch}).second; } } // namespace iroha diff --git a/irohad/multi_sig_transactions/state/mst_state.hpp b/irohad/multi_sig_transactions/state/mst_state.hpp index 7d3a772698..1407ae7b31 100644 --- a/irohad/multi_sig_transactions/state/mst_state.hpp +++ b/irohad/multi_sig_transactions/state/mst_state.hpp @@ -22,6 +22,10 @@ #include "logger/logger_fwd.hpp" #include "multi_sig_transactions/hash.hpp" #include "multi_sig_transactions/mst_types.hpp" +#include "storage_shared_limit/limitable_storage.hpp" +#include "storage_shared_limit/limited_storage.hpp" +#include "storage_shared_limit/storage_limit.hpp" +#include "storage_shared_limit/storage_limit_none.hpp" namespace iroha { @@ -86,10 +90,16 @@ namespace iroha { using CompleterType = std::shared_ptr; + struct StateUpdateResult; + class MstState { public: // -----------------------------| public api |------------------------------ + using BatchStorageLimit = StorageLimit; + + MstState(MstState &&other); + /** * Create empty state * @param completer - strategy for determine completed and expired batches @@ -98,7 +108,7 @@ namespace iroha { * @return empty mst state */ static MstState empty(const CompleterType &completer, - size_t transaction_limit, + std::shared_ptr storage_limit, logger::LoggerPtr log); /** @@ -167,18 +177,21 @@ namespace iroha { /// Apply visitor to all batches. template inline void iterateBatches(const Visitor &visitor) const { - const auto batches_range = batches_.right | boost::adaptors::map_keys; - std::for_each(batches_range.begin(), batches_range.end(), visitor); + batches_.access([&visitor](const auto &storage) { + const auto batches_range = + storage.batches.right | boost::adaptors::map_keys; + std::for_each(batches_range.begin(), batches_range.end(), visitor); + }); } /// Apply visitor to all transactions. template inline void iterateTransactions(const Visitor &visitor) const { - for (const auto &batch : batches_.right | boost::adaptors::map_keys) { + iterateBatches([&visitor](const BatchPtr &batch) { std::for_each(batch->transactions().begin(), batch->transactions().end(), visitor); - } + }); } private: @@ -195,11 +208,11 @@ namespace iroha { BatchHashEquality>>; MstState(const CompleterType &completer, - size_t transaction_limit, + std::shared_ptr storage_limit, logger::LoggerPtr log); MstState(const CompleterType &completer, - size_t transaction_limit, + std::shared_ptr storage_limit, const BatchesForwardCollectionType &batches, logger::LoggerPtr log); @@ -214,8 +227,9 @@ namespace iroha { /** * Insert new value in state with keeping invariant * @param rhs_tx - data for insertion + * @return whether the batch was inserted */ - void rawInsert(const DataType &rhs_tx); + bool rawInsert(const DataType &rhs_tx); /** * Erase expired batches, optionally returning them. @@ -229,14 +243,37 @@ namespace iroha { CompleterType completer_; - BatchesBimap batches_; + struct InternalStorage : public LimitableStorage { + bool insert(BatchPtr batch) override; + + BatchesBimap batches; + }; + + LimitedStorage batches_; - size_t txs_limit_; size_t txs_quantity_{0}; logger::LoggerPtr log_; }; + /** + * Contains result of updating local state: + * - state with completed batches + * - state with updated (still not enough signatures) batches + */ + struct StateUpdateResult { + StateUpdateResult(std::shared_ptr completer, + logger::LoggerPtr log) + : updated_state_(std::make_shared(MstState::empty( + std::move(completer), + std::make_shared>(), + std::move(log)))) {} + + std::vector> completed_state_; + + std::shared_ptr updated_state_; + }; + } // namespace iroha #endif // IROHA_MST_STATE_HPP diff --git a/irohad/multi_sig_transactions/storage/impl/mst_storage_impl.cpp b/irohad/multi_sig_transactions/storage/impl/mst_storage_impl.cpp index d120958763..6dcd0dd037 100644 --- a/irohad/multi_sig_transactions/storage/impl/mst_storage_impl.cpp +++ b/irohad/multi_sig_transactions/storage/impl/mst_storage_impl.cpp @@ -13,22 +13,25 @@ namespace iroha { auto target_state_iter = peer_states_.find(target_peer_key); if (target_state_iter == peer_states_.end()) { return peer_states_ - .insert({target_peer_key, - MstState::empty(completer_, txs_limit_, mst_state_logger_)}) + .emplace( + target_peer_key, + MstState::empty(completer_, storage_limit_, mst_state_logger_)) .first; } return target_state_iter; } // -----------------------------| interface API |----------------------------- - MstStorageStateImpl::MstStorageStateImpl(const CompleterType &completer, - size_t transaction_limit, - logger::LoggerPtr mst_state_logger, - logger::LoggerPtr log) + MstStorageStateImpl::MstStorageStateImpl( + const CompleterType &completer, + std::shared_ptr> storage_limit, + logger::LoggerPtr mst_state_logger, + logger::LoggerPtr log) : MstStorage(log), completer_(completer), - txs_limit_(transaction_limit), - own_state_(MstState::empty(completer_, txs_limit_, mst_state_logger)), + storage_limit_(std::move(storage_limit)), + own_state_( + MstState::empty(completer_, storage_limit_, mst_state_logger)), mst_state_logger_(std::move(mst_state_logger)) {} auto MstStorageStateImpl::applyImpl( diff --git a/irohad/multi_sig_transactions/storage/mst_storage_impl.hpp b/irohad/multi_sig_transactions/storage/mst_storage_impl.hpp index 420329f918..e16fb62512 100644 --- a/irohad/multi_sig_transactions/storage/mst_storage_impl.hpp +++ b/irohad/multi_sig_transactions/storage/mst_storage_impl.hpp @@ -10,6 +10,7 @@ #include "logger/logger_fwd.hpp" #include "multi_sig_transactions/hash.hpp" #include "multi_sig_transactions/storage/mst_storage.hpp" +#include "storage_shared_limit/storage_limit.hpp" namespace iroha { class MstStorageStateImpl : public MstStorage { @@ -33,7 +34,7 @@ namespace iroha { * @param log - the logger to use in the new object */ MstStorageStateImpl(const CompleterType &completer, - size_t transaction_limit, + std::shared_ptr> storage_limit, logger::LoggerPtr mst_state_logger, logger::LoggerPtr log); @@ -61,7 +62,7 @@ namespace iroha { // ---------------------------| private fields |---------------------------- const CompleterType completer_; - size_t txs_limit_; + std::shared_ptr> storage_limit_; std::unordered_map diff --git a/irohad/multi_sig_transactions/transport/impl/mst_transport_grpc.cpp b/irohad/multi_sig_transactions/transport/impl/mst_transport_grpc.cpp index 42797fa790..87c2326b08 100644 --- a/irohad/multi_sig_transactions/transport/impl/mst_transport_grpc.cpp +++ b/irohad/multi_sig_transactions/transport/impl/mst_transport_grpc.cpp @@ -35,7 +35,7 @@ MstTransportGrpc::MstTransportGrpc( transaction_batch_factory, std::shared_ptr tx_presence_cache, std::shared_ptr mst_completer, - size_t transaction_limit, + std::shared_ptr> mst_storage_limit, shared_model::crypto::PublicKey my_key, logger::LoggerPtr mst_state_logger, logger::LoggerPtr log) @@ -45,7 +45,7 @@ MstTransportGrpc::MstTransportGrpc( batch_factory_(std::move(transaction_batch_factory)), tx_presence_cache_(std::move(tx_presence_cache)), mst_completer_(std::move(mst_completer)), - mst_state_txs_limit_(transaction_limit), + mst_storage_limit_(mst_storage_limit), my_key_(shared_model::crypto::toBinaryString(my_key)), mst_state_logger_(std::move(mst_state_logger)), log_(std::move(log)) {} @@ -90,7 +90,7 @@ grpc::Status MstTransportGrpc::SendState( auto batches = batch_parser_->parseBatches(transactions); MstState new_state = - MstState::empty(mst_completer_, mst_state_txs_limit_, mst_state_logger_); + MstState::empty(mst_completer_, mst_storage_limit_, mst_state_logger_); for (auto &batch : batches) { batch_factory_->createTransactionBatch(batch).match( diff --git a/irohad/multi_sig_transactions/transport/mst_transport_grpc.hpp b/irohad/multi_sig_transactions/transport/mst_transport_grpc.hpp index 1ff6c63cf4..81d996d160 100644 --- a/irohad/multi_sig_transactions/transport/mst_transport_grpc.hpp +++ b/irohad/multi_sig_transactions/transport/mst_transport_grpc.hpp @@ -15,8 +15,10 @@ #include "interfaces/iroha_internal/transaction_batch_factory.hpp" #include "interfaces/iroha_internal/transaction_batch_parser.hpp" #include "logger/logger_fwd.hpp" +#include "multi_sig_transactions/mst_types.hpp" #include "multi_sig_transactions/state/mst_state.hpp" #include "network/impl/async_grpc_client.hpp" +#include "storage_shared_limit/storage_limit.hpp" namespace iroha { @@ -43,7 +45,7 @@ namespace iroha { transaction_batch_factory, std::shared_ptr tx_presence_cache, std::shared_ptr mst_completer, - size_t transaction_limit, + std::shared_ptr> mst_storage_limit, shared_model::crypto::PublicKey my_key, logger::LoggerPtr mst_state_logger, logger::LoggerPtr log); @@ -84,7 +86,7 @@ namespace iroha { std::shared_ptr tx_presence_cache_; /// source peer key for MST propogation messages std::shared_ptr mst_completer_; - size_t mst_state_txs_limit_; + std::shared_ptr> mst_storage_limit_; const std::string my_key_; logger::LoggerPtr mst_state_logger_; ///< Logger for created MstState diff --git a/irohad/network/mst_transport.hpp b/irohad/network/mst_transport.hpp index f238f40d49..dc6d8aa6d6 100644 --- a/irohad/network/mst_transport.hpp +++ b/irohad/network/mst_transport.hpp @@ -23,9 +23,8 @@ namespace iroha { * @param from - key of the peer emitted the state * @param new_state - state propagated from peer */ - virtual void onNewState( - const shared_model::crypto::PublicKey &from, - const MstState &new_state) = 0; + virtual void onNewState(const shared_model::crypto::PublicKey &from, + MstState new_state) = 0; virtual ~MstTransportNotification() = default; }; diff --git a/irohad/network/ordering_gate.hpp b/irohad/network/ordering_gate.hpp index 47c6dad9dd..736811d892 100644 --- a/irohad/network/ordering_gate.hpp +++ b/irohad/network/ordering_gate.hpp @@ -41,6 +41,15 @@ namespace iroha { */ virtual rxcpp::observable onProposal() = 0; + /** + * If propagateBatch returns false, which means the batch was not + * accepted by the OrderingGate, this observable signals when the + * OrderingGate is ready to accept more batches, so the propagateBatch + * method can be called again. The observable emits a rough amount of + * transactions that the gate is ready to accept for propagation. + */ + virtual rxcpp::observable onReadyToAcceptTxs() = 0; + virtual ~OrderingGate() = default; }; } // namespace network diff --git a/irohad/ordering/impl/on_demand_ordering_gate.cpp b/irohad/ordering/impl/on_demand_ordering_gate.cpp index db21dfeb87..c0ae94131c 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.cpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.cpp @@ -51,6 +51,7 @@ OnDemandOrderingGate::OnDemandOrderingGate( // notify our ordering service about new round ordering_service_->onCollaborationOutcome(new_round); + // this rotates the cache_ this->sendCachedTransactions(); // request proposal for the current round @@ -59,6 +60,12 @@ OnDemandOrderingGate::OnDemandOrderingGate( // vote for the object received from the network proposal_notifier_.get_subscriber().on_next( network::OrderingEvent{std::move(proposal), new_round}); + + const auto can_accept_txs_number = cache_->availableTxsCapacity(); + if (can_accept_txs_number > 0) { + can_accept_txs_notifier_.get_subscriber().on_next( + can_accept_txs_number); + } })), cache_(std::move(cache)), proposal_factory_(std::move(factory)), @@ -85,6 +92,10 @@ rxcpp::observable OnDemandOrderingGate::onProposal() { return proposal_notifier_.get_observable(); } +rxcpp::observable OnDemandOrderingGate::onReadyToAcceptTxs() { + return can_accept_txs_notifier_.get_observable(); +} + boost::optional> OnDemandOrderingGate::processProposalRequest( boost::optional< diff --git a/irohad/ordering/impl/on_demand_ordering_gate.hpp b/irohad/ordering/impl/on_demand_ordering_gate.hpp index 3e22f9667e..b1417ad26b 100644 --- a/irohad/ordering/impl/on_demand_ordering_gate.hpp +++ b/irohad/ordering/impl/on_demand_ordering_gate.hpp @@ -56,6 +56,10 @@ namespace iroha { rxcpp::observable onProposal() override; + /// Emits a rough amount of transactions that the gate is ready to + /// accept for propagation. + rxcpp::observable onReadyToAcceptTxs() override; + private: /** * Handle an incoming proposal from ordering service @@ -89,6 +93,7 @@ namespace iroha { std::shared_ptr tx_cache_; rxcpp::subjects::subject proposal_notifier_; + rxcpp::subjects::subject can_accept_txs_notifier_; }; } // namespace ordering diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.cpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.cpp index 3b90c48b86..d0c15b29f4 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.cpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.cpp @@ -87,3 +87,7 @@ void OnDemandCache::rotate() { auto second_element_it = boost::next(circ_buffer.begin()); circ_buffer.rotate(second_element_it); } + +size_t OnDemandCache::availableTxsCapacity() const { + return max_cache_size_ - circ_buffer.back().first; +} diff --git a/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.hpp b/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.hpp index 95250cfa22..623421d479 100644 --- a/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.hpp +++ b/irohad/ordering/impl/ordering_gate_cache/on_demand_cache.hpp @@ -37,6 +37,8 @@ namespace iroha { virtual void rotate() override; + size_t availableTxsCapacity() const override; + private: const uint64_t max_cache_size_; mutable std::shared_timed_mutex mutex_; diff --git a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp index ed2723c6ef..9818584b2e 100644 --- a/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp +++ b/irohad/ordering/impl/ordering_gate_cache/ordering_gate_cache.hpp @@ -81,6 +81,8 @@ namespace iroha { */ virtual void rotate() = 0; + virtual size_t availableTxsCapacity() const = 0; + virtual ~OrderingGateCache() = default; }; diff --git a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp index 67aaf36ac0..acdb316fd5 100644 --- a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp +++ b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.cpp @@ -7,20 +7,21 @@ #include "interfaces/transaction.hpp" #include "multi_sig_transactions/state/mst_state.hpp" +#include "storage_shared_limit/moved_item.hpp" namespace iroha { PendingTransactionStorageImpl::PendingTransactionStorageImpl( StateObservable updated_batches, - BatchObservable prepared_batch, + rxcpp::observable> prepared_batch, BatchObservable expired_batch) { updated_batches_subscription_ = updated_batches.subscribe([this](const SharedState &batches) { this->updatedBatchesHandler(batches); }); prepared_batch_subscription_ = - prepared_batch.subscribe([this](const SharedBatch &preparedBatch) { - this->removeBatch(preparedBatch); + prepared_batch.subscribe([this](const auto &preparedBatch) { + this->removeBatch(preparedBatch->get()); }); expired_batch_subscription_ = expired_batch.subscribe([this](const SharedBatch &expiredBatch) { diff --git a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp index d29ed0f5a7..0d6556af95 100644 --- a/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp +++ b/irohad/pending_txs_storage/impl/pending_txs_storage_impl.hpp @@ -17,6 +17,8 @@ namespace iroha { + template + class MovedItem; class MstState; class PendingTransactionStorageImpl : public PendingTransactionStorage { @@ -26,14 +28,16 @@ namespace iroha { using SharedTxsCollectionType = shared_model::interface::types::SharedTxsCollectionType; using TransactionBatch = shared_model::interface::TransactionBatch; - using SharedState = std::shared_ptr; + using SharedState = std::shared_ptr; using SharedBatch = std::shared_ptr; using StateObservable = rxcpp::observable; using BatchObservable = rxcpp::observable; - PendingTransactionStorageImpl(StateObservable updated_batches, - BatchObservable prepared_batch, - BatchObservable expired_batch); + PendingTransactionStorageImpl( + StateObservable updated_batches, + rxcpp::observable>> + prepared_batch, + BatchObservable expired_batch); ~PendingTransactionStorageImpl() override; diff --git a/irohad/torii/processor/impl/transaction_processor_impl.cpp b/irohad/torii/processor/impl/transaction_processor_impl.cpp index 8daeab84c4..0fef562e6d 100644 --- a/irohad/torii/processor/impl/transaction_processor_impl.cpp +++ b/irohad/torii/processor/impl/transaction_processor_impl.cpp @@ -108,14 +108,7 @@ namespace iroha { }); mst_processor_->onPreparedBatches().subscribe([this](auto &&batch) { log_->info("MST batch prepared"); - this->publishEnoughSignaturesStatus(batch->transactions()); - if (not this->pcs_->propagate_batch(batch)) { - log_->error("PCS was unable to serve the batch received from MST {}", - batch->toString()); - // TODO IR-430 igor-egorov, a batch might not be accepted by pcs, need - // to investigate the lifetime of a batch and IR-432 handle it somehow - // in case when pcs is not ready - } + this->publishEnoughSignaturesStatus(batch->get()->transactions()); }); mst_processor_->onExpiredBatches().subscribe([this](auto &&batch) { log_->info("MST batch {} is expired", batch->reducedHash()); diff --git a/test/framework/integration_framework/fake_peer/fake_peer.cpp b/test/framework/integration_framework/fake_peer/fake_peer.cpp index 3cca6a1b34..00d282043c 100644 --- a/test/framework/integration_framework/fake_peer/fake_peer.cpp +++ b/test/framework/integration_framework/fake_peer/fake_peer.cpp @@ -37,8 +37,6 @@ using namespace shared_model::crypto; using namespace framework::expected; -static constexpr size_t kMstStateTxLimit = 100; - static std::shared_ptr createPeer( const std::shared_ptr &common_objects_factory, @@ -109,7 +107,8 @@ namespace integration_framework { tx_presence_cache, std::make_shared( std::chrono::minutes(0)), - kMstStateTxLimit, + std::make_shared>>(), keypair_->publicKey(), mst_log_manager_->getChild("State")->getLogger(), mst_log_manager_->getChild("Transport")->getLogger())), diff --git a/test/framework/integration_framework/fake_peer/network/mst_message.hpp b/test/framework/integration_framework/fake_peer/network/mst_message.hpp index 076c340958..024bd38b8b 100644 --- a/test/framework/integration_framework/fake_peer/network/mst_message.hpp +++ b/test/framework/integration_framework/fake_peer/network/mst_message.hpp @@ -12,9 +12,8 @@ namespace integration_framework { namespace fake_peer { struct MstMessage final { - MstMessage(const shared_model::crypto::PublicKey &f, - const iroha::MstState &s) - : from(f), state(s) {} + MstMessage(const shared_model::crypto::PublicKey &f, iroha::MstState s) + : from(f), state(std::move(s)) {} shared_model::crypto::PublicKey from; iroha::MstState state; }; diff --git a/test/framework/integration_framework/fake_peer/network/mst_network_notifier.cpp b/test/framework/integration_framework/fake_peer/network/mst_network_notifier.cpp index f99bb5b006..014387b307 100644 --- a/test/framework/integration_framework/fake_peer/network/mst_network_notifier.cpp +++ b/test/framework/integration_framework/fake_peer/network/mst_network_notifier.cpp @@ -10,10 +10,10 @@ namespace integration_framework { void MstNetworkNotifier::onNewState( const shared_model::crypto::PublicKey &from, - const iroha::MstState &new_state) { + iroha::MstState new_state) { std::lock_guard guard(mst_subject_mutex_); mst_subject_.get_subscriber().on_next( - std::make_shared(from, new_state)); + std::make_shared(from, std::move(new_state))); } rxcpp::observable> diff --git a/test/framework/integration_framework/fake_peer/network/mst_network_notifier.hpp b/test/framework/integration_framework/fake_peer/network/mst_network_notifier.hpp index bce80ff7cd..942d9d95f0 100644 --- a/test/framework/integration_framework/fake_peer/network/mst_network_notifier.hpp +++ b/test/framework/integration_framework/fake_peer/network/mst_network_notifier.hpp @@ -21,7 +21,7 @@ namespace integration_framework { : public iroha::network::MstTransportNotification { public: void onNewState(const shared_model::crypto::PublicKey &from, - const iroha::MstState &new_state) override; + iroha::MstState new_state) override; rxcpp::observable> getObservable(); diff --git a/test/framework/integration_framework/integration_test_framework.cpp b/test/framework/integration_framework/integration_test_framework.cpp index 32e3eedcd7..c71dba1e44 100644 --- a/test/framework/integration_framework/integration_test_framework.cpp +++ b/test/framework/integration_framework/integration_test_framework.cpp @@ -375,7 +375,7 @@ namespace integration_framework { iroha_instance_->run(); } - rxcpp::observable> + rxcpp::observable> IntegrationTestFramework::getMstStateUpdateObservable() { return iroha_instance_->getIrohaInstance() ->getMstProcessor() @@ -386,7 +386,8 @@ namespace integration_framework { IntegrationTestFramework::getMstPreparedBatchesObservable() { return iroha_instance_->getIrohaInstance() ->getMstProcessor() - ->onPreparedBatches(); + ->onPreparedBatches() + .map([](const auto &moved_batch) { return moved_batch->get(); }); } rxcpp::observable diff --git a/test/framework/integration_framework/integration_test_framework.hpp b/test/framework/integration_framework/integration_test_framework.hpp index cdece0fd6a..7b8b4295df 100644 --- a/test/framework/integration_framework/integration_test_framework.hpp +++ b/test/framework/integration_framework/integration_test_framework.hpp @@ -376,7 +376,7 @@ namespace integration_framework { */ IntegrationTestFramework &skipBlock(); - rxcpp::observable> + rxcpp::observable> getMstStateUpdateObservable(); rxcpp::observable getMstPreparedBatchesObservable(); diff --git a/test/module/irohad/multi_sig_transactions/CMakeLists.txt b/test/module/irohad/multi_sig_transactions/CMakeLists.txt index cc00644825..2f466022e4 100644 --- a/test/module/irohad/multi_sig_transactions/CMakeLists.txt +++ b/test/module/irohad/multi_sig_transactions/CMakeLists.txt @@ -57,3 +57,12 @@ AddTest(mst_net_input_test mst_net_input_test.cpp) target_link_libraries(mst_net_input_test integration_framework ) + +AddTest(mst_to_pcs_propagation_test mst_to_psc_propagation_test.cpp) +target_link_libraries(mst_to_pcs_propagation_test + mst_propagation_to_pcs + schema + shared_model_interfaces_factories + shared_model_proto_backend + test_logger + ) diff --git a/test/module/irohad/multi_sig_transactions/mst_mocks.hpp b/test/module/irohad/multi_sig_transactions/mst_mocks.hpp index b5add1d6c0..db830850de 100644 --- a/test/module/irohad/multi_sig_transactions/mst_mocks.hpp +++ b/test/module/irohad/multi_sig_transactions/mst_mocks.hpp @@ -13,6 +13,8 @@ #include "multi_sig_transactions/mst_time_provider.hpp" #include "multi_sig_transactions/mst_types.hpp" #include "network/mst_transport.hpp" +#include "storage_shared_limit/moved_item.hpp" +#include "storage_shared_limit/storage_limit_none.hpp" namespace iroha { @@ -31,7 +33,12 @@ namespace iroha { class MockMstTransportNotification : public network::MstTransportNotification { public: - MOCK_METHOD2(onNewState, + void onNewState(const shared_model::crypto::PublicKey &from, + MstState new_state) override { + onNewStateMock(from, new_state); + } + + MOCK_METHOD2(onNewStateMock, void(const shared_model::crypto::PublicKey &from, const MstState &state)); }; @@ -56,10 +63,23 @@ namespace iroha { MockMstProcessor(logger::LoggerPtr log) : MstProcessor(std::move(log)) {} MOCK_METHOD1(propagateBatchImpl, bool(const DataType &)); MOCK_CONST_METHOD0(onStateUpdateImpl, - rxcpp::observable>()); - MOCK_CONST_METHOD0(onPreparedBatchesImpl, rxcpp::observable()); + rxcpp::observable>()); + MOCK_CONST_METHOD0(onPreparedBatchesImpl, + rxcpp::observable>()); MOCK_CONST_METHOD0(onExpiredBatchesImpl, rxcpp::observable()); MOCK_CONST_METHOD1(batchInStorageImpl, bool(const DataType &)); }; + + struct MockMovedBatch : public MovedItem { + explicit MockMovedBatch(BatchPtr batch) + : MovedBatch(batch, std::make_shared>()) { + EXPECT_CALL(*this, get()).WillRepeatedly(::testing::Return(batch)); + EXPECT_CALL(*this, extract()) + .Times(::testing::AtMost(1)) + .WillRepeatedly(::testing::Return(batch)); + } + MOCK_CONST_METHOD0(get, BatchPtr()); + MOCK_METHOD0(extract, BatchPtr()); + }; } // namespace iroha #endif // IROHA_MST_MOCKS_HPP diff --git a/test/module/irohad/multi_sig_transactions/mst_processor_test.cpp b/test/module/irohad/multi_sig_transactions/mst_processor_test.cpp index db764132fc..be5ec33028 100644 --- a/test/module/irohad/multi_sig_transactions/mst_processor_test.cpp +++ b/test/module/irohad/multi_sig_transactions/mst_processor_test.cpp @@ -15,6 +15,7 @@ #include "module/shared_model/interface_mocks.hpp" #include "multi_sig_transactions/mst_processor_impl.hpp" #include "multi_sig_transactions/storage/mst_storage_impl.hpp" +#include "storage_shared_limit/storage_limit_none.hpp" auto log_ = getTestLogger("MstProcessorTest"); @@ -24,6 +25,8 @@ using namespace framework::test_subscriber; using testing::_; using testing::Return; +using StorageLimitDummy = StorageLimitNone; + class MstProcessorTest : public testing::Test { public: // --------------------------------| fields |--------------------------------- @@ -34,7 +37,6 @@ class MstProcessorTest : public testing::Test { /// use effective implementation of storage std::shared_ptr storage; std::shared_ptr mst_processor; - size_t mst_state_txs_limit{10}; // ---------------------------------| mocks |--------------------------------- @@ -50,11 +52,11 @@ class MstProcessorTest : public testing::Test { protected: void SetUp() override { transport = std::make_shared(); - storage = - std::make_shared(std::make_shared(), - mst_state_txs_limit, - getTestLogger("MstState"), - getTestLogger("MstStorage")); + storage = std::make_shared( + std::make_shared(), + std::make_shared(), + getTestLogger("MstState"), + getTestLogger("MstStorage")); propagation_strategy = std::make_shared(); EXPECT_CALL(*propagation_strategy, emitter()) @@ -256,12 +258,13 @@ TEST_F(MstProcessorTest, onUpdateFromTransportUsecase) { // ---------------------------------| when |---------------------------------- shared_model::crypto::PublicKey another_peer_key("another_pubkey"); - auto transported_state = MstState::empty(std::make_shared(), - mst_state_txs_limit, - getTestLogger("MstState")); + auto transported_state = + MstState::empty(std::make_shared(), + std::make_shared(), + getTestLogger("MstState")); transported_state += addSignaturesFromKeyPairs( makeTestBatch(txBuilder(1, time_now, quorum)), 0, makeKey()); - mst_processor->onNewState(another_peer_key, transported_state); + mst_processor->onNewState(another_peer_key, std::move(transported_state)); // ---------------------------------| then |---------------------------------- check(observers); @@ -310,7 +313,7 @@ TEST_F(MstProcessorTest, emptyStatePropagation) { auto another_peer_state = MstState::empty( std::make_shared(std::chrono::minutes(0)), - mst_state_txs_limit, + std::make_shared(), getTestLogger("MstState")); another_peer_state += makeTestBatch(txBuilder(1)); @@ -323,35 +326,3 @@ TEST_F(MstProcessorTest, emptyStatePropagation) { another_peer}; propagation_subject.get_subscriber().on_next(peers); } - -/** - * @given initialised mst processor with 10 as transactions limit - * - * @when two batches of 6 transactions are propagated via mst processor - * - * @then only one batch is accepted due to a limit of transactions - */ -TEST_F(MstProcessorTest, MstRespectsTransactionsLimit) { - auto batch1 = - addSignaturesFromKeyPairs(makeTestBatch(txBuilder(1, time_now), - txBuilder(1, time_now + 1), - txBuilder(1, time_now + 2), - txBuilder(1, time_now + 3), - txBuilder(1, time_now + 4), - txBuilder(1, time_now + 5)), - 0, - makeKey()); - - auto batch2 = - addSignaturesFromKeyPairs(makeTestBatch(txBuilder(1, time_now + 6), - txBuilder(1, time_now + 7), - txBuilder(1, time_now + 8), - txBuilder(1, time_now + 9), - txBuilder(1, time_now + 10), - txBuilder(1, time_now + 11)), - 0, - makeKey()); - - ASSERT_TRUE(mst_processor->propagateBatch(batch1)); - ASSERT_FALSE(mst_processor->propagateBatch(batch2)); -} diff --git a/test/module/irohad/multi_sig_transactions/mst_to_psc_propagation_test.cpp b/test/module/irohad/multi_sig_transactions/mst_to_psc_propagation_test.cpp new file mode 100644 index 0000000000..6949ecf3b4 --- /dev/null +++ b/test/module/irohad/multi_sig_transactions/mst_to_psc_propagation_test.cpp @@ -0,0 +1,105 @@ +/** + * Copyright Soramitsu Co., Ltd. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "multi_sig_transactions/propagation_to_pcs.hpp" + +#include +#include +#include "framework/test_logger.hpp" +#include "module/irohad/multi_sig_transactions/mst_mocks.hpp" +#include "module/irohad/multi_sig_transactions/mst_test_helpers.hpp" +#include "module/irohad/network/network_mocks.hpp" +#include "storage_shared_limit/storage_limit_none.hpp" + +using namespace iroha; +using namespace testing; + +class MstToPcsPropagationTest : public ::testing::Test { + protected: + std::shared_ptr makeTx() { + return clone(txBuilder(tx_counter_++).build()); + } + + std::shared_ptr makeBatch( + size_t num_txs) { + shared_model::interface::types::SharedTxsCollectionType txs; + std::generate_n( + std::back_inserter(txs), num_txs, [this]() { return this->makeTx(); }); + return createMockBatchWithTransactions(txs, {}); + } + + rxcpp::subjects::subject propagation_available_subject_; + std::shared_ptr pcs_{ + std::make_shared()}; + MstToPcsPropagation propagator{ + pcs_, + std::make_shared>(), + propagation_available_subject_.get_observable(), + getTestLogger("MstToPcsPropagation")}; + + private: + size_t tx_counter_{0}; +}; + +/** + * @given a batch and a PCS that can accept it + * @when propagator receives the batch + * @then propagator propagates the batch to PCS immediately + */ +TEST_F(MstToPcsPropagationTest, PropagateImmediately) { + const auto batch = makeBatch(1); + EXPECT_CALL(*pcs_, propagate_batch(batch)).WillOnce(Return(true)); + + propagator.notifyCompletedBatch(std::make_shared(batch)); + EXPECT_EQ(propagator.pendingBatchesQuantity(), 0) + << "Batch must have been propagated immediately."; +} + +/** + * @given several batches and a PCS that is full at first, but then notifies of + * some available space + * @when propagator receives the batches + * @then propagator propagates the batches to PCS respecting the availability + */ +TEST_F(MstToPcsPropagationTest, PropagateWhenAvailabilityNotified) { + std::vector batches{{makeBatch(100), + makeBatch(5), + makeBatch(80), + makeBatch(25), + makeBatch(40), + makeBatch(10)}}; + + // feed the batches to propagator + for (auto batch : batches) { + EXPECT_CALL(*pcs_, propagate_batch(batch)).WillOnce(Return(false)); + propagator.notifyCompletedBatch(std::make_shared(batch)); + } + + // notifies propagator of available transactions for propagation to PCS, sets + // expectations on PCS and removes batches that must have been propagated + auto notify_available_and_check_propagation = + [&batches, this](const size_t available_txs) { + size_t available_after_propagation = available_txs; + for (auto it = batches.begin(); it != batches.end();) { + const size_t txs_in_batch = boost::size((*it)->transactions()); + if (txs_in_batch <= available_after_propagation) { + EXPECT_CALL(*pcs_, propagate_batch(*it)).WillOnce(Return(true)); + available_after_propagation -= txs_in_batch; + it = batches.erase(it); + } else { + EXPECT_CALL(*pcs_, propagate_batch(*it)).Times(0); + ++it; + } + } + propagation_available_subject_.get_subscriber().on_next(available_txs); + EXPECT_EQ(propagator.pendingBatchesQuantity(), batches.size()); + }; + + notify_available_and_check_propagation(50); + notify_available_and_check_propagation(90); + notify_available_and_check_propagation(50); + notify_available_and_check_propagation(50); + notify_available_and_check_propagation(100); +} diff --git a/test/module/irohad/multi_sig_transactions/state_test.cpp b/test/module/irohad/multi_sig_transactions/state_test.cpp index 96092a1383..4bffe1505f 100644 --- a/test/module/irohad/multi_sig_transactions/state_test.cpp +++ b/test/module/irohad/multi_sig_transactions/state_test.cpp @@ -8,15 +8,18 @@ #include "logger/logger.hpp" #include "module/irohad/multi_sig_transactions/mst_test_helpers.hpp" #include "multi_sig_transactions/state/mst_state.hpp" +#include "storage_shared_limit/batch_storage_limit_by_txs.hpp" +#include "storage_shared_limit/storage_limit_none.hpp" using namespace std; using namespace iroha; using namespace iroha::model; +using StorageLimitDummy = StorageLimitNone; + auto mst_state_log_ = getTestLogger("MstState"); auto log_ = getTestLogger("MstStateTest"); auto completer_ = std::make_shared(); -constexpr size_t kMstStateTxLimit{10}; /** * @given empty state @@ -24,7 +27,8 @@ constexpr size_t kMstStateTxLimit{10}; * @then checks that state contains the inserted batch */ TEST(StateTest, CreateState) { - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); ASSERT_EQ(0, state.batchesQuantity()); auto tx = addSignatures( makeTestBatch(txBuilder(1)), 0, makeSignature("1", "pub_key_1")); @@ -39,7 +43,8 @@ TEST(StateTest, CreateState) { * @then checks that signatures are merged into the state */ TEST(StateTest, UpdateExistingState) { - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); auto time = iroha::time::now(); auto first_signature = makeSignature("1", "pub_key_1"); auto second_signature = makeSignature("2", "pub_key_2"); @@ -62,7 +67,8 @@ TEST(StateTest, UpdateExistingState) { * @then "contains" method shows presence of the batch */ TEST(StateTest, ContainsMethodFindsInsertedBatch) { - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); auto first_signature = makeSignature("1", "pub_key_1"); auto batch = makeTestBatch(txBuilder(1, iroha::time::now())); auto tx = addSignatures(batch, 0, first_signature); @@ -77,7 +83,8 @@ TEST(StateTest, ContainsMethodFindsInsertedBatch) { * @then "contains" method shows absence of the batch */ TEST(StateTest, ContainsMethodDoesNotFindNonInsertedBatch) { - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); auto batch = makeTestBatch(txBuilder(1, iroha::time::now())); EXPECT_FALSE(state.contains(batch)); } @@ -90,7 +97,8 @@ TEST(StateTest, ContainsMethodDoesNotFindNonInsertedBatch) { TEST(StateTest, UpdateStateWhenTransactionsSame) { log_->info("Create empty state => insert two equal transaction"); - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); auto time = iroha::time::now(); state += addSignatures( makeTestBatch(txBuilder(1, time)), 0, makeSignature("1", "1")); @@ -116,7 +124,8 @@ TEST(StateTest, UpdateStateWhenTransactionsSame) { TEST(StateTest, DifferentSignaturesUnionTest) { log_->info("Create two states => merge them"); - auto state1 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state1 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state1 += addSignatures(makeTestBatch(txBuilder(1)), 0, makeSignature("1", "1")); @@ -127,16 +136,14 @@ TEST(StateTest, DifferentSignaturesUnionTest) { ASSERT_EQ(3, state1.batchesQuantity()); - auto state2 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state2 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state2 += addSignatures(makeTestBatch(txBuilder(4)), 0, makeSignature("4", "4")); state2 += addSignatures(makeTestBatch(txBuilder(5)), 0, makeSignature("5", "5")); ASSERT_EQ(2, state2.batchesQuantity()); - ASSERT_LE(state1.transactionsQuantity() + state2.transactionsQuantity(), - kMstStateTxLimit) - << "Bad test!"; state1 += state2; ASSERT_EQ(5, state1.batchesQuantity()); } @@ -155,8 +162,10 @@ TEST(StateTest, UnionStateWhenSameTransactionHaveDifferentSignatures) { auto time = iroha::time::now(); - auto state1 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); - auto state2 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state1 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); + auto state2 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state1 += addSignatures( makeTestBatch(txBuilder(1, time)), 0, makeSignature("1", "1")); state2 += addSignatures( @@ -183,7 +192,8 @@ TEST(StateTest, UnionStateWhenSameTransactionHaveDifferentSignatures) { TEST(StateTest, UnionStateWhenTransactionsSame) { auto time = iroha::time::now(); - auto state1 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state1 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state1 += addSignatures( makeTestBatch(txBuilder(1, time)), 0, makeSignature("1", "1")); state1 += addSignatures( @@ -191,7 +201,8 @@ TEST(StateTest, UnionStateWhenTransactionsSame) { ASSERT_EQ(2, state1.batchesQuantity()); - auto state2 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state2 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state2 += addSignatures( makeTestBatch(txBuilder(1, time)), 0, makeSignature("1", "1")); state2 += addSignatures( @@ -219,11 +230,13 @@ TEST(StateTest, DifferenceTest) { auto common_batch = makeTestBatch(txBuilder(1, time)); auto another_batch = makeTestBatch(txBuilder(3)); - auto state1 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state1 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state1 += addSignatures(common_batch, 0, first_signature); state1 += addSignatures(common_batch, 0, second_signature); - auto state2 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state2 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state2 += addSignatures(common_batch, 0, second_signature); state2 += addSignatures(common_batch, 0, third_signature); state2 += addSignatures(another_batch, 0, another_signature); @@ -245,22 +258,24 @@ TEST(StateTest, UpdateTxUntillQuorum) { auto quorum = 3u; auto time = iroha::time::now(); - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); auto state_after_one_tx = state += addSignatures( makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("1", "1")); ASSERT_EQ(1, state_after_one_tx.updated_state_->batchesQuantity()); - ASSERT_EQ(0, state_after_one_tx.completed_state_->batchesQuantity()); + ASSERT_EQ(0, state_after_one_tx.completed_state_.size()); auto state_after_two_txes = state += addSignatures( makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("2", "2")); ASSERT_EQ(1, state_after_two_txes.updated_state_->batchesQuantity()); - ASSERT_EQ(0, state_after_two_txes.completed_state_->batchesQuantity()); + ASSERT_EQ(0, state_after_two_txes.completed_state_.size()); auto state_after_three_txes = state += addSignatures( makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("3", "3")); ASSERT_EQ(0, state_after_three_txes.updated_state_->batchesQuantity()); - ASSERT_EQ(1, state_after_three_txes.completed_state_->getBatches().size()); - ASSERT_TRUE((*state_after_three_txes.completed_state_->getBatches().begin()) + ASSERT_EQ(1, state_after_three_txes.completed_state_.size()); + ASSERT_TRUE(state_after_three_txes.completed_state_.front() + ->get() ->hasAllSignatures()); ASSERT_EQ(0, state.batchesQuantity()); } @@ -275,7 +290,8 @@ TEST(StateTest, UpdateStateWithNewStateUntilQuorum) { auto keypair = makeKey(); auto time = iroha::time::now(); - auto state1 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state1 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state1 += addSignatures(makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("1_1", "1_1")); @@ -285,7 +301,8 @@ TEST(StateTest, UpdateStateWithNewStateUntilQuorum) { makeTestBatch(txBuilder(2, time)), 0, makeSignature("3", "3")); ASSERT_EQ(2, state1.batchesQuantity()); - auto state2 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state2 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state2 += addSignatures(makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("1_2", "1_2")); @@ -295,7 +312,7 @@ TEST(StateTest, UpdateStateWithNewStateUntilQuorum) { ASSERT_EQ(1, state2.batchesQuantity()); auto final_state = state1 += state2; - ASSERT_EQ(1, final_state.completed_state_->batchesQuantity()); + ASSERT_EQ(1, final_state.completed_state_.size()); ASSERT_EQ(1, state1.batchesQuantity()); } @@ -313,7 +330,8 @@ TEST(StateTest, TimeIndexInsertionByTx) { 0, makeSignature("1_1", "1_1")); - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state += prepared_batch; auto expired_state = state.extractExpired(time + 1); @@ -332,7 +350,8 @@ TEST(StateTest, TimeIndexInsertionByAddState) { auto quorum = 3u; auto time = iroha::time::now(); - auto state1 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state1 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state1 += addSignatures(makeTestBatch(txBuilder(1, time, quorum)), 0, makeSignature("1_1", "1_1")); @@ -340,14 +359,15 @@ TEST(StateTest, TimeIndexInsertionByAddState) { 0, makeSignature("1_2", "1_2")); - auto state2 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state2 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state2 += addSignatures( makeTestBatch(txBuilder(2, time)), 0, makeSignature("2", "2")); state2 += addSignatures( makeTestBatch(txBuilder(3, time)), 0, makeSignature("3", "3")); auto final_state = state1 += state2; - ASSERT_EQ(0, final_state.completed_state_->batchesQuantity()); + ASSERT_EQ(0, final_state.completed_state_.size()); ASSERT_EQ(2, final_state.updated_state_->batchesQuantity()); } @@ -361,13 +381,15 @@ TEST(StateTest, TimeIndexInsertionByAddState) { TEST(StateTest, RemovingTestWhenByTimeHasExpired) { auto time = iroha::time::now(); - auto state1 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state1 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); state1 += addSignatures( makeTestBatch(txBuilder(1, time)), 0, makeSignature("2", "2")); state1 += addSignatures( makeTestBatch(txBuilder(2, time)), 0, makeSignature("2", "2")); - auto state2 = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); + auto state2 = MstState::empty( + completer_, std::make_shared(), mst_state_log_); auto diff_state = state1 - state2; ASSERT_EQ(2, diff_state.batchesQuantity()); @@ -380,27 +402,29 @@ TEST(StateTest, RemovingTestWhenByTimeHasExpired) { * @then the transaction limit is not exceeded */ TEST(StateTest, TxsLimit) { - auto state = MstState::empty(completer_, kMstStateTxLimit, mst_state_log_); - - auto gen_additional_state = - [](size_t first_tx_num, auto... tx_builders) { - auto sign_batch = [first_tx_num](auto &&batch) { - const auto n_txs = boost::size(batch->transactions()); - for (size_t tx_num = 0; tx_num < n_txs; ++tx_num) { - std::string key_str = std::to_string(first_tx_num + tx_num); - auto sig = makeSignature(key_str, key_str); - batch->addSignature(tx_num, sig.first, sig.second); - } - return std::move(batch); - }; - - auto additional_state = - MstState::empty(completer_, sizeof...(tx_builders), mst_state_log_); - additional_state += sign_batch(makeTestBatch(tx_builders...)); - EXPECT_EQ(additional_state.transactionsQuantity(), - sizeof...(tx_builders)); - return additional_state; - }; + const size_t kMstStateTxLimit{10}; + auto state = MstState::empty( + completer_, + std::make_shared(kMstStateTxLimit), + mst_state_log_); + + auto gen_additional_state = [](size_t first_tx_num, auto... tx_builders) { + auto sign_batch = [first_tx_num](auto &&batch) { + const auto n_txs = boost::size(batch->transactions()); + for (size_t tx_num = 0; tx_num < n_txs; ++tx_num) { + std::string key_str = std::to_string(first_tx_num + tx_num); + auto sig = makeSignature(key_str, key_str); + batch->addSignature(tx_num, sig.first, sig.second); + } + return std::move(batch); + }; + + auto additional_state = MstState::empty( + completer_, std::make_shared(), mst_state_log_); + additional_state += sign_batch(makeTestBatch(tx_builders...)); + EXPECT_EQ(additional_state.transactionsQuantity(), sizeof...(tx_builders)); + return additional_state; + }; auto try_insert = [&](auto... tx_builders) { const auto current_size = state.transactionsQuantity(); @@ -412,7 +436,7 @@ TEST(StateTest, TxsLimit) { ASSERT_EQ(state.transactionsQuantity(), expected_size); }; - auto next_tx_builder = []{ + auto next_tx_builder = [] { static size_t counter = 0; return txBuilder(counter++); }; diff --git a/test/module/irohad/multi_sig_transactions/storage_test.cpp b/test/module/irohad/multi_sig_transactions/storage_test.cpp index f954baa666..f1b204ff58 100644 --- a/test/module/irohad/multi_sig_transactions/storage_test.cpp +++ b/test/module/irohad/multi_sig_transactions/storage_test.cpp @@ -9,11 +9,13 @@ #include "logger/logger.hpp" #include "module/irohad/multi_sig_transactions/mst_test_helpers.hpp" #include "multi_sig_transactions/storage/mst_storage_impl.hpp" +#include "storage_shared_limit/storage_limit_none.hpp" using namespace iroha; +using StorageLimitDummy = iroha::StorageLimitNone; + auto log_ = getTestLogger("MstStorageTest"); -constexpr size_t kMstStateTxLimit{10}; class StorageTest : public testing::Test { public: @@ -21,11 +23,11 @@ class StorageTest : public testing::Test { void SetUp() override { completer_ = std::make_shared(); - storage = - std::make_shared(completer_, - kMstStateTxLimit, - getTestLogger("MstState"), - getTestLogger("MstStorage")); + storage = std::make_shared( + completer_, + std::make_shared(), + getTestLogger("MstState"), + getTestLogger("MstStorage")); fillOwnState(); } @@ -49,8 +51,9 @@ TEST_F(StorageTest, StorageWhenApplyOtherState) { "create state with default peers and other state => " "apply state"); - auto new_state = - MstState::empty(completer_, kMstStateTxLimit, getTestLogger("MstState")); + auto new_state = MstState::empty(completer_, + std::make_shared(), + getTestLogger("MstState")); new_state += makeTestBatch(txBuilder(5, creation_time)); new_state += makeTestBatch(txBuilder(6, creation_time)); new_state += makeTestBatch(txBuilder(7, creation_time)); diff --git a/test/module/irohad/multi_sig_transactions/transport_test.cpp b/test/module/irohad/multi_sig_transactions/transport_test.cpp index d7f0e51ab2..5d2b74e40c 100644 --- a/test/module/irohad/multi_sig_transactions/transport_test.cpp +++ b/test/module/irohad/multi_sig_transactions/transport_test.cpp @@ -28,6 +28,8 @@ using ::testing::_; using ::testing::A; using ::testing::Invoke; +using StorageLimitDummy = iroha::StorageLimitNone; + class TransportTest : public ::testing::Test { public: TransportTest() @@ -51,7 +53,6 @@ class TransportTest : public ::testing::Test { std::shared_ptr completer_; std::shared_ptr mst_notification_transport_; - size_t mst_state_txs_limit_{10}; }; static bool statesEqual(const iroha::MstState &a, const iroha::MstState &b) { @@ -102,7 +103,7 @@ TEST_F(TransportTest, SendAndReceive) { std::move(batch_factory_), std::move(tx_presence_cache_), completer_, - mst_state_txs_limit_, + std::make_shared(), my_key_.publicKey(), getTestLogger("MstState"), getTestLogger("MstTransportGrpc")); @@ -112,8 +113,9 @@ TEST_F(TransportTest, SendAndReceive) { std::condition_variable cv; auto time = iroha::time::now(); - auto state = iroha::MstState::empty( - completer_, mst_state_txs_limit_, getTestLogger("MstState")); + auto state = iroha::MstState::empty(completer_, + std::make_shared(), + getTestLogger("MstState")); state += addSignaturesFromKeyPairs( makeTestBatch(txBuilder(1, time)), 0, makeKey()); state += addSignaturesFromKeyPairs( @@ -144,7 +146,7 @@ TEST_F(TransportTest, SendAndReceive) { std::shared_ptr peer = makePeer(address, pk); // we want to ensure that server side will call onNewState() // with same parameters as on the client side - EXPECT_CALL(*mst_notification_transport_, onNewState(_, _)) + EXPECT_CALL(*mst_notification_transport_, onNewStateMock(_, _)) .WillOnce(Invoke( [this, &cv, &state](const auto &from_key, auto const &target_state) { EXPECT_EQ(this->my_key_.publicKey(), from_key); @@ -192,7 +194,7 @@ TEST_F(TransportTest, ReplayAttack) { std::move(batch_factory_), tx_presence_cache_, completer_, - mst_state_txs_limit_, + std::make_shared(), my_key_.publicKey(), getTestLogger("MstState"), getTestLogger("MstTransportGrpc")); @@ -200,12 +202,13 @@ TEST_F(TransportTest, ReplayAttack) { transport->subscribe(mst_notification_transport_); auto batch = makeTestBatch(txBuilder(1), txBuilder(2)); - auto state = iroha::MstState::empty( - completer_, mst_state_txs_limit_, getTestLogger("MstState")); + auto state = iroha::MstState::empty(completer_, + std::make_shared(), + getTestLogger("MstState")); state += addSignaturesFromKeyPairs( addSignaturesFromKeyPairs(batch, 0, makeKey()), 1, makeKey()); - EXPECT_CALL(*mst_notification_transport_, onNewState(_, _)) + EXPECT_CALL(*mst_notification_transport_, onNewStateMock(_, _)) .Times(1) // an empty state should not be propagated .WillOnce( Invoke([&batch](::testing::Unused, const iroha::MstState &state) { diff --git a/test/module/irohad/network/network_mocks.hpp b/test/module/irohad/network/network_mocks.hpp index cc5b77931a..76c903be7b 100644 --- a/test/module/irohad/network/network_mocks.hpp +++ b/test/module/irohad/network/network_mocks.hpp @@ -72,6 +72,8 @@ namespace iroha { MOCK_METHOD0(onProposal, rxcpp::observable()); MOCK_METHOD1(setPcs, void(const PeerCommunicationService &)); + + MOCK_METHOD0(onReadyToAcceptTxs, rxcpp::observable()); }; class MockConsensusGate : public ConsensusGate { diff --git a/test/module/irohad/ordering/ordering_mocks.hpp b/test/module/irohad/ordering/ordering_mocks.hpp index 034eff7260..12be33a46c 100644 --- a/test/module/irohad/ordering/ordering_mocks.hpp +++ b/test/module/irohad/ordering/ordering_mocks.hpp @@ -33,6 +33,7 @@ namespace iroha { MOCK_METHOD1(remove, void(const HashesSetType &)); MOCK_CONST_METHOD0(front, const BatchesSetType &()); MOCK_CONST_METHOD0(back, const BatchesSetType &()); + MOCK_CONST_METHOD0(availableTxsCapacity, size_t()); }; } // namespace cache diff --git a/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp b/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp index 242b5f042c..ef09b6348a 100644 --- a/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp +++ b/test/module/irohad/pending_txs_storage/pending_txs_storage_test.cpp @@ -7,9 +7,15 @@ #include #include "datetime/time.hpp" #include "framework/test_logger.hpp" +#include "module/irohad/multi_sig_transactions/mst_mocks.hpp" #include "module/irohad/multi_sig_transactions/mst_test_helpers.hpp" #include "multi_sig_transactions/state/mst_state.hpp" #include "pending_txs_storage/impl/pending_txs_storage_impl.hpp" +#include "storage_shared_limit/storage_limit_none.hpp" + +using namespace iroha; + +using StorageLimitDummy = iroha::StorageLimitNone; class PendingTxsStorageFixture : public ::testing::Test { public: @@ -33,7 +39,11 @@ class PendingTxsStorageFixture : public ::testing::Test { std::shared_ptr completer_ = std::make_shared(std::chrono::minutes(0)); - size_t mst_state_txs_limit_{10}; + + rxcpp::observable> dummy_completed = + rxcpp::observable<>::empty>(); + rxcpp::observable> dummy_expired = + rxcpp::observable<>::empty>(); logger::LoggerPtr mst_state_log_{getTestLogger("MstState")}; logger::LoggerPtr log_{getTestLogger("PendingTxsStorageFixture")}; @@ -47,8 +57,8 @@ class PendingTxsStorageFixture : public ::testing::Test { * @then the transactions can be added to MST state successfully */ TEST_F(PendingTxsStorageFixture, FixutureSelfCheck) { - auto state = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + auto state = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); auto transactions = addSignatures(makeTestBatch(txBuilder(1, getUniqueTime()), txBuilder(1, getUniqueTime())), @@ -68,8 +78,8 @@ TEST_F(PendingTxsStorageFixture, FixutureSelfCheck) { * @then list of pending transactions can be received for all batch creators */ TEST_F(PendingTxsStorageFixture, InsertionTest) { - auto state = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + auto state = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); auto transactions = addSignatures( makeTestBatch(txBuilder(2, getUniqueTime(), 2, "alice@iroha"), txBuilder(2, getUniqueTime(), 2, "bob@iroha")), @@ -77,14 +87,15 @@ TEST_F(PendingTxsStorageFixture, InsertionTest) { makeSignature("1", "pub_key_1")); *state += transactions; - auto updates = rxcpp::observable<>::create([&state](auto s) { - s.on_next(state); - s.on_completed(); - }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); + auto updates = rxcpp::observable<>::create>( + [&state](auto s) { + s.on_next(state); + s.on_completed(); + }); + auto completed = rxcpp::observable<>::empty>(); + auto expired = rxcpp::observable<>::empty>(); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage(updates, completed, expired); for (const auto &creator : {"alice@iroha", "bob@iroha"}) { auto pending = storage.getPendingTransactions(creator); ASSERT_EQ(pending.size(), 2) @@ -107,10 +118,10 @@ TEST_F(PendingTxsStorageFixture, InsertionTest) { * @then pending transactions response is also updated */ TEST_F(PendingTxsStorageFixture, SignaturesUpdate) { - auto state1 = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); - auto state2 = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + auto state1 = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); + auto state2 = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); auto transactions = addSignatures( makeTestBatch(txBuilder(3, getUniqueTime(), 3, "alice@iroha")), 0, @@ -120,16 +131,15 @@ TEST_F(PendingTxsStorageFixture, SignaturesUpdate) { addSignatures(transactions, 0, makeSignature("2", "pub_key_2")); *state2 += transactions; - auto updates = - rxcpp::observable<>::create([&state1, &state2](auto s) { + auto updates = rxcpp::observable<>::create>( + [&state1, &state2](auto s) { s.on_next(state1); s.on_next(state2); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage( + updates, dummy_completed, dummy_expired); auto pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(pending.size(), 1); ASSERT_EQ(boost::size(pending.front()->signatures()), 2); @@ -142,8 +152,8 @@ TEST_F(PendingTxsStorageFixture, SignaturesUpdate) { * @then users receives correct responses */ TEST_F(PendingTxsStorageFixture, SeveralBatches) { - auto state = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + auto state = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); auto batch1 = addSignatures( makeTestBatch(txBuilder(2, getUniqueTime(), 2, "alice@iroha"), txBuilder(2, getUniqueTime(), 2, "bob@iroha")), @@ -162,14 +172,14 @@ TEST_F(PendingTxsStorageFixture, SeveralBatches) { *state += batch2; *state += batch3; - auto updates = rxcpp::observable<>::create([&state](auto s) { - s.on_next(state); - s.on_completed(); - }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); + auto updates = rxcpp::observable<>::create>( + [&state](auto s) { + s.on_next(state); + s.on_completed(); + }); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage( + updates, dummy_completed, dummy_expired); auto alice_pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(alice_pending.size(), 4); @@ -184,16 +194,16 @@ TEST_F(PendingTxsStorageFixture, SeveralBatches) { * @then updates don't overwrite the whole storage state */ TEST_F(PendingTxsStorageFixture, SeparateBatchesDoNotOverwriteStorage) { - auto state1 = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + auto state1 = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); auto batch1 = addSignatures( makeTestBatch(txBuilder(2, getUniqueTime(), 2, "alice@iroha"), txBuilder(2, getUniqueTime(), 2, "bob@iroha")), 0, makeSignature("1", "pub_key_1")); *state1 += batch1; - auto state2 = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + auto state2 = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); auto batch2 = addSignatures( makeTestBatch(txBuilder(2, getUniqueTime(), 2, "alice@iroha"), txBuilder(3, getUniqueTime(), 3, "alice@iroha")), @@ -201,16 +211,15 @@ TEST_F(PendingTxsStorageFixture, SeparateBatchesDoNotOverwriteStorage) { makeSignature("1", "pub_key_1")); *state2 += batch2; - auto updates = - rxcpp::observable<>::create([&state1, &state2](auto s) { + auto updates = rxcpp::observable<>::create>( + [&state1, &state2](auto s) { s.on_next(state1); s.on_next(state2); s.on_completed(); }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); - iroha::PendingTransactionStorageImpl storage(updates, dummy, dummy); + iroha::PendingTransactionStorageImpl storage( + updates, dummy_completed, dummy_expired); auto alice_pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(alice_pending.size(), 4); @@ -226,8 +235,8 @@ TEST_F(PendingTxsStorageFixture, SeparateBatchesDoNotOverwriteStorage) { * @then storage removes the batch */ TEST_F(PendingTxsStorageFixture, PreparedBatch) { - auto state = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + auto state = std::make_shared(iroha::MstState::empty( + completer_, std::make_shared(), mst_state_log_)); std::shared_ptr batch = addSignatures( makeTestBatch(txBuilder(3, getUniqueTime(), 3, "alice@iroha")), @@ -235,21 +244,22 @@ TEST_F(PendingTxsStorageFixture, PreparedBatch) { makeSignature("1", "pub_key_1")); *state += batch; - rxcpp::subjects::subject prepared_batches_subject; - auto updates = rxcpp::observable<>::create([&state](auto s) { - s.on_next(state); - s.on_completed(); - }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); + rxcpp::subjects::subject> + prepared_batches_subject; + auto updates = rxcpp::observable<>::create>( + [&state](auto s) { + s.on_next(state); + s.on_completed(); + }); iroha::PendingTransactionStorageImpl storage( - updates, prepared_batches_subject.get_observable(), dummy); + updates, prepared_batches_subject.get_observable(), dummy_expired); batch = addSignatures(batch, 0, makeSignature("2", "pub_key_2"), makeSignature("3", "pub_key_3")); - prepared_batches_subject.get_subscriber().on_next(batch); + prepared_batches_subject.get_subscriber().on_next( + std::make_shared(batch)); prepared_batches_subject.get_subscriber().on_completed(); auto pending = storage.getPendingTransactions("alice@iroha"); ASSERT_EQ(pending.size(), 0); @@ -263,7 +273,9 @@ TEST_F(PendingTxsStorageFixture, PreparedBatch) { */ TEST_F(PendingTxsStorageFixture, ExpiredBatch) { auto state = std::make_shared( - iroha::MstState::empty(completer_, mst_state_txs_limit_, mst_state_log_)); + iroha::MstState::empty(completer_, + std::make_shared(), + mst_state_log_)); std::shared_ptr batch = addSignatures( makeTestBatch(txBuilder(3, getUniqueTime(), 3, "alice@iroha")), @@ -272,14 +284,13 @@ TEST_F(PendingTxsStorageFixture, ExpiredBatch) { *state += batch; rxcpp::subjects::subject expired_batches_subject; - auto updates = rxcpp::observable<>::create([&state](auto s) { - s.on_next(state); - s.on_completed(); - }); - auto dummy = rxcpp::observable<>::create>( - [](auto s) { s.on_completed(); }); + auto updates = rxcpp::observable<>::create>( + [&state](auto s) { + s.on_next(state); + s.on_completed(); + }); iroha::PendingTransactionStorageImpl storage( - updates, dummy, expired_batches_subject.get_observable()); + updates, dummy_completed, expired_batches_subject.get_observable()); expired_batches_subject.get_subscriber().on_next(batch); expired_batches_subject.get_subscriber().on_completed(); diff --git a/test/module/irohad/torii/processor/transaction_processor_test.cpp b/test/module/irohad/torii/processor/transaction_processor_test.cpp index 00a1e0fc60..f22c94a28b 100644 --- a/test/module/irohad/torii/processor/transaction_processor_test.cpp +++ b/test/module/irohad/torii/processor/transaction_processor_test.cpp @@ -120,9 +120,10 @@ class TransactionProcessorTest : public ::testing::Test { } } - rxcpp::subjects::subject> + rxcpp::subjects::subject> mst_update_notifier; - rxcpp::subjects::subject mst_prepared_notifier; + rxcpp::subjects::subject> + mst_prepared_notifier; rxcpp::subjects::subject mst_expired_notifier; rxcpp::subjects::subject< std::shared_ptr> @@ -431,23 +432,6 @@ TEST_F(TransactionProcessorTest, MultisigTransactionToMst) { tp->batchHandle(std::move(after_mst)); } -/** - * @given batch one transaction with quorum 2 - * AND one signature - * @when MST emits the batch - * @then checks that PCS is invoked. - * This happens because tx processor is subscribed for MST - */ -TEST_F(TransactionProcessorTest, MultisigTransactionFromMst) { - auto &&tx = addSignaturesFromKeyPairs(baseTestTx(2), makeKey(), makeKey()); - - auto &&after_mst = framework::batch::createBatchFromSingleTransaction( - std::shared_ptr(clone(tx))); - - EXPECT_CALL(*pcs, propagate_batch(_)).Times(1); - mst_prepared_notifier.get_subscriber().on_next(after_mst); -} - /** * @given valid multisig tx * @when transaction_processor handle it