Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Feature/mst shared limit #2230

Open
wants to merge 5 commits into
base: feature/shared-storage-limit
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions irohad/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ target_link_libraries(application
torii_service
pending_txs_storage
common
mst_propagation_to_pcs
)

add_executable(irohad irohad.cpp)
Expand Down
20 changes: 18 additions & 2 deletions irohad/main/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "multi_sig_transactions/mst_processor_impl.hpp"
#include "multi_sig_transactions/mst_propagation_strategy_stub.hpp"
#include "multi_sig_transactions/mst_time_provider_impl.hpp"
#include "multi_sig_transactions/propagation_to_pcs.hpp"
#include "multi_sig_transactions/storage/mst_storage_impl.hpp"
#include "multi_sig_transactions/transport/mst_transport_grpc.hpp"
#include "multi_sig_transactions/transport/mst_transport_stub.hpp"
Expand All @@ -39,6 +40,7 @@
#include "ordering/impl/on_demand_ordering_gate.hpp"
#include "pending_txs_storage/impl/pending_txs_storage_impl.hpp"
#include "simulator/impl/simulator.hpp"
#include "storage_shared_limit/batch_storage_limit_by_txs.hpp"
#include "synchronizer/impl/synchronizer_impl.hpp"
#include "torii/impl/command_service_impl.hpp"
#include "torii/impl/command_service_transport_grpc.hpp"
Expand Down Expand Up @@ -115,6 +117,7 @@ Irohad::Irohad(const std::string &block_store_dir,

Irohad::~Irohad() {
consensus_gate_events_subscription.unsubscribe();
mst_to_pcs_propagation_subscription.unsubscribe();
}

/**
Expand Down Expand Up @@ -522,9 +525,11 @@ void Irohad::initMstProcessor() {
log_manager_->getChild("MultiSignatureTransactions");
auto mst_state_logger = mst_logger_manager->getChild("State")->getLogger();
auto mst_completer = std::make_shared<DefaultCompleter>(mst_expiration_time_);
auto mst_storage_limit =
std::make_shared<iroha::BatchStorageLimitByTxs>(mst_state_txs_limit_);
auto mst_storage = std::make_shared<MstStorageStateImpl>(
mst_completer,
mst_state_txs_limit_,
mst_storage_limit,
mst_state_logger,
mst_logger_manager->getChild("Storage")->getLogger());
std::shared_ptr<iroha::PropagationStrategy> mst_propagation;
Expand All @@ -536,7 +541,7 @@ void Irohad::initMstProcessor() {
transaction_batch_factory_,
persistent_cache,
mst_completer,
mst_state_txs_limit_,
mst_storage_limit,
keypair.publicKey(),
std::move(mst_state_logger),
mst_logger_manager->getChild("Transport")->getLogger());
Expand All @@ -556,6 +561,17 @@ void Irohad::initMstProcessor() {
mst_logger_manager->getChild("Processor")->getLogger());
mst_processor = fair_mst_processor;
mst_transport->subscribe(fair_mst_processor);

mst_to_pcs_propagation_subscription =
mst_processor->onPreparedBatches().subscribe(
[propagator = std::make_shared<iroha::MstToPcsPropagation>(
pcs,
mst_storage_limit,
ordering_gate->onReadyToAcceptTxs(),
mst_logger_manager->getChild("PropagationToPcs")->getLogger())](
auto batch) {
propagator->notifyCompletedBatch(std::move(batch));
});
log_->info("[Init] => MST processor");
}

Expand Down
1 change: 1 addition & 0 deletions irohad/main/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ class Irohad {
// mst
std::shared_ptr<iroha::network::MstTransport> mst_transport;
std::shared_ptr<iroha::MstProcessor> mst_processor;
rxcpp::composite_subscription mst_to_pcs_propagation_subscription;

// pending transactions storage
std::shared_ptr<iroha::PendingTransactionStorage> pending_txs_storage_;
Expand Down
9 changes: 9 additions & 0 deletions irohad/multi_sig_transactions/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,12 @@ add_library(mst_hash
target_link_libraries(mst_hash
shared_model_interfaces
)

add_library(mst_propagation_to_pcs
impl/propagation_to_pcs.cpp
)

target_link_libraries(mst_propagation_to_pcs
rxcpp
logger
)
7 changes: 4 additions & 3 deletions irohad/multi_sig_transactions/impl/mst_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ namespace iroha {
return this->propagateBatchImpl(batch);
}

rxcpp::observable<std::shared_ptr<MstState>> MstProcessor::onStateUpdate()
const {
rxcpp::observable<std::shared_ptr<const MstState>>
MstProcessor::onStateUpdate() const {
return this->onStateUpdateImpl();
}

rxcpp::observable<DataType> MstProcessor::onPreparedBatches() const {
rxcpp::observable<std::shared_ptr<MovedBatch>>
MstProcessor::onPreparedBatches() const {
return this->onPreparedBatchesImpl();
}

Expand Down
30 changes: 15 additions & 15 deletions irohad/multi_sig_transactions/impl/mst_processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ namespace iroha {
auto FairMstProcessor::propagateBatchImpl(const iroha::DataType &batch)
-> decltype(propagateBatch(batch)) {
auto state_update = storage_->updateOwnState(batch);
completedBatchesNotify(*state_update.completed_state_);
updatedBatchesNotify(*state_update.updated_state_);
completedBatchesNotify(state_update.completed_state_);
updatedBatchesNotify(state_update.updated_state_);
expiredBatchesNotify(
storage_->extractExpiredTransactions(time_provider_->getCurrentTime()));
return state_update.updated_state_->contains(batch);
Expand All @@ -58,18 +58,18 @@ namespace iroha {
}

// TODO [IR-1687] Akvinikym 10.09.18: three methods below should be one
void FairMstProcessor::completedBatchesNotify(ConstRefState state) const {
if (not state.isEmpty()) {
state.iterateBatches([this](const auto &batch) {
batches_subject_.get_subscriber().on_next(batch);
});
}
void FairMstProcessor::completedBatchesNotify(
std::vector<std::shared_ptr<MovedBatch>> completed) const {
std::for_each(
completed.begin(), completed.end(), [this](const auto &batch) {
batches_subject_.get_subscriber().on_next(batch);
});
}

void FairMstProcessor::updatedBatchesNotify(ConstRefState state) const {
if (not state.isEmpty()) {
state_subject_.get_subscriber().on_next(
std::make_shared<MstState>(state));
void FairMstProcessor::updatedBatchesNotify(
std::shared_ptr<const MstState> state) const {
if (not state->isEmpty()) {
state_subject_.get_subscriber().on_next(state);
}
}

Expand All @@ -88,20 +88,20 @@ namespace iroha {
// -------------------| MstTransportNotification override |-------------------

void FairMstProcessor::onNewState(const shared_model::crypto::PublicKey &from,
ConstRefState new_state) {
MstState new_state) {
log_->info("Applying new state");
auto current_time = time_provider_->getCurrentTime();

auto state_update = storage_->apply(from, new_state);

// updated batches
updatedBatchesNotify(*state_update.updated_state_);
updatedBatchesNotify(state_update.updated_state_);
log_->info("New state has {} batches and {} transactions.",
state_update.updated_state_->batchesQuantity(),
state_update.updated_state_->transactionsQuantity());

// completed batches
completedBatchesNotify(*state_update.completed_state_);
completedBatchesNotify(state_update.completed_state_);

// expired batches
expiredBatchesNotify(storage_->getDiffState(from, current_time));
Expand Down
69 changes: 69 additions & 0 deletions irohad/multi_sig_transactions/impl/propagation_to_pcs.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "multi_sig_transactions/propagation_to_pcs.hpp"

#include <utility>

#include "interfaces/iroha_internal/transaction_batch.hpp"
#include "logger/logger.hpp"

using namespace iroha;

MstToPcsPropagation::MstToPcsPropagation(
std::shared_ptr<iroha::network::PeerCommunicationService> pcs,
std::shared_ptr<StorageLimit<BatchPtr>> storage_limit,
rxcpp::observable<size_t> propagation_available,
logger::LoggerPtr log)
: log_(std::move(log)),
pcs_(pcs),
pending_batches_(std::move(storage_limit),
std::make_unique<InternalStorage>()),
propagation_available_subscription_(
propagation_available.subscribe([this, pcs](size_t available_txs) {
pending_batches_.extract(
[pcs, &available_txs](InternalStorage &storage) {
std::vector<BatchPtr> extracted;
extracted.reserve(storage.pending_batches.size());
auto it = storage.pending_batches.begin();
while (it != storage.pending_batches.end()) {
const auto txs_in_batch = (*it)->transactions().size();
if (txs_in_batch <= available_txs
and pcs->propagate_batch(*it)) {
available_txs -= txs_in_batch;
extracted.emplace_back(std::move(*it));
it = storage.pending_batches.erase(it);
} else {
++it;
}
}
return extracted;
});
})) {}

MstToPcsPropagation::~MstToPcsPropagation() {
propagation_available_subscription_.unsubscribe();
}

void MstToPcsPropagation::notifyCompletedBatch(
std::shared_ptr<MovedBatch> moved_batch) {
if (not pcs_->propagate_batch(moved_batch->get())) {
if (not pending_batches_.insert(std::move(moved_batch))) {
log_->critical(
"Dropped a completed MST batch because no place left in storage: {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually, space is used in that context (even if place is more technically correct because it is internals that are not an object of interest for the external interface user).

Suggested change
"Dropped a completed MST batch because no place left in storage: {}",
"Dropped a completed MST batch because no space left in storage: {}",

moved_batch->get());
assert(false);
}
}
}

size_t MstToPcsPropagation::pendingBatchesQuantity() const {
return pending_batches_.itemsQuantity();
}

bool MstToPcsPropagation::InternalStorage::insert(BatchPtr batch) {
pending_batches.emplace_back(std::move(batch));
return true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unconditional return type 🤔

}
8 changes: 6 additions & 2 deletions irohad/multi_sig_transactions/mst_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <memory>
#include <mutex>

#include <rxcpp/rx.hpp>
#include "logger/logger_fwd.hpp"
#include "multi_sig_transactions/mst_types.hpp"
Expand Down Expand Up @@ -41,19 +42,22 @@ namespace iroha {
/**
* Prove updating of state for handling status of signing
*/
rxcpp::observable<std::shared_ptr<MstState>> onStateUpdate() const;
rxcpp::observable<std::shared_ptr<const MstState>> onStateUpdate() const;

/**
* Observable emit batches which are prepared for further processing in
* system
*/
rxcpp::observable<DataType> onPreparedBatches() const;
rxcpp::observable<std::shared_ptr<MovedBatch>> onPreparedBatches() const;

/**
* Observable emit expired by time transactions
*/
rxcpp::observable<DataType> onExpiredBatches() const;

/// Get the next completed batch with at most max_txs transactions.
boost::optional<BatchPtr> getCompletedBatch(const size_t max_txs);

virtual ~MstProcessor() = default;

protected:
Expand Down
14 changes: 8 additions & 6 deletions irohad/multi_sig_transactions/mst_processor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#ifndef IROHA_MST_PROCESSOR_IMPL_HPP
#define IROHA_MST_PROCESSOR_IMPL_HPP

#include "multi_sig_transactions/mst_processor.hpp"

#include <memory>

#include "cryptography/public_key.hpp"
#include "logger/logger_fwd.hpp"
#include "multi_sig_transactions/mst_processor.hpp"
#include "multi_sig_transactions/mst_propagation_strategy.hpp"
#include "multi_sig_transactions/mst_time_provider.hpp"
#include "multi_sig_transactions/storage/mst_storage.hpp"
Expand Down Expand Up @@ -56,7 +57,7 @@ namespace iroha {
// ------------------| MstTransportNotification override |------------------

void onNewState(const shared_model::crypto::PublicKey &from,
ConstRefState new_state) override;
MstState new_state) override;

// ----------------------------| end override |-----------------------------

Expand All @@ -74,14 +75,15 @@ namespace iroha {
* signatures and ready to move forward
* @param state with those batches
*/
void completedBatchesNotify(ConstRefState state) const;
void completedBatchesNotify(
std::vector<std::shared_ptr<MovedBatch>> completed) const;

/**
* Notify subscribers when some of the batches received new signatures, but
* still are not completed
* @param state with those batches
*/
void updatedBatchesNotify(ConstRefState state) const;
void updatedBatchesNotify(std::shared_ptr<const MstState> state) const;

/**
* Notify subscribers when some of the bathes get expired
Expand All @@ -98,10 +100,10 @@ namespace iroha {
// rx subjects

/// use for share new states from other peers
rxcpp::subjects::subject<std::shared_ptr<MstState>> state_subject_;
rxcpp::subjects::subject<std::shared_ptr<const MstState>> state_subject_;

/// use for share completed batches
rxcpp::subjects::subject<DataType> batches_subject_;
rxcpp::subjects::subject<std::shared_ptr<MovedBatch>> batches_subject_;

/// use for share expired batches
rxcpp::subjects::subject<DataType> expired_subject_;
Expand Down
16 changes: 2 additions & 14 deletions irohad/multi_sig_transactions/mst_types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <memory>

#include "interfaces/common_objects/types.hpp"
#include "storage_shared_limit/moved_item.hpp"

namespace shared_model {
namespace interface {
Expand All @@ -21,6 +22,7 @@ namespace shared_model {
namespace iroha {

using BatchPtr = std::shared_ptr<shared_model::interface::TransactionBatch>;
using MovedBatch = MovedItem<BatchPtr>;
using ConstPeer = const shared_model::interface::Peer;
using TimeType = shared_model::interface::types::TimestampType;
using TxResponse =
Expand All @@ -38,20 +40,6 @@ namespace iroha {
using ConstRefState = ConstRefT<MstState>;

using DataType = BatchPtr;

/**
* Contains result of updating local state:
* - state with completed batches
* - state with updated (still not enough signatures) batches
*/
struct StateUpdateResult {
StateUpdateResult(std::shared_ptr<MstState> completed_state,
std::shared_ptr<MstState> updated_state)
: completed_state_{std::move(completed_state)},
updated_state_{std::move(updated_state)} {}
std::shared_ptr<MstState> completed_state_;
std::shared_ptr<MstState> updated_state_;
};
} // namespace iroha

#endif // IROHA_MST_TYPES_HPP
Loading