Skip to content

Commit

Permalink
tfscheduler: add parameter for stale TF duration (consul)
Browse files Browse the repository at this point in the history
  • Loading branch information
ironMann committed Oct 27, 2021
1 parent d0c1d14 commit 92ca505
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 15 deletions.
7 changes: 5 additions & 2 deletions script/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,8 @@ Make sure that only a single instance of the proxy is started.
## Consul parameters (tuning option)

### TfScheduler
*MaxNumTfInBuilding* (`epn/data-dist/parameters/TfScheduler/MaxNumTfInBuilding`)
Override the number of TFs each TfBuilder is allowed to build concurrently. This variable can be increased if a small number of EPNs is used.
*MaxNumTfInBuilding* (`epn/data-dist/parameters/TfScheduler/MaxNumTfInBuilding`)
Override the number of TFs each TfBuilder is allowed to build concurrently. This variable can be increased if a small number of EPNs is used.

*StaleStfTimeoutMs* (`epn/data-dist/parameters/TfScheduler/StaleStfTimeoutMs`)
Timeout (in milliseconds) after which incomplete TFs are scheduled for building or deletion. The default is 5000 (5 s); the configured value is clamped to the range 500–30000 ms.
38 changes: 25 additions & 13 deletions src/TfScheduler/TfSchedulerStfInfo.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,16 @@ void TfSchedulerStfInfo::StaleCleanupThread()

std::vector<StfInfo> lStfInfos;

auto lStaleStfTimeoutMs = mDiscoveryConfig->getUInt64Param(cStaleStfTimeoutMsKey, 5000); // 5 seconds

while (mRunning) {
std::this_thread::sleep_for(std::chrono::seconds(sStfDiscardTimeout));
std::this_thread::sleep_for(std::chrono::milliseconds(250));
lLastDiscardTime = std::chrono::steady_clock::now();

// update housekeeping parameters
lStaleStfTimeoutMs = std::clamp(mDiscoveryConfig->getUInt64Param(cStaleStfTimeoutMsKey, 5000), std::uint64_t(500), std::uint64_t(30000));
DDDLOG_RL(10000, "Dropping stale STFs parameter (consul): StaleStfCount={}", lStaleStfTimeoutMs);

lStfsToErase.clear();
{
std::unique_lock lLock(mGlobalStfInfoLock);
Expand All @@ -175,29 +181,30 @@ void TfSchedulerStfInfo::StaleCleanupThread()
continue;
}

// check reap
// check and reap
const auto &lLastStfInfo = lStfInfoVec.back();
const auto lTimeDiff = std::chrono::abs(lLastStfInfo.mUpdateLocalTime - lLastDiscardTime);
if (lTimeDiff > sStfDiscardTimeout) {
WDDLOG_RL(1000, "Discarding incomplete SubTimeFrame. stf_id={} received={} expected={}",
lStfId, lStfInfoVec.size(), lNumStfSenders);

if (lTimeDiff > std::chrono::milliseconds(lStaleStfTimeoutMs)) {
EDDLOG_RL(1000, "Discarding incomplete TimeFrame. stf_id={} received={} expected={} lastUpdateMs={}",
lStfId, lStfInfoVec.size(), lNumStfSenders, std::chrono::duration_cast<std::chrono::milliseconds>(lTimeDiff).count());

// erase the stale stf
lStfsToErase.push_back(lStfId);

// find missing StfSenders
std::set<std::string> lMissingStfSenders = lStfSenderIdSet;

for (const auto &lUpdate : lStfInfoVec) {
lMissingStfSenders.erase(lUpdate.process_id());
}

std::string lMissingIds = boost::algorithm::join(lMissingStfSenders, ", ");
DDDLOG("Missing STFs from StfSender IDs: {}", lMissingIds);
EDDLOG_RL(5000, "Missing STFs from StfSender IDs: {}", lMissingIds);

for (const auto &lStf : lMissingStfSenders) {
lStfSenderMissingCnt[lStf]++;
}

lStfsToErase.push_back(lStfId);
DDDLOG_RL(1000, "Missing STFs from StfSender IDs: {}", boost::algorithm::join(lMissingStfSenders, ", "));
} else {
break; // stop checking as soon the early STFs are within the stale timeout
}
}

Expand All @@ -208,8 +215,13 @@ void TfSchedulerStfInfo::StaleCleanupThread()
}

if (lStfsToErase.size() > 0) {
WDDLOG("SchedulingThread: TFs have been discarded due to incomplete number of STFs. discarded_tf_count={}",
lStfsToErase.size());
static std::uint64_t mStaleTfCount = 0;
mStaleTfCount += lStfsToErase.size();

WDDLOG_RL(2000, "StaleStfDropThread: TFs have been discarded due to incomplete number of STFs. discarded_tf_count={} total={}",
lStfsToErase.size(), mStaleTfCount);
DDMON("tfscheduler", "tf.rejected.stale_not_completed_stf", mStaleTfCount);
DDMON("tfscheduler", "tf.rejected.total", mNotScheduledTfsCount);

for (const auto &lStfSenderCnt : lStfSenderMissingCnt) {
if (lStfSenderCnt.second > 0) {
Expand Down
2 changes: 2 additions & 0 deletions src/TfScheduler/TfSchedulerStfInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ struct TopoStfInfo {

class TfSchedulerStfInfo
{
static constexpr std::string_view cStaleStfTimeoutMsKey = "StaleStfTimeoutMs";

public:
TfSchedulerStfInfo() = delete;
TfSchedulerStfInfo(std::shared_ptr<ConsulTfSchedulerInstance> pDiscoveryConfig,
Expand Down

0 comments on commit 92ca505

Please sign in to comment.