diff --git a/script/README.md b/script/README.md index 35e163c..723e5c4 100644 --- a/script/README.md +++ b/script/README.md @@ -62,5 +62,8 @@ Make sure that only a single instance of the proxy is started. ## Consul parameters (tuning option) ### TfScheduler - *MaxNumTfInBuilding* (`epn/data-dist/parameters/TfScheduler/MaxNumTfInBuilding`) - Override the number of TFs each TfBuilder is allowed to build concurrently. This variable can be increased if a small number of EPNs is used. + *MaxNumTfInBuilding* (`epn/data-dist/parameters/TfScheduler/MaxNumTfInBuilding`) + Override the number of TFs each TfBuilder is allowed to build concurrently. This variable can be increased if a small number of EPNs is used. + + *StaleStfTimeoutMs* (`epn/data-dist/parameters/TfScheduler/StaleStfTimeoutMs`) + Timeout (in milliseconds) after which incomplete TFs are scheduled for building or deletion. Default is 5000 (5 s). \ No newline at end of file diff --git a/src/TfScheduler/TfSchedulerStfInfo.cxx b/src/TfScheduler/TfSchedulerStfInfo.cxx index 5224c89..c16251c 100644 --- a/src/TfScheduler/TfSchedulerStfInfo.cxx +++ b/src/TfScheduler/TfSchedulerStfInfo.cxx @@ -154,10 +154,16 @@ void TfSchedulerStfInfo::StaleCleanupThread() std::vector lStfInfos; + auto lStaleStfTimeoutMs = mDiscoveryConfig->getUInt64Param(cStaleStfTimeoutMsKey, 5000); // 5 seconds + while (mRunning) { - std::this_thread::sleep_for(std::chrono::seconds(sStfDiscardTimeout)); + std::this_thread::sleep_for(std::chrono::milliseconds(250)); lLastDiscardTime = std::chrono::steady_clock::now(); + // update housekeeping parameters + lStaleStfTimeoutMs = std::clamp(mDiscoveryConfig->getUInt64Param(cStaleStfTimeoutMsKey, 5000), std::uint64_t(500), std::uint64_t(30000)); + DDDLOG_RL(10000, "Dropping stale STFs parameter (consul): StaleStfCount={}", lStaleStfTimeoutMs); + lStfsToErase.clear(); { std::unique_lock lLock(mGlobalStfInfoLock); @@ -175,29 +181,30 @@ void TfSchedulerStfInfo::StaleCleanupThread() continue; } - // 
check reap + // check and reap const auto &lLastStfInfo = lStfInfoVec.back(); const auto lTimeDiff = std::chrono::abs(lLastStfInfo.mUpdateLocalTime - lLastDiscardTime); - if (lTimeDiff > sStfDiscardTimeout) { - WDDLOG_RL(1000, "Discarding incomplete SubTimeFrame. stf_id={} received={} expected={}", - lStfId, lStfInfoVec.size(), lNumStfSenders); + + if (lTimeDiff > std::chrono::milliseconds(lStaleStfTimeoutMs)) { + EDDLOG_RL(1000, "Discarding incomplete TimeFrame. stf_id={} received={} expected={} lastUpdateMs={}", + lStfId, lStfInfoVec.size(), lNumStfSenders, std::chrono::duration_cast(lTimeDiff).count()); + + // erase the stale stf + lStfsToErase.push_back(lStfId); // find missing StfSenders std::set lMissingStfSenders = lStfSenderIdSet; - for (const auto &lUpdate : lStfInfoVec) { lMissingStfSenders.erase(lUpdate.process_id()); } - std::string lMissingIds = boost::algorithm::join(lMissingStfSenders, ", "); - DDDLOG("Missing STFs from StfSender IDs: {}", lMissingIds); - EDDLOG_RL(5000, "Missing STFs from StfSender IDs: {}", lMissingIds); - for (const auto &lStf : lMissingStfSenders) { lStfSenderMissingCnt[lStf]++; } - lStfsToErase.push_back(lStfId); + DDDLOG_RL(1000, "Missing STFs from StfSender IDs: {}", boost::algorithm::join(lMissingStfSenders, ", ")); + } else { + break; // stop checking as soon the early STFs are within the stale timeout } } @@ -208,8 +215,13 @@ void TfSchedulerStfInfo::StaleCleanupThread() } if (lStfsToErase.size() > 0) { - WDDLOG("SchedulingThread: TFs have been discarded due to incomplete number of STFs. discarded_tf_count={}", - lStfsToErase.size()); + static std::uint64_t mStaleTfCount = 0; + mStaleTfCount += lStfsToErase.size(); + + WDDLOG_RL(2000, "StaleStfDropThread: TFs have been discarded due to incomplete number of STFs. 
discarded_tf_count={} total={}", + lStfsToErase.size(), mStaleTfCount); + DDMON("tfscheduler", "tf.rejected.stale_not_completed_stf", mStaleTfCount); + DDMON("tfscheduler", "tf.rejected.total", mNotScheduledTfsCount); for (const auto &lStfSenderCnt : lStfSenderMissingCnt) { if (lStfSenderCnt.second > 0) { diff --git a/src/TfScheduler/TfSchedulerStfInfo.h b/src/TfScheduler/TfSchedulerStfInfo.h index b77ffff..49fcf45 100644 --- a/src/TfScheduler/TfSchedulerStfInfo.h +++ b/src/TfScheduler/TfSchedulerStfInfo.h @@ -76,6 +76,8 @@ struct TopoStfInfo { class TfSchedulerStfInfo { + static constexpr std::string_view cStaleStfTimeoutMsKey = "StaleStfTimeoutMs"; + public: TfSchedulerStfInfo() = delete; TfSchedulerStfInfo(std::shared_ptr pDiscoveryConfig,