Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Feature/storage reconnection #2229

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions irohad/ametsuchi/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ add_library(ametsuchi
impl/in_memory_block_storage_factory.cpp
impl/flat_file_block_storage.cpp
impl/flat_file_block_storage_factory.cpp
reconnection/impl/storage_connection_wrapper.cpp
reconnection/impl/k_times_reconnection_strategy.cpp
)

target_link_libraries(ametsuchi
Expand All @@ -34,6 +36,7 @@ target_link_libraries(ametsuchi
shared_model_stateless_validation
SOCI::core
SOCI::postgresql
tbb
)

target_compile_definitions(ametsuchi
Expand Down
36 changes: 20 additions & 16 deletions irohad/ametsuchi/impl/storage_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,11 @@ namespace iroha {
if (block_is_prepared) {
rollbackPrepared(*sql);
}
auto block_result = getBlockQuery()->getTopBlock();
auto block_query = getBlockQuery();
if (not block_query) {
return expected::makeError("Can't create blockQuery");
}
auto block_result = (*block_query)->getTopBlock();
return expected::makeValue<std::unique_ptr<MutableStorage>>(
std::make_unique<MutableStorageImpl>(
block_result.match(
Expand All @@ -181,17 +185,13 @@ namespace iroha {
if (not wsv) {
return boost::none;
}
return boost::make_optional<std::shared_ptr<PeerQuery>>(
std::make_shared<PeerQueryWsv>(wsv));
auto peer_query = std::make_shared<PeerQueryWsv>(*wsv);
return boost::make_optional<std::shared_ptr<PeerQuery>>(peer_query);
}

boost::optional<std::shared_ptr<BlockQuery>> StorageImpl::createBlockQuery()
const {
auto block_query = getBlockQuery();
if (not block_query) {
return boost::none;
}
return boost::make_optional(block_query);
return getBlockQuery();
}

boost::optional<std::shared_ptr<QueryExecutor>>
Expand All @@ -215,7 +215,7 @@ namespace iroha {
log_manager_->getChild("QueryExecutor")));
}

bool StorageImpl::insertBlock(
ReturnWrapperType<bool> StorageImpl::insertBlock(
std::shared_ptr<const shared_model::interface::Block> block) {
log_->info("create mutable storage");
auto storageResult = createMutableStorage();
Expand All @@ -234,7 +234,7 @@ namespace iroha {
return inserted;
}

bool StorageImpl::insertBlocks(
ReturnWrapperType<bool> StorageImpl::insertBlocks(
const std::vector<std::shared_ptr<shared_model::interface::Block>>
&blocks) {
log_->info("create mutable storage");
Expand Down Expand Up @@ -518,29 +518,33 @@ namespace iroha {
}
}

std::shared_ptr<WsvQuery> StorageImpl::getWsvQuery() const {
ReturnWrapperType<std::shared_ptr<WsvQuery>> StorageImpl::getWsvQuery()
const {
std::shared_lock<std::shared_timed_mutex> lock(drop_mutex);
if (not connection_) {
log_->info("connection to database is not initialised");
return nullptr;
return DefaultError;
}
return std::make_shared<PostgresWsvQuery>(
auto wsv = std::make_shared<PostgresWsvQuery>(
std::make_unique<soci::session>(*connection_),
factory_,
log_manager_->getChild("WsvQuery")->getLogger());
return wrapValue<std::shared_ptr<WsvQuery>>(wsv);
}

std::shared_ptr<BlockQuery> StorageImpl::getBlockQuery() const {
ReturnWrapperType<std::shared_ptr<BlockQuery>> StorageImpl::getBlockQuery()
const {
std::shared_lock<std::shared_timed_mutex> lock(drop_mutex);
if (not connection_) {
log_->info("connection to database is not initialised");
return nullptr;
return DefaultError;
}
return std::make_shared<PostgresBlockQuery>(
auto block_query = std::make_shared<PostgresBlockQuery>(
std::make_unique<soci::session>(*connection_),
*block_store_,
converter_,
log_manager_->getChild("PostgresBlockQuery")->getLogger());
return wrapValue<std::shared_ptr<BlockQuery>>(block_query);
}

rxcpp::observable<std::shared_ptr<const shared_model::interface::Block>>
Expand Down
13 changes: 7 additions & 6 deletions irohad/ametsuchi/impl/storage_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
#ifndef IROHA_STORAGE_IMPL_HPP
#define IROHA_STORAGE_IMPL_HPP

#include "ametsuchi/storage.hpp"
#include "ametsuchi/reconnection/unsafe_storage.hpp"

#include <atomic>
#include <cmath>
Expand Down Expand Up @@ -34,7 +34,7 @@ namespace iroha {
std::unique_ptr<KeyValueStorage> block_store;
};

class StorageImpl : public Storage {
class StorageImpl : public UnsafeStorage {
protected:
static expected::Result<bool, std::string> createDatabaseIfNotExist(
const std::string &dbname,
Expand Down Expand Up @@ -83,15 +83,15 @@ namespace iroha {
* @param blocks - block for insertion
* @return true if all blocks are inserted
*/
bool insertBlock(
ReturnWrapperType<bool> insertBlock(
std::shared_ptr<const shared_model::interface::Block> block) override;

/**
* Insert blocks without validation
* @param blocks - collection of blocks for insertion
* @return true if inserted
*/
bool insertBlocks(
ReturnWrapperType<bool> insertBlocks(
const std::vector<std::shared_ptr<shared_model::interface::Block>>
&blocks) override;

Expand All @@ -107,9 +107,10 @@ namespace iroha {
boost::optional<std::unique_ptr<LedgerState>> commitPrepared(
std::shared_ptr<const shared_model::interface::Block> block) override;

std::shared_ptr<WsvQuery> getWsvQuery() const override;
ReturnWrapperType<std::shared_ptr<WsvQuery>> getWsvQuery() const override;

std::shared_ptr<BlockQuery> getBlockQuery() const override;
ReturnWrapperType<std::shared_ptr<BlockQuery>> getBlockQuery()
const override;

rxcpp::observable<std::shared_ptr<const shared_model::interface::Block>>
on_commit() override;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#include "ametsuchi/reconnection/impl/k_times_reconnection_strategy.hpp"

using namespace iroha::ametsuchi;

KTimesReconnectionStorageStrategy::KTimesReconnectionStorageStrategy(
size_t number_of_recalls)
: number_of_recalls_(number_of_recalls) {}

bool KTimesReconnectionStorageStrategy::canInvoke(const Tag &tag) {
CollectionType::accessor accessor;
while (true) {
bool is_found = invokes_.find(accessor, tag);
if (is_found) {
// assume the tag is unique and doesn't share between threads
accessor->second++;
} else {
bool is_inserted = invokes_.insert(accessor, tag);
if (is_inserted) {
accessor->second = 1;
} else {
continue;
}
}
break;
}
return accessor->second <= number_of_recalls_;
}

void KTimesReconnectionStorageStrategy::reset(const Tag &tag) {
// intentionally don't care about status of erasing
invokes_.erase(tag);
}

KTimesReconnectionStorageStrategy::Tag
KTimesReconnectionStorageStrategy::makeTag(const Tag &prefix) {
uint64_t gotten_value = tag_counter++;
return prefix + std::to_string(gotten_value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_K_TIMES_RECONNECTION_STRATEGY_HPP
#define IROHA_K_TIMES_RECONNECTION_STRATEGY_HPP

#include "ametsuchi/reconnection/reconnection_strategy.hpp"

#include <atomic>

#include "tbb/concurrent_hash_map.h"

namespace iroha {
namespace ametsuchi {

/**
* Class provides implementation of reconnection strategy based on number of
* attempts.
*
* Note: all methods are thread-safe
*/
class KTimesReconnectionStorageStrategy
: public ReconnectionStorageStrategy {
public:
/**
* @param number_of_recalls - number of failure attempts before
* reconnection
*/
KTimesReconnectionStorageStrategy(size_t number_of_attempts);

bool canInvoke(const Tag &) override;

void reset(const Tag &) override;

Tag makeTag(const Tag &prefix) override;

private:
/// shortcut for tag collection type
using CollectionType =
tbb::concurrent_hash_map<ReconnectionStorageStrategy::Tag, size_t>;
CollectionType invokes_;
size_t number_of_recalls_;

/// counter for making uniqueness tags
std::atomic<uint64_t> tag_counter{0u};
};
} // namespace ametsuchi
} // namespace iroha

#endif // IROHA_K_TIMES_RECONNECTION_STRATEGY_HPP
Loading