From 06f6d7ced188e59591aa159b2a6423403594060c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Felix=20G=C3=BCndling?=
Date: Sun, 1 Dec 2024 21:47:57 +0100
Subject: [PATCH] wip (not working)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Signed-off-by: Felix Gündling
---
 .pkg                                     |   2 +-
 .pkg.lock                                |   4 +-
 include/motis/scheduler/runner.h         |  42 ++++++++++
 include/motis/scheduler/scheduler_algo.h |  56 +++++++++++++
 src/scheduler/scheduler_algo.cc          | 112 ++++++++++++++++++++++
 src/server.cc                            |  17 ++--
 6 files changed, 222 insertions(+), 11 deletions(-)
 create mode 100644 include/motis/scheduler/runner.h
 create mode 100644 include/motis/scheduler/scheduler_algo.h
 create mode 100644 src/scheduler/scheduler_algo.cc

diff --git a/.pkg b/.pkg
index a99ece219..5e5ffe28e 100644
--- a/.pkg
+++ b/.pkg
@@ -25,7 +25,7 @@
 [net]
   url=git@github.com:motis-project/net.git
   branch=master
-  commit=9c660c3fe22f5acc0eb39b00061b8e7027fef975
+  commit=30f8b05a435af6f8327376d6e95089b4341ac4f7
 [openapi-cpp]
   url=git@github.com:triptix-tech/openapi-cpp.git
   branch=master
diff --git a/.pkg.lock b/.pkg.lock
index 082810db5..137032988 100644
--- a/.pkg.lock
+++ b/.pkg.lock
@@ -1,4 +1,4 @@
-12309006678901967697
+15260926559282078310
 cista 847b27100b7e730370b810ce62206a66b0bf2d79
 zlib-ng 68ab3e2d80253ec5dc3c83691d9ff70477b32cd3
 boost 082a2d83c827e43f3b7eb8d6f0a1102cddb897ad
@@ -11,7 +11,7 @@ res b759b93316afeb529b6cb5b2548b24c41e382fb0
 date ce88cc33b5551f66655614eeebb7c5b7189025fb
 yaml-cpp 1d8ca1f35eb3a9c9142462b28282a848e5d29a91
 openapi-cpp dac46d043f07a119d8b7d9ccb47e51049b259bfe
-net 9c660c3fe22f5acc0eb39b00061b8e7027fef975
+net 30f8b05a435af6f8327376d6e95089b4341ac4f7
 PEGTL 1c1aa6e650e4d26f10fa398f148ec0cdc5f0808d
 oh d21c30f40e52a83d6dc09bcffd0067598b5ec069
 doctest 70e8f76437b76dd5e9c0a2eb9b907df190ab71a0
diff --git a/include/motis/scheduler/runner.h b/include/motis/scheduler/runner.h
new file mode 100644
index 000000000..c259d37a2
--- /dev/null
+++ b/include/motis/scheduler/runner.h
@@ -0,0 +1,42 @@
+#pragma once
+
+#include <atomic>
+#include <cstddef>
+#include <vector>
+
+#include "boost/fiber/algo/work_stealing.hpp"
+#include "boost/fiber/buffered_channel.hpp"
+#include "boost/fiber/operations.hpp"
+
+#include "net/web_server/query_router.h"
+
+#include "motis/scheduler/scheduler_algo.h"
+
+namespace motis {
+
+struct runner {
+  runner(std::size_t const n_threads, std::size_t const buffer_size)
+      : init_barrier_{n_threads}, schedulers_{n_threads}, ch_{buffer_size} {}
+
+  auto run_fn() {
+    return [&]() {
+      /*
+      boost::fibers::use_scheduling_algorithm<
+          boost::fibers::algo::work_stealing>(schedulers_.size());
+      */
+      boost::fibers::use_scheduling_algorithm<scheduler_algo>(
+          init_barrier_, schedulers_, next_id_++);  // ids 0..n-1
+      auto t = net::fiber_exec::task_t{};
+      while (ch_.pop(t) != boost::fibers::channel_op_status::closed) {
+        t();
+      }
+    };
+  }
+
+  boost::fibers::detail::thread_barrier init_barrier_;
+  std::atomic_uint32_t next_id_{0U};
+  std::vector<scheduler_algo*> schedulers_;
+  net::fiber_exec::channel_t ch_;
+};
+
+}  // namespace motis
\ No newline at end of file
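
Note (editorial, not part of the patch): runner owns the startup barrier, the
scheduler registry, and the shared task channel; each worker thread runs
run_fn(), which installs scheduler_algo and then drains the channel until it
is closed. The id passed to use_scheduling_algorithm must be post-incremented
(next_id_++) so that the n threads get ids 0..n-1 and stay in bounds of
schedulers_. A minimal usage sketch follows, mirroring the server.cc hunks
further below; constructing a task_t directly from a lambda is an assumption
about the net::fiber_exec API:

    #include <cstddef>
    #include <thread>
    #include <vector>

    #include "motis/scheduler/runner.h"

    int main() {
      auto constexpr kThreads = std::size_t{4U};
      auto r = motis::runner{kThreads, /*buffer_size=*/1024U};

      // One OS thread per scheduler. The barrier inside scheduler_algo's
      // constructor ensures all schedulers are registered before any of
      // them starts stealing.
      auto threads = std::vector<std::thread>{};
      for (auto i = std::size_t{0U}; i != kThreads; ++i) {
        threads.emplace_back(r.run_fn());
      }

      // Producers enqueue work through the channel; in the server this is
      // done by net::fiber_exec (task_t-from-lambda is assumed here).
      r.ch_.push(net::fiber_exec::task_t{[]() { /* handle one request */ }});

      r.ch_.close();  // unblocks the pop() loops, workers exit
      for (auto& t : threads) {
        t.join();
      }
    }
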
"boost/fiber/scheduler.hpp" + +namespace motis { + +struct fiber_props : public boost::fibers::fiber_properties { + fiber_props(boost::fibers::context*); + + // In order to keep request latency low, finishing already started requests + // has to be prioritized over new requests. Otherwise, the server only starts + // new requests and never finishes anything. + enum class type : std::uint8_t { + kWork, // follow-up work scheduled by work or I/O + kIo // initial work scheduled by I/O (web request / batch query) + } type_{type::kIo}; +}; + +struct scheduler_algo + : public boost::fibers::algo::algorithm_with_properties { + using ready_queue_t = boost::fibers::scheduler::ready_queue_type; + + scheduler_algo(boost::fibers::detail::thread_barrier&, + std::vector& schedulers, + std::uint32_t id); + + boost::fibers::context* steal() noexcept; + + virtual void awakened(boost::fibers::context* ctx, + fiber_props& props) noexcept override; + virtual boost::fibers::context* pick_next() noexcept override; + virtual bool has_ready_fibers() const noexcept override; + virtual void suspend_until( + std::chrono::steady_clock::time_point const&) noexcept override; + virtual void notify() noexcept override; + + std::vector& schedulers_; + bool suspend_{false}; + std::uint32_t id_; + boost::fibers::detail::context_spinlock_queue work_queue_, io_queue_; + std::mutex mtx_{}; + std::condition_variable cnd_{}; + bool flag_{false}; +}; + +} // namespace motis \ No newline at end of file diff --git a/src/scheduler/scheduler_algo.cc b/src/scheduler/scheduler_algo.cc new file mode 100644 index 000000000..ea9d642a5 --- /dev/null +++ b/src/scheduler/scheduler_algo.cc @@ -0,0 +1,110 @@ +#include "motis/scheduler/scheduler_algo.h" + +#include "boost/context/detail/prefetch.hpp" +#include "boost/fiber/context.hpp" +#include "boost/fiber/detail/context_spinlock_queue.hpp" +#include "boost/fiber/properties.hpp" +#include "boost/fiber/scheduler.hpp" +#include "boost/fiber/type.hpp" + +namespace bf = boost::fibers; + +namespace motis { + +fiber_props::fiber_props(bf::context* ctx) : fiber_properties{ctx} {} + +scheduler_algo::scheduler_algo(boost::fibers::detail::thread_barrier& b, + std::vector& schedulers, + std::uint32_t const id) + : schedulers_{schedulers}, id_{id} { + schedulers_[id] = this; + b.wait(); +} + +void scheduler_algo::awakened(bf::context* ctx, fiber_props& props) noexcept { + if (!ctx->is_context(bf::type::pinned_context)) { + ctx->detach(); + } + auto const orig_type = props.type_; + props.type_ = fiber_props::type::kWork; // Continuations are prioritized. + orig_type == fiber_props::type::kWork ? work_queue_.push(ctx) + : io_queue_.push(ctx); +} + +bf::context* scheduler_algo::pick_next() noexcept { + using boost::context::detail::prefetch_range; + bf::context* victim = nullptr; + if (victim = work_queue_.pop(); victim != nullptr) { + // Highest priority: work continuation. + prefetch_range(victim, sizeof(bf::context)); + if (!victim->is_context(bf::type::pinned_context)) { + bf::context::active()->attach(victim); + } + } else if (victim = io_queue_.pop(); victim != nullptr) { + // Lower priority: I/O from our own queue. + prefetch_range(victim, sizeof(bf::context)); + if (!victim->is_context(bf::type::pinned_context)) { + bf::context::active()->attach(victim); + } + } else { // Fallback: try to steal from another thread. 
diff --git a/src/scheduler/scheduler_algo.cc b/src/scheduler/scheduler_algo.cc
new file mode 100644
index 000000000..ea9d642a5
--- /dev/null
+++ b/src/scheduler/scheduler_algo.cc
@@ -0,0 +1,112 @@
+#include "motis/scheduler/scheduler_algo.h"
+
+#include <random>
+
+#include "boost/context/detail/prefetch.hpp"
+#include "boost/fiber/context.hpp"
+#include "boost/fiber/detail/context_spinlock_queue.hpp"
+#include "boost/fiber/properties.hpp"
+#include "boost/fiber/scheduler.hpp"
+#include "boost/fiber/type.hpp"
+
+namespace bf = boost::fibers;
+
+namespace motis {
+
+fiber_props::fiber_props(bf::context* ctx) : fiber_properties{ctx} {}
+
+scheduler_algo::scheduler_algo(boost::fibers::detail::thread_barrier& b,
+                               std::vector<scheduler_algo*>& schedulers,
+                               std::uint32_t const id)
+    : schedulers_{schedulers}, id_{id} {
+  schedulers_[id] = this;
+  b.wait();
+}
+
+void scheduler_algo::awakened(bf::context* ctx, fiber_props& props) noexcept {
+  if (!ctx->is_context(bf::type::pinned_context)) {
+    ctx->detach();
+  }
+  auto const orig_type = props.type_;
+  props.type_ = fiber_props::type::kWork;  // Continuations are prioritized.
+  orig_type == fiber_props::type::kWork ? work_queue_.push(ctx)
+                                        : io_queue_.push(ctx);
+}
+
+bf::context* scheduler_algo::pick_next() noexcept {
+  using boost::context::detail::prefetch_range;
+  bf::context* victim = nullptr;
+  if (victim = work_queue_.pop(); victim != nullptr) {
+    // Highest priority: work continuation.
+    prefetch_range(victim, sizeof(bf::context));
+    if (!victim->is_context(bf::type::pinned_context)) {
+      bf::context::active()->attach(victim);
+    }
+  } else if (victim = io_queue_.pop(); victim != nullptr) {
+    // Lower priority: I/O from our own queue.
+    prefetch_range(victim, sizeof(bf::context));
+    if (!victim->is_context(bf::type::pinned_context)) {
+      bf::context::active()->attach(victim);
+    }
+  } else {  // Fallback: try to steal from another thread.
+    auto id = 0U;
+    auto count = std::size_t{0U};
+    auto size = schedulers_.size();
+    static thread_local std::minstd_rand generator{std::random_device{}()};
+    auto distribution = std::uniform_int_distribution<std::uint32_t>{
+        0, static_cast<std::uint32_t>(size - 1)};
+
+    do {
+      do {
+        ++count;
+        id = distribution(generator);
+      } while (id == id_ /* don't steal from own scheduler */);
+      victim = schedulers_[id]->steal();
+    } while (victim == nullptr && count < size);
+
+    if (victim != nullptr) {
+      prefetch_range(victim, sizeof(bf::context));
+      BOOST_ASSERT(!victim->is_context(bf::type::pinned_context));
+      bf::context::active()->attach(victim);
+    }
+  }
+  return victim;
+}
+
+bool scheduler_algo::has_ready_fibers() const noexcept {
+  return !(work_queue_.empty() && io_queue_.empty());
+}
+
+bf::context* scheduler_algo::steal() noexcept {
+  auto work = work_queue_.pop();
+  if (work != nullptr) {
+    return work;
+  }
+  return io_queue_.pop();
+}
+
+void scheduler_algo::suspend_until(
+    std::chrono::steady_clock::time_point const& time_point) noexcept {
+  if (suspend_) {
+    if ((std::chrono::steady_clock::time_point::max)() == time_point) {
+      auto lk = std::unique_lock{mtx_};
+      cnd_.wait(lk, [this]() { return flag_; });
+      flag_ = false;
+    } else {
+      auto lk = std::unique_lock{mtx_};
+      cnd_.wait_until(lk, time_point, [this]() { return flag_; });
+      flag_ = false;
+    }
+  }
+}
+
+void scheduler_algo::notify() noexcept {
+  if (suspend_) {
+    auto lk = std::unique_lock{mtx_};
+    flag_ = true;
+    lk.unlock();
+    cnd_.notify_all();
+  }
+}
+
+}  // namespace motis
\ No newline at end of file
diff --git a/src/server.cc b/src/server.cc
index b2c109202..90c0d72e0 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -32,10 +32,11 @@
 #include "motis/endpoints/update_elevator.h"
 #include "motis/gbfs/update.h"
 #include "motis/rt_update.h"
+#include "motis/scheduler/runner.h"
+#include "motis/scheduler/scheduler_algo.h"
 
 namespace fs = std::filesystem;
 namespace asio = boost::asio;
-namespace bf = boost::fibers;
 
 namespace motis {
 
@@ -54,10 +55,12 @@ void POST(auto&& r, std::string target, From& from) {
 }
 
 int server(data d, config const& c) {
+  auto const server_config = c.server_.value_or(config::server{});
+
   auto ioc = asio::io_context{};
   auto s = net::web_server{ioc};
-  auto ch = net::fiber_exec::channel_t{2048U};
-  auto qr = net::query_router{net::fiber_exec(ioc, ch)};
+  auto r = runner{server_config.n_threads_, 1024U};
+  auto qr = net::query_router{net::fiber_exec{ioc, r.ch_}};
 
   POST(qr, "/api/matches", d);
   POST(qr, "/api/elevators", d);
@@ -82,7 +85,6 @@ int server(data d, config const& c) {
     qr.route("GET", "/tiles/", ep::tiles{*d.tiles_});
   }
 
-  auto const server_config = c.server_.value_or(config::server{});
   qr.serve_files(server_config.web_folder_);
   qr.enable_cors();
   s.set_timeout(std::chrono::minutes{5});
@@ -119,10 +121,9 @@ int server(data d, config const& c) {
     });
   }
 
-  auto threads = std::vector<std::thread>(
-      static_cast<std::size_t>(std::max(1U, server_config.n_threads_)));
+  auto threads = std::vector<std::thread>{server_config.n_threads_};
   for (auto [i, t] : utl::enumerate(threads)) {
-    t = std::thread(net::fiber_exec::run(ch, server_config.n_threads_));
+    t = std::thread{r.run_fn()};
     utl::set_thread_name(t, fmt::format("motis worker {}", i));
   }
 
@@ -143,7 +144,7 @@
       server_config.host_, server_config.port_, server_config.port_);
 
   net::run(ioc)();
-  ch.close();
+  r.ch_.close();
   for (auto& t : threads) {
     t.join();
   }
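
Note (editorial, not part of the patch): as committed, suspend_ is initialized
to false and never set, so suspend_until()/notify() are no-ops and idle workers
spin in the fiber dispatcher instead of blocking. If blocking idle threads is
wanted, the flag could be threaded through the constructor the way
boost::fibers::algo::work_stealing does; a hypothetical change, sketched here:

    // hypothetical: extra constructor parameter enabling blocking idle waits
    scheduler_algo::scheduler_algo(boost::fibers::detail::thread_barrier& b,
                                   std::vector<scheduler_algo*>& schedulers,
                                   std::uint32_t const id, bool const suspend)
        : schedulers_{schedulers}, suspend_{suspend}, id_{id} {
      schedulers_[id] = this;
      b.wait();
    }
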