Merge pull request #31 from phobosxy/fix_use_session_pool
fix: fixed use of session pool, reuse code
suboch authored May 5, 2023
2 parents 49e431f + bd7f7f8 commit f2e5d0f
Showing 4 changed files with 31 additions and 23 deletions.
.github/workflows/pull_request.yml (5 changes: 3 additions & 2 deletions)
@@ -28,14 +28,15 @@ jobs:
       - name: Install dependencies on macos
         if: runner.os == 'macOS'
         run: |
-          brew install boost
+          brew install boost llvm openssl
           ln -s "$(brew --prefix llvm)/bin/clang-format" "/usr/local/bin/clang-format"
           ln -s "$(brew --prefix llvm)/bin/clang-tidy" "/usr/local/bin/clang-tidy"
-          brew install openssl
       # Run tests with proper build type
       - name: Test
         env:
           BUILD_TYPE: ${{ matrix.build-type }}
+        # Disable Tsan on Linux because atomic_thread_fence is unsupported by gcc
+        if: runner.os != 'Linux' || matrix.build-type != 'Tsan'
         run: .github/workflows/run.sh test-type $BUILD_TYPE
       # Lint code
       - name: Lint
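The new if: guard skips TSan runs on Linux because ThreadSanitizer does not model standalone fences such as std::atomic_thread_fence, so correctly synchronized fence-based code can be reported as a data race. A minimal illustrative sketch of that kind of pattern (not code from this repository):

#include <atomic>
#include <thread>

std::atomic<bool> ready{false};
int payload = 0; // plain (non-atomic) data handed between threads

void producer()
{
    payload = 42;
    std::atomic_thread_fence(std::memory_order_release); // standalone fence
    ready.store(true, std::memory_order_relaxed);
}

void consumer()
{
    while (!ready.load(std::memory_order_relaxed)) {
        std::this_thread::yield();
    }
    std::atomic_thread_fence(std::memory_order_acquire); // pairs with the release fence
    // Correct per the C++ memory model, but TSan does not model the fences
    // and may report a false data race on payload.
    int value = payload;
    (void)value;
}

int main()
{
    std::thread t1(producer);
    std::thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}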
include/stream-client/connector/connection_pool.hpp (3 changes: 3 additions & 0 deletions)
@@ -327,6 +327,9 @@ class base_connection_pool

     std::atomic_bool watch_pool_{false}; ///< Flag to stop @p pool_watcher_.
     std::thread pool_watcher_; ///< Thread to run watch_pool_routine() in.
+
+    bool ensure_session(std::unique_lock<std::timed_mutex>& pool_lk, boost::system::error_code& ec,
+                        const time_point_type& deadline) const;
 };

 //! Connections pool with sockets over plain TCP protocol.
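The declaration above takes the caller's unique_lock by reference. A hedged sketch of the implied contract, with hypothetical names (acquire_pool_lock is illustrative, not part of the library): the caller constructs the lock deferred, the helper acquires it before the deadline, and on success the lock stays held for the caller:

#include <chrono>
#include <mutex>

using time_point_type = std::chrono::steady_clock::time_point; // assumed alias

// Hypothetical helper mirroring the declared shape: it receives an unlocked
// (deferred) lock, tries to acquire it before the deadline, and leaves it
// held on success so the caller continues under the same lock.
bool acquire_pool_lock(std::unique_lock<std::timed_mutex>& lk, const time_point_type& deadline)
{
    return lk.try_lock_until(deadline);
}

void caller(std::timed_mutex& pool_mutex)
{
    std::unique_lock<std::timed_mutex> lk(pool_mutex, std::defer_lock);
    if (acquire_pool_lock(lk, std::chrono::steady_clock::now() + std::chrono::seconds(1))) {
        // critical section; lk unlocks automatically on scope exit
    }
}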
include/stream-client/connector/impl/connection_pool.ipp (40 changes: 21 additions & 19 deletions)
@@ -37,18 +37,29 @@ base_connection_pool<Connector, Strategy>::~base_connection_pool()
 }

 template <typename Connector, typename Strategy>
-std::unique_ptr<typename base_connection_pool<Connector, Strategy>::stream_type>
-base_connection_pool<Connector, Strategy>::get_session(boost::system::error_code& ec, const time_point_type& deadline)
+bool base_connection_pool<Connector, Strategy>::ensure_session(std::unique_lock<std::timed_mutex>& pool_lk,
+                                                               boost::system::error_code& ec,
+                                                               const time_point_type& deadline) const
 {
-    std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
     if (!pool_lk.try_lock_until(deadline)) {
         // failed to lock pool_mutex_
         ec = boost::asio::error::timed_out;
-        return nullptr;
+        return false;
     }
-    if (sesson_pool_.empty() && !pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) {
+    if (!pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) {
         // session pool is still empty
         ec = boost::asio::error::not_found;
+        return false;
+    }
+    return true;
+}
+
+template <typename Connector, typename Strategy>
+std::unique_ptr<typename base_connection_pool<Connector, Strategy>::stream_type>
+base_connection_pool<Connector, Strategy>::get_session(boost::system::error_code& ec, const time_point_type& deadline)
+{
+    std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
+    if (!ensure_session(pool_lk, ec, deadline)) {
         return nullptr;
     }

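For reference, a standalone sketch of the pattern ensure_session factors out, using stand-in types and assumed names (std::deque<int> replaces the real session container). Note that wait_until with a predicate checks the predicate before waiting, which is why the separate empty() pre-check from the old code could be dropped:

#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

std::timed_mutex pool_mutex;         // guards session_pool
std::condition_variable_any pool_cv; // works with unique_lock<std::timed_mutex>
std::deque<int> session_pool;        // stand-in for the real session container

bool wait_for_session(std::unique_lock<std::timed_mutex>& lk,
                      const std::chrono::steady_clock::time_point& deadline)
{
    if (!lk.try_lock_until(deadline)) {
        return false; // could not acquire the pool mutex in time
    }
    // The predicate is evaluated before and after each wait; returns false
    // only if the deadline passes while the pool is still empty.
    return pool_cv.wait_until(lk, deadline, [] { return !session_pool.empty(); });
}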
@@ -103,16 +114,7 @@ bool base_connection_pool<Connector, Strategy>::is_connected(boost::system::erro
                                                              const time_point_type& deadline) const
 {
     std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
-    if (!pool_lk.try_lock_until(deadline)) {
-        // failed to lock pool_mutex_
-        ec = boost::asio::error::timed_out;
-        return false;
-    }
-    if (sesson_pool_.empty() && !pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) {
-        // session pool is still empty
-        return false;
-    }
-    return true;
+    return ensure_session(pool_lk, ec, deadline);
 }

 template <typename Connector, typename Strategy>
@@ -122,16 +124,16 @@ void base_connection_pool<Connector, Strategy>::watch_pool_routine()

     while (watch_pool_.load(std::memory_order_acquire)) {
         // try to lock pool mutex
-        std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
-        if (!pool_lk.try_lock_for(lock_timeout)) {
+        std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, lock_timeout);
+        if (!pool_lk.owns_lock()) {
             continue;
         }

         // remove sessions idling past idle_timeout_
         std::size_t pool_current_size = 0;
+        const auto now = clock_type::now();
         for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) {
-            const auto idle_for = clock_type::now() - pool_it->first;
-            if (idle_for >= idle_timeout_) {
+            if (now - pool_it->first >= idle_timeout_) {
                 pool_it = sesson_pool_.erase(pool_it);
             } else {
                 ++pool_it;
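The lock changes above rely on a standard-library equivalence: the (mutex, duration) constructor of std::unique_lock performs try_lock_for itself, and owns_lock() reports whether the timed acquisition succeeded. A small comparison sketch (illustrative only):

#include <chrono>
#include <mutex>

void lock_two_ways(std::timed_mutex& m)
{
    const auto lock_timeout = std::chrono::milliseconds(100);

    // Before: construct deferred, then explicitly try to lock with a timeout.
    std::unique_lock<std::timed_mutex> a(m, std::defer_lock);
    if (a.try_lock_for(lock_timeout)) {
        a.unlock(); // release so the second form can acquire below
    }

    // After: the (mutex, duration) constructor calls try_lock_for itself.
    std::unique_lock<std::timed_mutex> b(m, lock_timeout);
    if (b.owns_lock()) {
        // lock held; released automatically when b goes out of scope
    }
}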
include/stream-client/connector/impl/connector.ipp (6 changes: 4 additions & 2 deletions)
@@ -90,8 +90,8 @@ void base_connector<Stream>::resolve_routine()
     static const auto lock_timeout = std::chrono::milliseconds(100);

     while (resolving_thread_running_.load(std::memory_order_acquire)) {
-        std::unique_lock<std::timed_mutex> resolve_needed_lk(resolve_needed_mutex_, std::defer_lock);
-        if (!resolve_needed_lk.try_lock_for(lock_timeout) ||
+        std::unique_lock<std::timed_mutex> resolve_needed_lk(resolve_needed_mutex_, lock_timeout);
+        if (!resolve_needed_lk.owns_lock() ||
             !resolve_needed_cv_.wait_for(resolve_needed_lk, lock_timeout, [this] { return resolve_needed_; })) {
             continue;
         }
@@ -101,12 +101,14 @@
         resolver_endpoint_iterator_type new_endpoints = resolver_.resolve(resolve_ec);
         set_resolve_error(resolve_ec);
         if (resolve_ec) {
+            resolve_needed_lk.unlock();
             std::this_thread::sleep_for(std::chrono::milliseconds(50));
             continue;
         }

         resolve_needed_ = false;
         update_endpoints(std::move(new_endpoints));
+        resolve_needed_lk.unlock();
         notify_resolve_done();
     }
 }
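The added unlock() calls release resolve_needed_mutex_ before the thread sleeps or notifies, so other threads are not blocked while this one idles. A simplified sketch of that shape (assumed names, not the library's code):

#include <chrono>
#include <mutex>
#include <thread>

std::timed_mutex state_mutex; // stand-in for resolve_needed_mutex_

void retry_with_backoff(bool failed)
{
    std::unique_lock<std::timed_mutex> lk(state_mutex, std::chrono::milliseconds(100));
    if (!lk.owns_lock()) {
        return; // could not acquire in time; caller retries later
    }
    // ... work under the lock ...
    if (failed) {
        lk.unlock(); // drop the lock first,
        std::this_thread::sleep_for(std::chrono::milliseconds(50)); // then back off
        return;
    }
    lk.unlock(); // likewise, do not hold the lock while notifying observers
}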
