Skip to content

Commit

Permalink
Make ClientSideWeightedRoundRobinLoadBalancer a
Browse files Browse the repository at this point in the history
`ThreadAwareLoadBalancer`.

Signed-off-by: Misha Efimov <mef@google.com>
  • Loading branch information
efimki committed Sep 22, 2024
1 parent e433b78 commit 64fd956
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,98 +16,73 @@
#include "absl/status/status.h"
#include "xds/data/orca/v3/orca_load_report.pb.h"

using Envoy::MonotonicTime;
using ::envoy::extensions::load_balancing_policies::client_side_weighted_round_robin::v3::
ClientSideWeightedRoundRobin;
using Envoy::Upstream::Host;
using xds::data::orca::v3::OrcaLoadReport;

#if TEST_THREAD_SUPPORTED
#define IS_MAIN_OR_TEST_THREAD() (Envoy::Thread::MainThread::isMainOrTestThread())
#else // !TEST_THREAD_SUPPORTED -- just check for the main thread.
#define IS_MAIN_OR_TEST_THREAD() (Envoy::Thread::MainThread::isMainThread())
#endif // TEST_THREAD_SUPPORTED
namespace Envoy {
namespace Upstream {

namespace {

std::string getHostAddress(const Host* host) {
if (host == nullptr || host->address() == nullptr) {
return "unknown";
}
return host->address()->asString();
}

} // namespace

namespace Envoy {
namespace Upstream {

ClientSideWeightedRoundRobinLoadBalancer::ClientSideWeightedRoundRobinLoadBalancer(
ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb::WorkerLocalLb(
const PrioritySet& priority_set, const PrioritySet* local_priority_set, ClusterLbStats& stats,
Runtime::Loader& runtime, Random::RandomGenerator& random,
const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_config,
const ClientSideWeightedRoundRobin& client_side_weighted_round_robin_config,
TimeSource& time_source, Event::Dispatcher& main_thread_dispatcher)
const ClientSideWeightedRoundRobinLbProto& client_side_weighted_round_robin_config,
TimeSource& time_source)
: EdfLoadBalancerBase(
priority_set, local_priority_set, stats, runtime, random,
PROTOBUF_PERCENT_TO_ROUNDED_INTEGER_OR_DEFAULT(common_config, healthy_panic_threshold,
100, 50),
LoadBalancerConfigHelper::localityLbConfigFromCommonLbConfig(common_config),
/*slow_start_config=*/std::nullopt, time_source) {
ENVOY_LOG(trace, "RoundRobinLbConfig config {}",
client_side_weighted_round_robin_config.DebugString());
initFromConfig(client_side_weighted_round_robin_config);
if (IS_MAIN_OR_TEST_THREAD()) {
startWeightUpdatesOnMainThread(main_thread_dispatcher);
}
initialize();
orca_load_report_handler_ = std::make_shared<OrcaLoadReportHandler>(
client_side_weighted_round_robin_config, time_source_);
}

// {LoadBalancer} Interface implementation.
void ClientSideWeightedRoundRobinLoadBalancer::refreshHostSource(const HostsSource& source) {
void ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb::refreshHostSource(
const HostsSource& source) {
// insert() is used here on purpose so that we don't overwrite the index if
// the host source already exists. Note that host sources will never be
// removed, but given how uncommon this is it probably doesn't matter.
rr_indexes_.insert({source, seed_});
// If the list of hosts changes, the order of picks change. Discard the
// index.
peekahead_index_ = 0;

if (!IS_MAIN_OR_TEST_THREAD()) {
return;
}

// On the main thread ensure that all hosts have client side lb policy data.
addClientSideLbPolicyDataToHosts(hostSourceToHosts(source));
}

HostConstSharedPtr
ClientSideWeightedRoundRobinLoadBalancer::chooseHost(LoadBalancerContext* context) {
ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb::chooseHost(LoadBalancerContext* context) {
HostConstSharedPtr host = EdfLoadBalancerBase::chooseHost(context);
if (context != nullptr) {
// Configure callbacks to receive ORCA load reports.
// Configure callbacks to receive ORCA load report.
context->setOrcaLoadReportCallbacks(orca_load_report_handler_);
}
return host;
}

double ClientSideWeightedRoundRobinLoadBalancer::hostWeight(const Host& host) const {
double ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb::hostWeight(const Host& host) const {
ENVOY_LOG(trace, "hostWeight {} = {}", getHostAddress(&host), host.weight());
return host.weight();
}

HostConstSharedPtr
ClientSideWeightedRoundRobinLoadBalancer::unweightedHostPeek(const HostVector& hosts_to_use,
const HostsSource& source) {
HostConstSharedPtr ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb::unweightedHostPeek(
const HostVector& hosts_to_use, const HostsSource& source) {
auto i = rr_indexes_.find(source);
if (i == rr_indexes_.end()) {
return nullptr;
}
return hosts_to_use[(i->second + (peekahead_index_)++) % hosts_to_use.size()];
}

HostConstSharedPtr
ClientSideWeightedRoundRobinLoadBalancer::unweightedHostPick(const HostVector& hosts_to_use,
const HostsSource& source) {
HostConstSharedPtr ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb::unweightedHostPick(
const HostVector& hosts_to_use, const HostsSource& source) {
if (peekahead_index_ > 0) {
--peekahead_index_;
}
Expand All @@ -120,7 +95,7 @@ ClientSideWeightedRoundRobinLoadBalancer::unweightedHostPick(const HostVector& h
}

ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::OrcaLoadReportHandler(
const ClientSideWeightedRoundRobin& client_side_weighted_round_robin_config,
const ClientSideWeightedRoundRobinLbProto& client_side_weighted_round_robin_config,
TimeSource& time_source)
: time_source_(time_source) {
metric_names_for_computing_utilization_ = std::vector<std::string>(
Expand All @@ -131,7 +106,7 @@ ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::OrcaLoadReportH
}

absl::Status ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::onOrcaLoadReport(
const OrcaLoadReport& orca_load_report, const HostDescription& host_description) {
const OrcaLoadReportProto& orca_load_report, const HostDescription& host_description) {
const Host* host = dynamic_cast<const Host*>(&host_description);
ENVOY_BUG(host != nullptr, "Unable to cast HostDescription to Host.");
ENVOY_LOG(trace,
Expand All @@ -149,12 +124,9 @@ absl::Status ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::on
}

void ClientSideWeightedRoundRobinLoadBalancer::initFromConfig(
const envoy::extensions::load_balancing_policies::client_side_weighted_round_robin::v3::
ClientSideWeightedRoundRobin& client_side_weighted_round_robin_config) {
initialize();

orca_load_report_handler_ = std::make_shared<OrcaLoadReportHandler>(
client_side_weighted_round_robin_config, time_source_);
const ClientSideWeightedRoundRobinLbProto& client_side_weighted_round_robin_config) {
ENVOY_LOG(trace, "ClientSideWeightedRoundRobinLbConfig config {}",
client_side_weighted_round_robin_config.DebugString());
blackout_period_ = std::chrono::milliseconds(
PROTOBUF_GET_MS_OR_DEFAULT(client_side_weighted_round_robin_config, blackout_period, 10000));
weight_expiration_period_ = std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(
Expand All @@ -165,9 +137,6 @@ void ClientSideWeightedRoundRobinLoadBalancer::initFromConfig(

void ClientSideWeightedRoundRobinLoadBalancer::startWeightUpdatesOnMainThread(
Event::Dispatcher& main_thread_dispatcher) {
if (!IS_MAIN_OR_TEST_THREAD()) {
return;
}
weight_calculation_timer_ = main_thread_dispatcher.createTimer([this]() -> void {
updateWeightsOnMainThread();
weight_calculation_timer_->enableTimer(weight_update_period_);
Expand All @@ -177,10 +146,8 @@ void ClientSideWeightedRoundRobinLoadBalancer::startWeightUpdatesOnMainThread(

void ClientSideWeightedRoundRobinLoadBalancer::updateWeightsOnMainThread() {
ENVOY_LOG(trace, "updateWeightsOnMainThread");
ENVOY_BUG(IS_MAIN_OR_TEST_THREAD(), "Update Weights NOT on MainThread");
for (uint32_t priority = 0; priority < priority_set_.hostSetsPerPriority().size(); ++priority) {
HostsSource source(priority, HostsSource::SourceType::AllHosts);
updateWeightsOnHosts(hostSourceToHosts(source));
for (const HostSetPtr& host_set : priority_set_.hostSetsPerPriority()) {
updateWeightsOnHosts(host_set->hosts());
}
}

Expand Down Expand Up @@ -246,7 +213,7 @@ ClientSideWeightedRoundRobinLoadBalancer::getClientSideWeightIfValidFromHost(
absl::MutexLock lock(&client_side_data->mu_);
// If non_empty_since_ is too recent, we should use the default weight.
if (client_side_data->non_empty_since_ > min_non_empty_since) {
ENVOY_LOG(error,
ENVOY_LOG(trace,
"Host {} ClientSideHostLbPolicyData non_empty_since_ is too "
"recent: {} > {}",
getHostAddress(&host), client_side_data->non_empty_since_.time_since_epoch().count(),
Expand All @@ -266,7 +233,7 @@ ClientSideWeightedRoundRobinLoadBalancer::getClientSideWeightIfValidFromHost(

double
ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::getUtilizationFromOrcaReport(
const OrcaLoadReport& orca_load_report,
const OrcaLoadReportProto& orca_load_report,
const std::vector<std::string>& metric_names_for_computing_utilization) {
// If application_utilization is valid, use it as the utilization metric.
double utilization = orca_load_report.application_utilization();
Expand All @@ -285,7 +252,7 @@ ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::getUtilizationF

absl::StatusOr<uint32_t>
ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::calculateWeightFromOrcaReport(
const OrcaLoadReport& orca_load_report,
const OrcaLoadReportProto& orca_load_report,
const std::vector<std::string>& metric_names_for_computing_utilization,
double error_utilization_penalty) {
double qps = orca_load_report.rps_fractional();
Expand All @@ -312,7 +279,7 @@ ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::calculateWeight
}

absl::Status ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::
updateClientSideDataFromOrcaLoadReport(const OrcaLoadReport& orca_load_report,
updateClientSideDataFromOrcaLoadReport(const OrcaLoadReportProto& orca_load_report,
ClientSideHostLbPolicyData& client_side_data) {
const absl::StatusOr<uint32_t> weight = calculateWeightFromOrcaReport(
orca_load_report, metric_names_for_computing_utilization_, error_utilization_penalty_);
Expand All @@ -331,5 +298,36 @@ absl::Status ClientSideWeightedRoundRobinLoadBalancer::OrcaLoadReportHandler::
return absl::OkStatus();
}

Upstream::LoadBalancerPtr ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLbFactory::create(
Upstream::LoadBalancerParams params) {
const auto* typed_lb_config =
dynamic_cast<const ClientSideWeightedRoundRobinLbConfig*>(lb_config_.ptr());
return std::make_unique<Upstream::ClientSideWeightedRoundRobinLoadBalancer::WorkerLocalLb>(
params.priority_set, params.local_priority_set, cluster_info_.lbStats(), runtime_, random_,
cluster_info_.lbConfig(), typed_lb_config->lb_config_, time_source_);
}

ClientSideWeightedRoundRobinLoadBalancer::ClientSideWeightedRoundRobinLoadBalancer(
OptRef<const Upstream::LoadBalancerConfig> lb_config, const Upstream::ClusterInfo& cluster_info,
const Upstream::PrioritySet& priority_set, Runtime::Loader& runtime,
Envoy::Random::RandomGenerator& random, TimeSource& time_source)
: factory_(std::make_shared<WorkerLocalLbFactory>(lb_config, cluster_info, priority_set,
runtime, random, time_source)),
lb_config_(lb_config), cluster_info_(cluster_info), priority_set_(priority_set),
runtime_(runtime), random_(random), time_source_(time_source) {}

absl::Status ClientSideWeightedRoundRobinLoadBalancer::initialize() {
// Ensure that all hosts have client side lb policy data.
for (const HostSetPtr& host_set : priority_set_.hostSetsPerPriority()) {
addClientSideLbPolicyDataToHosts(host_set->hosts());
}

const auto* typed_lb_config =
dynamic_cast<const ClientSideWeightedRoundRobinLbConfig*>(lb_config_.ptr());
initFromConfig(typed_lb_config->lb_config_);
startWeightUpdatesOnMainThread(typed_lb_config->main_thread_dispatcher_);
return absl::OkStatus();
}

} // namespace Upstream
} // namespace Envoy
Loading

0 comments on commit 64fd956

Please sign in to comment.