Skip to content

Commit

Permalink
wip (not working)
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Gündling <felix.guendling@gmail.com>
  • Loading branch information
felixguendling committed Dec 1, 2024
1 parent 8438534 commit 06f6d7c
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .pkg
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
[net]
url=git@github.com:motis-project/net.git
branch=master
commit=9c660c3fe22f5acc0eb39b00061b8e7027fef975
commit=30f8b05a435af6f8327376d6e95089b4341ac4f7
[openapi-cpp]
url=git@github.com:triptix-tech/openapi-cpp.git
branch=master
Expand Down
4 changes: 2 additions & 2 deletions .pkg.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
12309006678901967697
15260926559282078310
cista 847b27100b7e730370b810ce62206a66b0bf2d79
zlib-ng 68ab3e2d80253ec5dc3c83691d9ff70477b32cd3
boost 082a2d83c827e43f3b7eb8d6f0a1102cddb897ad
Expand All @@ -11,7 +11,7 @@ res b759b93316afeb529b6cb5b2548b24c41e382fb0
date ce88cc33b5551f66655614eeebb7c5b7189025fb
yaml-cpp 1d8ca1f35eb3a9c9142462b28282a848e5d29a91
openapi-cpp dac46d043f07a119d8b7d9ccb47e51049b259bfe
net 9c660c3fe22f5acc0eb39b00061b8e7027fef975
net 30f8b05a435af6f8327376d6e95089b4341ac4f7
PEGTL 1c1aa6e650e4d26f10fa398f148ec0cdc5f0808d
oh d21c30f40e52a83d6dc09bcffd0067598b5ec069
doctest 70e8f76437b76dd5e9c0a2eb9b907df190ab71a0
Expand Down
41 changes: 41 additions & 0 deletions include/motis/scheduler/runner.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#pragma once

#include <atomic>
#include <functional>

#include "boost/fiber/algo/work_stealing.hpp"
#include "boost/fiber/buffered_channel.hpp"
#include "boost/fiber/operations.hpp"

#include "net/web_server/query_router.h"

#include "motis/scheduler/scheduler_algo.h"

namespace motis {

struct runner {
runner(std::size_t const n_threads, std::size_t const buffer_size)
: init_barrier_{n_threads}, schedulers_{n_threads}, ch_{buffer_size} {}

auto run_fn() {
return [&]() {
/*
boost::fibers::use_scheduling_algorithm<
boost::fibers::algo::work_stealing>(schedulers_.size());
*/
boost::fibers::use_scheduling_algorithm<scheduler_algo>(
init_barrier_, schedulers_, ++next_id_);
auto t = net::fiber_exec::task_t{};
while (ch_.pop(t) != boost::fibers::channel_op_status::closed) {
t();
}
};
}

boost::fibers::detail::thread_barrier init_barrier_;
std::atomic_uint32_t next_id_{0U};
std::vector<scheduler_algo*> schedulers_;
net::fiber_exec::channel_t ch_;
};

} // namespace motis
54 changes: 54 additions & 0 deletions include/motis/scheduler/scheduler_algo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include <cinttypes>
#include <vector>

#include "boost/context/detail/prefetch.hpp"
#include "boost/fiber/algo/algorithm.hpp"
#include "boost/fiber/detail/context_spinlock_queue.hpp"
#include "boost/fiber/detail/thread_barrier.hpp"
#include "boost/fiber/properties.hpp"
#include "boost/fiber/scheduler.hpp"

namespace motis {

struct fiber_props : public boost::fibers::fiber_properties {
fiber_props(boost::fibers::context*);

// In order to keep request latency low, finishing already started requests
// has to be prioritized over new requests. Otherwise, the server only starts
// new requests and never finishes anything.
enum class type : std::uint8_t {
kWork, // follow-up work scheduled by work or I/O
kIo // initial work scheduled by I/O (web request / batch query)
} type_{type::kIo};
};

struct scheduler_algo
: public boost::fibers::algo::algorithm_with_properties<fiber_props> {
using ready_queue_t = boost::fibers::scheduler::ready_queue_type;

scheduler_algo(boost::fibers::detail::thread_barrier&,
std::vector<scheduler_algo*>& schedulers,
std::uint32_t id);

boost::fibers::context* steal() noexcept;

virtual void awakened(boost::fibers::context* ctx,
fiber_props& props) noexcept override;
virtual boost::fibers::context* pick_next() noexcept override;
virtual bool has_ready_fibers() const noexcept override;
virtual void suspend_until(
std::chrono::steady_clock::time_point const&) noexcept override;
virtual void notify() noexcept override;

std::vector<scheduler_algo*>& schedulers_;
bool suspend_{false};
std::uint32_t id_;
boost::fibers::detail::context_spinlock_queue work_queue_, io_queue_;
std::mutex mtx_{};
std::condition_variable cnd_{};
bool flag_{false};
};

} // namespace motis
110 changes: 110 additions & 0 deletions src/scheduler/scheduler_algo.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#include "motis/scheduler/scheduler_algo.h"

#include "boost/context/detail/prefetch.hpp"
#include "boost/fiber/context.hpp"
#include "boost/fiber/detail/context_spinlock_queue.hpp"
#include "boost/fiber/properties.hpp"
#include "boost/fiber/scheduler.hpp"
#include "boost/fiber/type.hpp"

namespace bf = boost::fibers;

namespace motis {

fiber_props::fiber_props(bf::context* ctx) : fiber_properties{ctx} {}

scheduler_algo::scheduler_algo(boost::fibers::detail::thread_barrier& b,
std::vector<scheduler_algo*>& schedulers,
std::uint32_t const id)
: schedulers_{schedulers}, id_{id} {
schedulers_[id] = this;
b.wait();
}

void scheduler_algo::awakened(bf::context* ctx, fiber_props& props) noexcept {
if (!ctx->is_context(bf::type::pinned_context)) {
ctx->detach();
}
auto const orig_type = props.type_;
props.type_ = fiber_props::type::kWork; // Continuations are prioritized.
orig_type == fiber_props::type::kWork ? work_queue_.push(ctx)
: io_queue_.push(ctx);
}

bf::context* scheduler_algo::pick_next() noexcept {
using boost::context::detail::prefetch_range;
bf::context* victim = nullptr;
if (victim = work_queue_.pop(); victim != nullptr) {
// Highest priority: work continuation.
prefetch_range(victim, sizeof(bf::context));
if (!victim->is_context(bf::type::pinned_context)) {
bf::context::active()->attach(victim);
}
} else if (victim = io_queue_.pop(); victim != nullptr) {
// Lower priority: I/O from our own queue.
prefetch_range(victim, sizeof(bf::context));
if (!victim->is_context(bf::type::pinned_context)) {
bf::context::active()->attach(victim);
}
} else { // Fallback: try to steal from another thread.
auto id = 0U;
auto count = std::size_t{0U};
auto size = schedulers_.size();
static thread_local std::minstd_rand generator{std::random_device{}()};
auto distribution = std::uniform_int_distribution<std::uint32_t>{
0, static_cast<std::uint32_t>(size - 1)};

do {
do {
++count;
id = distribution(generator);
} while (id == id_ /* don't steal from own scheduler */);
victim = schedulers_[id]->steal();
} while (victim == nullptr && count < size);

if (victim != nullptr) {
prefetch_range(victim, sizeof(bf::context));
BOOST_ASSERT(!victim->is_context(bf::type::pinned_context));
bf::context::active()->attach(victim);
}
}
return victim;
}

bool scheduler_algo::has_ready_fibers() const noexcept {
return !(work_queue_.empty() && io_queue_.empty());
}

bf::context* scheduler_algo::steal() noexcept {
auto work = work_queue_.pop();
if (work != nullptr) {
return work;
}
return io_queue_.pop();
}

void scheduler_algo::suspend_until(
std::chrono::steady_clock::time_point const& time_point) noexcept {
if (suspend_) {
if ((std::chrono::steady_clock::time_point::max)() == time_point) {
auto lk = std::unique_lock<std::mutex>{mtx_};
cnd_.wait(lk, [this]() { return flag_; });
flag_ = false;
} else {
auto lk = std::unique_lock<std::mutex>{mtx_};
cnd_.wait_until(lk, time_point, [this]() { return flag_; });
flag_ = false;
}
}
}

void scheduler_algo::notify() noexcept {
if (suspend_) {
auto lk = std::unique_lock<std::mutex>{mtx_};
flag_ = true;
lk.unlock();
cnd_.notify_all();
}
}

} // namespace motis
17 changes: 9 additions & 8 deletions src/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,11 @@
#include "motis/endpoints/update_elevator.h"
#include "motis/gbfs/update.h"
#include "motis/rt_update.h"
#include "motis/scheduler/runner.h"
#include "motis/scheduler/scheduler_algo.h"

namespace fs = std::filesystem;
namespace asio = boost::asio;
namespace bf = boost::fibers;

namespace motis {

Expand All @@ -54,10 +55,12 @@ void POST(auto&& r, std::string target, From& from) {
}

int server(data d, config const& c) {
auto const server_config = c.server_.value_or(config::server{});

auto ioc = asio::io_context{};
auto s = net::web_server{ioc};
auto ch = net::fiber_exec::channel_t{2048U};
auto qr = net::query_router{net::fiber_exec(ioc, ch)};
auto r = runner{server_config.n_threads_, 1024U};
auto qr = net::query_router{net::fiber_exec{ioc, r.ch_}};

POST<ep::matches>(qr, "/api/matches", d);
POST<ep::elevators>(qr, "/api/elevators", d);
Expand All @@ -82,7 +85,6 @@ int server(data d, config const& c) {
qr.route("GET", "/tiles/", ep::tiles{*d.tiles_});
}

auto const server_config = c.server_.value_or(config::server{});
qr.serve_files(server_config.web_folder_);
qr.enable_cors();
s.set_timeout(std::chrono::minutes{5});
Expand Down Expand Up @@ -119,10 +121,9 @@ int server(data d, config const& c) {
});
}

auto threads = std::vector<std::thread>(
static_cast<unsigned>(std::max(1U, server_config.n_threads_)));
auto threads = std::vector<std::thread>{server_config.n_threads_};
for (auto [i, t] : utl::enumerate(threads)) {
t = std::thread(net::fiber_exec::run(ch, server_config.n_threads_));
t = std::thread{r.run_fn()};
utl::set_thread_name(t, fmt::format("motis worker {}", i));
}

Expand All @@ -143,7 +144,7 @@ int server(data d, config const& c) {
server_config.host_, server_config.port_, server_config.port_);
net::run(ioc)();

ch.close();
r.ch_.close();
for (auto& t : threads) {
t.join();
}
Expand Down

0 comments on commit 06f6d7c

Please sign in to comment.