Skip to content

Commit

Permalink
Optional ordering of objects in CCDBPopulator
Browse files Browse the repository at this point in the history
If an option --ordering-latency <T> with positive value T (in ms) is provided, then every
incoming object will be buffered and uploaded only if no object with the same CCDB path
and earlier start of validity was received in preceding T ms.
All remaining cached objects are uploaded at EOR (or stop() method call).
  • Loading branch information
shahor02 committed Nov 18, 2024
1 parent 9019955 commit 348b194
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 109 deletions.
23 changes: 23 additions & 0 deletions CCDB/include/CCDB/CcdbObjectInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,16 @@ class CcdbObjectInfo
[[nodiscard]] long getEndValidityTimestamp() const { return mEnd; }
void setEndValidityTimestamp(long end) { mEnd = end; }

bool operator<(const CcdbObjectInfo& other) const
{
return mStart < other.mStart;
}

bool operator>(const CcdbObjectInfo& other) const
{
return mStart > other.mStart;
}

private:
std::string mObjType{}; // object type (e.g. class)
std::string mFileName{}; // file name in the CCDB
Expand All @@ -107,4 +117,17 @@ class CcdbObjectInfo

} // namespace o2::ccdb

namespace std
{
// defining std::hash for InteractionRecord to be used with std containers
template <>
struct hash<o2::ccdb::CcdbObjectInfo> {
public:
size_t operator()(const o2::ccdb::CcdbObjectInfo& info) const
{
return info.getStartValidityTimestamp();
}
};
} // namespace std

#endif // O2_CCDB_CCDBOBJECTINFO_H_
6 changes: 5 additions & 1 deletion Detectors/Calibration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,11 @@ o2-calibration-ccdb-populator-workflow --sspec-min 0 --sspec-max 1 -b
then the `ObjA` will be uploaded only to the default server (`http://alice-ccdb.cern.ch`), `ObjB` will be uploaded to both default and `local` server and
`ObjC` will be uploaded to the `local` server only.
By default the ccdb-populator-workflow will not produce `fatal` on failed upload. To require it an option `--fatal-on-failure` can be used.
By default the `ccdb-populator-workflow` will not produce `fatal` on failed upload. To require it an option `--fatal-on-failure` can be used.
By default the `ccdb-populator-workflow` uploads objects as it gets them. In case there is a danger that objects of the same URL will arrive in an order not sorted by SOV (start of validity)
(which may lead to the screening of an object with a later SOV by another object with an earlier SOV), one can use the option `--ordering-latency <N milliseconds>` of the `ccdb-populator-workflow`.
Then every incoming object will be buffered and uploaded only if no object with the same CCDB path and an earlier start of validity was received in the preceding N milliseconds. All remaining cached objects are uploaded at EOR (or on a stop() method call).
<!-- doxy
* \subpage refDetectorsCalibrationtestMacros
Expand Down
300 changes: 192 additions & 108 deletions Detectors/Calibration/workflow/CCDBPopulatorSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "CommonUtils/NameConf.h"
#include <unordered_map>
#include <chrono>
#include <vector>
#include <utility>
#include <map>

namespace o2
{
Expand All @@ -39,132 +42,212 @@ namespace calibration

class CCDBPopulator : public o2::framework::Task
{
public:
using CcdbObjectInfo = o2::ccdb::CcdbObjectInfo;
using CcdbApi = o2::ccdb::CcdbApi;

public:
void init(o2::framework::InitContext& ic) final
{
mCCDBpath = ic.options().get<std::string>("ccdb-path");
mSSpecMin = ic.options().get<std::int64_t>("sspec-min");
mSSpecMax = ic.options().get<std::int64_t>("sspec-max");
mFatalOnFailure = ic.options().get<bool>("fatal-on-failure");
mValidateUpload = ic.options().get<bool>("validate-upload");
mThrottlingDelayMS = ic.options().get<std::int64_t>("throttling-delay");
mAPI.init(mCCDBpath);
}
using BLOB = std::vector<char>;
using TBLOB = std::pair<long, BLOB>; // pair of creation time and object to upload
using OBJCACHE = std::map<CcdbObjectInfo, TBLOB>;

void init(o2::framework::InitContext& ic) final;
void run(o2::framework::ProcessingContext& pc) final;
void endOfStream(o2::framework::EndOfStreamContext& ec) final;
void stop() final;

void checkCache(long delay);
void doUpload(const CcdbObjectInfo& wrp, const gsl::span<const char>& pld, bool cached = false);
void logAsNeeded(long nowMS, const std::string& path, std::string& msg);

void run(o2::framework::ProcessingContext& pc) final
{
int nSlots = pc.inputs().getNofParts(0);
if (nSlots != pc.inputs().getNofParts(1)) {
LOGP(alarm, "Number of slots={} in part0 is different from that ({}) in part1", nSlots, pc.inputs().getNofParts(1));
return;
} else if (nSlots == 0) {
LOG(alarm) << "0 slots received";
return;
private:
CcdbApi mAPI;
long mThrottlingDelayMS = 0; // LOG(important) at most once per this period for given path
int mOrderingLatencyMS = -1; // if >0, bufferize and upload if no object with smaller SOV was received in this time interval in ms
bool mFatalOnFailure = true; // produce fatal on failed upload
bool mValidateUpload = false; // validate upload by querying its headers
bool mEnded = false;
std::unordered_map<std::string, std::pair<long, int>> mThrottling;
std::unordered_map<std::string, OBJCACHE> mOrdCache;
std::int64_t mSSpecMin = -1; // min subspec to accept
std::int64_t mSSpecMax = -1; // max subspec to accept
std::string mCCDBpath = "http://ccdb-test.cern.ch:8080"; // CCDB path
int mRunNoFromDH = 0;
std::string mRunNoStr = {};
};

void CCDBPopulator::init(o2::framework::InitContext& ic)
{
mCCDBpath = ic.options().get<std::string>("ccdb-path");
mSSpecMin = ic.options().get<std::int64_t>("sspec-min");
mSSpecMax = ic.options().get<std::int64_t>("sspec-max");
mFatalOnFailure = ic.options().get<bool>("fatal-on-failure");
mValidateUpload = ic.options().get<bool>("validate-upload");
mThrottlingDelayMS = ic.options().get<std::int64_t>("throttling-delay");
mOrderingLatencyMS = ic.options().get<int>("ordering-latency");
mAPI.init(mCCDBpath);
}

void CCDBPopulator::run(o2::framework::ProcessingContext& pc)
{
int nSlots = pc.inputs().getNofParts(0);
if (nSlots != pc.inputs().getNofParts(1)) {
LOGP(alarm, "Number of slots={} in part0 is different from that ({}) in part1", nSlots, pc.inputs().getNofParts(1));
return;
} else if (nSlots == 0) {
LOG(alarm) << "0 slots received";
return;
}
mRunNoFromDH = pc.services().get<o2::framework::TimingInfo>().runNumber;
if (mRunNoFromDH > 0) {
mRunNoStr = std::to_string(mRunNoFromDH);
}
auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
for (int isl = 0; isl < nSlots; isl++) {
auto refWrp = pc.inputs().get("clbWrapper", isl);
auto refPld = pc.inputs().get("clbPayload", isl);
if (!o2::framework::DataRefUtils::isValid(refWrp)) {
LOGP(alarm, "Wrapper is not valid for slot {}", isl);
continue;
}
auto runNoFromDH = pc.services().get<o2::framework::TimingInfo>().runNumber;
std::string runNoStr;
if (runNoFromDH > 0) {
runNoStr = std::to_string(runNoFromDH);
if (!o2::framework::DataRefUtils::isValid(refPld)) {
LOGP(alarm, "Payload is not valid for slot {}", isl);
continue;
}
auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
std::map<std::string, std::string> metadata;
for (int isl = 0; isl < nSlots; isl++) {
auto refWrp = pc.inputs().get("clbWrapper", isl);
auto refPld = pc.inputs().get("clbPayload", isl);
if (!o2::framework::DataRefUtils::isValid(refWrp)) {
LOGP(alarm, "Wrapper is not valid for slot {}", isl);
continue;
}
if (!o2::framework::DataRefUtils::isValid(refPld)) {
LOGP(alarm, "Payload is not valid for slot {}", isl);
if (mSSpecMin >= 0 && mSSpecMin <= mSSpecMax) { // there is a selection
auto ss = std::int64_t(o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->subSpecification);
if (ss < mSSpecMin || ss > mSSpecMax) {
continue;
}
if (mSSpecMin >= 0 && mSSpecMin <= mSSpecMax) { // there is a selection
auto ss = std::int64_t(o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->subSpecification);
if (ss < mSSpecMin || ss > mSSpecMax) {
continue;
}
}
const auto wrp = pc.inputs().get<CcdbObjectInfo*>(refWrp);
const auto pld = pc.inputs().get<gsl::span<char>>(refPld); // this is actually an image of TMemFile
if (!wrp) {
LOGP(alarm, "No CcdbObjectInfo info for {} at slot {}",
o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->dataDescription.as<std::string>(), isl);
continue;
}
const auto* md = &wrp->getMetaData();
if (runNoFromDH > 0 && md->find(o2::base::NameConf::CCDBRunTag.data()) == md->end()) { // if valid run number is provided and it is not filled in the metadata, add it to the clone
metadata = *md; // clone since the md from the message is const
metadata[o2::base::NameConf::CCDBRunTag.data()] = runNoStr;
md = &metadata;
}
std::string msg = fmt::format("Storing in ccdb {}/{} of size {} valid for {} : {}", wrp->getPath(), wrp->getFileName(), pld.size(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
auto& lastLog = mThrottling[wrp->getPath()];
if (lastLog.first + mThrottlingDelayMS < nowMS) {
if (lastLog.second) {
msg += fmt::format(" ({} uploads were logged as INFO)", lastLog.second);
lastLog.second = 0;
}
lastLog.first = nowMS;
LOG(important) << msg;
}
const auto wrp = pc.inputs().get<CcdbObjectInfo*>(refWrp);
const auto pld = pc.inputs().get<gsl::span<char>>(refPld); // this is actually an image of TMemFile
if (!wrp) {
LOGP(alarm, "No CcdbObjectInfo info for {} at slot {}",
o2::framework::DataRefUtils::getHeader<o2::header::DataHeader*>(refWrp)->dataDescription.as<std::string>(), isl);
continue;
}
if (mOrderingLatencyMS <= 0) { // ordering is not requested
doUpload(*wrp, pld);
} else {
auto& pathCache = mOrdCache[wrp->getPath()];
auto stt = pathCache.emplace(*wrp, std::make_pair(nowMS, std::vector<char>(pld.size())));
if (stt.second) { // insertion success
stt.first->second.second.assign(pld.begin(), pld.end());
std::string msg = fmt::format("Bufferizing for ordering ccdb object {}/{} of size {} valid for {} : {}",
wrp->getPath(), wrp->getFileName(), pld.size(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
logAsNeeded(nowMS, wrp->getPath(), msg);
} else {
lastLog.second++;
LOG(info) << msg;
bool v = stt.first != pathCache.end();
LOGP(error, "failed to bufferize a {} object with SOV={}/EOV={} received at {}, conflicting with previously bufferized one SOV={}/EOV={} received at {}",
wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp(), nowMS,
v ? std::to_string(stt.first->first.getStartValidityTimestamp()) : std::string{"N/A"},
v ? std::to_string(stt.first->first.getEndValidityTimestamp()) : std::string{"N/A"},
v ? std::to_string(stt.first->second.first) : std::string{"N/A"});
}
}
}
if (mOrderingLatencyMS > 0) {
checkCache(mOrderingLatencyMS);
}
}

auto uploadTS = o2::ccdb::getCurrentTimestamp();

int res = mAPI.storeAsBinaryFile(&pld[0], pld.size(), wrp->getFileName(), wrp->getObjectType(), wrp->getPath(),
*md, wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
if (res) {
if (mFatalOnFailure) {
LOGP(fatal, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
} else {
LOGP(error, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
}
}
// do we need to override previous object?
if (wrp->isAdjustableEOV() && !mAPI.isSnapshotMode()) {
o2::ccdb::adjustOverriddenEOV(mAPI, *wrp.get());
void CCDBPopulator::checkCache(long delay)
{
// check if some entries in cache are ripe enough to upload
auto nowMS = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
for (auto& pathCache : mOrdCache) { // loop over paths
if (delay < 0 && pathCache.second.size()) {
LOGP(important, "Uploading {} cached objects for path {}", pathCache.second.size(), pathCache.first);
}
for (auto it = pathCache.second.begin(); it != pathCache.second.end();) { // loop over objects of the path
if (nowMS - it->second.first > delay) {
doUpload(it->first, {it->second.second.data(), it->second.second.size()}, true);
it = pathCache.second.erase(it);
} else {
break;
}
// if requested, make sure that the new object can be queried
if (mValidateUpload || wrp->getValidateUpload()) {
constexpr long MAXDESYNC = 3;
auto headers = mAPI.retrieveHeaders(wrp->getPath(), {}, wrp->getStartValidityTimestamp() + (wrp->getEndValidityTimestamp() - wrp->getStartValidityTimestamp()) / 2);
if (headers.empty() ||
std::atol(headers["Created"].c_str()) < uploadTS - MAXDESYNC ||
std::atol(headers["Valid-From"].c_str()) != wrp->getStartValidityTimestamp() ||
std::atol(headers["Valid-Until"].c_str()) != wrp->getEndValidityTimestamp()) {
if (mFatalOnFailure) {
LOGP(fatal, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
} else {
LOGP(error, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
}
} else {
LOGP(important, "Validated upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp->getPath(), wrp->getStartValidityTimestamp(), wrp->getEndValidityTimestamp());
}
}
}
}

void CCDBPopulator::doUpload(const CcdbObjectInfo& wrp, const gsl::span<const char>& pld, bool cached)
{
std::string msg = fmt::format("Storing in ccdb {}{}/{} of size {} valid for {} : {}", cached ? "cached " : "", wrp.getPath(), wrp.getFileName(), pld.size(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
auto uploadTS = o2::ccdb::getCurrentTimestamp();
logAsNeeded(uploadTS, wrp.getPath(), msg);
std::map<std::string, std::string> metadata;
const auto* md = &wrp.getMetaData();
if (mRunNoFromDH > 0 && md->find(o2::base::NameConf::CCDBRunTag.data()) == md->end()) { // if valid run number is provided and it is not filled in the metadata, add it to the clone
metadata = *md; // clone since the md from the message is const
metadata[o2::base::NameConf::CCDBRunTag.data()] = mRunNoStr;
md = &metadata;
}
int res = mAPI.storeAsBinaryFile(&pld[0], pld.size(), wrp.getFileName(), wrp.getObjectType(), wrp.getPath(), *md, wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
if (res) {
if (mFatalOnFailure) {
LOGP(fatal, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
} else {
LOGP(error, "failed on uploading to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
}
}
// if requested, make sure that the new object can be queried
if (mValidateUpload || wrp.getValidateUpload()) {
constexpr long MAXDESYNC = 3;
auto headers = mAPI.retrieveHeaders(wrp.getPath(), {}, wrp.getStartValidityTimestamp() + (wrp.getEndValidityTimestamp() - wrp.getStartValidityTimestamp()) / 2);
if (headers.empty() ||
std::atol(headers["Created"].c_str()) < uploadTS - MAXDESYNC ||
std::atol(headers["Valid-From"].c_str()) != wrp.getStartValidityTimestamp() ||
std::atol(headers["Valid-Until"].c_str()) != wrp.getEndValidityTimestamp()) {
if (mFatalOnFailure) {
LOGP(fatal, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
} else {
LOGP(error, "Failed to validate upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
}
} else {
LOGP(important, "Validated upload to {} / {} for [{}:{}]", mAPI.getURL(), wrp.getPath(), wrp.getStartValidityTimestamp(), wrp.getEndValidityTimestamp());
}
}
}

void CCDBPopulator::logAsNeeded(long nowMS, const std::string& path, std::string& msg)
{
auto& lastLog = mThrottling[path];
if (lastLog.first + mThrottlingDelayMS < nowMS) {
if (lastLog.second) {
msg += fmt::format(" ({} uploads were logged as INFO)", lastLog.second);
lastLog.second = 0;
}
lastLog.first = nowMS;
LOG(important) << msg;
} else {
lastLog.second++;
LOG(info) << msg;
}
}

void endOfStream(o2::framework::EndOfStreamContext& ec) final
{
LOG(info) << "EndOfStream received";
void CCDBPopulator::endOfStream(o2::framework::EndOfStreamContext& ec)
{
if (mEnded) {
return;
}
mEnded = true;
LOG(info) << "EndOfStream received";
if (mOrderingLatencyMS > 0) {
checkCache(-mOrderingLatencyMS); // force
}
}

private:
CcdbApi mAPI;
long mThrottlingDelayMS = 0; // LOG(important) at most once per this period for given path
bool mFatalOnFailure = true; // produce fatal on failed upload
bool mValidateUpload = false; // validate upload by querying its headers
std::unordered_map<std::string, std::pair<long, int>> mThrottling;
std::int64_t mSSpecMin = -1; // min subspec to accept
std::int64_t mSSpecMax = -1; // max subspec to accept
std::string mCCDBpath = "http://ccdb-test.cern.ch:8080"; // CCDB path
};
void CCDBPopulator::stop()
{
if (mEnded) {
return;
}
mEnded = true;
LOG(info) << "Forced stop";
if (mOrderingLatencyMS > 0) {
checkCache(-mOrderingLatencyMS); // force
}
}

} // namespace calibration

Expand All @@ -186,6 +269,7 @@ DataProcessorSpec getCCDBPopulatorDeviceSpec(const std::string& defCCDB, const s
{"ccdb-path", VariantType::String, defCCDB, {"Path to CCDB"}},
{"sspec-min", VariantType::Int64, -1L, {"min subspec to accept"}},
{"sspec-max", VariantType::Int64, -1L, {"max subspec to accept"}},
{"ordering-latency", VariantType::Int, -1, {"if enabled (positive) bufferize object and upload it if no object with smaller SOV received in given waiting time (ms)"}},
{"throttling-delay", VariantType::Int64, 300000L, {"produce important type log at most once per this period in ms for each CCDB path"}},
{"validate-upload", VariantType::Bool, false, {"valider upload by querying its headers"}},
{"fatal-on-failure", VariantType::Bool, false, {"do not produce fatal on failed upload"}}}};
Expand Down

0 comments on commit 348b194

Please sign in to comment.