Skip to content

Commit

Permalink
Optionally select CTFs in timestamps or orbits range
Browse files Browse the repository at this point in the history
New option
--run-time-span-file <text file with run range_min range_max entries>
allows to push to DPL only those TFs which overlap with the
<runnumber> <range-min> <range-max>
(separators can be any whitespace, comma or semicolon) records provided
via text file (assuming that there are some entries for a given run,
otherwise the option is ignored).

Multiple ranges per run and multiple runs can be mentioned in a single
input file. The range limits can be indicated either as a UNIX
timestamp in ms or as an orbit number (in the fill the run belongs to).

In case an option --invert-irframe-selection is provided, the selections above are inverted:
TFs matching some of the provided ranges will be discarded, while the rest will be pushed to the DPL

At the end of the processing the ctf-writer will create a local file ctf_read_ntf.txt containing only
the number of TFs pushed to the DPL.
In case no TF passed the selections above, this file will contain 0.
  • Loading branch information
shahor02 committed Dec 3, 2024
1 parent 950b8b7 commit 4bffbfa
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 23 deletions.
2 changes: 2 additions & 0 deletions Common/Utils/include/CommonUtils/IRFrameSelector.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class IRFrameSelector
auto getIRFrames() const { return mFrames; }
bool isSet() const { return mIsSet; }

void setOwnList(const std::vector<o2::dataformats::IRFrame>& lst, bool toBeSorted);

private:
gsl::span<const o2::dataformats::IRFrame> mFrames{}; // externally provided span of IRFrames, must be sorted in IRFrame.getMin()
o2::dataformats::IRFrame mLastIRFrameChecked{}; // last frame which was checked
Expand Down
12 changes: 12 additions & 0 deletions Common/Utils/src/IRFrameSelector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ size_t IRFrameSelector::loadIRFrames(const std::string& fname)
return mOwnList.size();
}

void IRFrameSelector::setOwnList(const std::vector<o2::dataformats::IRFrame>& lst, bool toBeSorted)
{
clear();
mOwnList.insert(mOwnList.end(), lst.begin(), lst.end());
if (toBeSorted) {
std::sort(mOwnList.begin(), mOwnList.end(), [](const auto& a, const auto& b) { return a.getMin() < b.getMin(); });
}
setSelectedIRFrames(mOwnList, 0, 0, 0, false);
}

void IRFrameSelector::print(bool lst) const
{
LOGP(info, "Last query stopped at entry {} for IRFrame {}:{}", mLastBoundID,
Expand All @@ -183,6 +193,8 @@ void IRFrameSelector::clear()
{
mIsSet = false;
mOwnList.clear();
mLastIRFrameChecked.getMin().clear(); // invalidate
mLastBoundID = -1;
mFrames = {};
}

Expand Down
48 changes: 37 additions & 11 deletions Detectors/CTF/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ comma-separated list of detectors to read, Overrides skipDet
```
comma-separated list of detectors to skip
By default an exception will be thrown if detector is requested but missing in the CTF. To enable injection of the empty output in such case one should use option `--allow-missing-detectors`.
```
--ctf-data-subspec arg (=0)
```
allows to alter the `subSpecification` used to send the CTFDATA from the reader to decoders. Non-0 value must be used in case the data extracted by the CTF-reader should be processed and stored in new CTFs (in order to avoid clash of CTFDATA messages of the reader and writer).
```
--max-tf arg (=-1)
```
Expand Down Expand Up @@ -141,31 +149,34 @@ There is a possibility to read remote root files directly, w/o caching them loca
2) provide proper regex to define remote files, e.g. for the example above: `--remote-regex "^root://.+/eos/aliceo2/.+"`.
3) pass an option `--copy-cmd no-copy`.
## Selective TF reading
```
--select-ctf-ids <id's of CTFs to select>
```
This is a `ctf-reader` device local option allowing selective reading of particular CTFs. It is useful when dealing with CTF files containing multiple TFs. The comma-separated list of increasing CTFs indices must be provided in the format parsed by the `RangeTokenizer<int>`, e.g. `1,4-6,...`.
Note that the index corresponds not to the entry of the TF in the CTF tree but to the reader own counter incremented throught all input files (e.g. if the 10 CTF files with 20 TFs each are provided for the input and the selection of TFs
`0,2,22,66` is provided, the reader will inject to the DPL the TFs at entries 0 and 2 from the 1st CTF file, entry 5 of the second file, entry 6 of the 3d and will finish the job.
For the ITS and MFT entropy decoding one can request either to decompose clusters to digits and send them instead of clusters (via `o2-ctf-reader-workflow` global options `--its-digits` and `--mft-digits` respectively)
or to apply the noise mask to decoded clusters (or decoded digits). If the masking (e.g. via option `--its-entropy-decoder " --mask-noise "`) is requested, user should provide to the entropy decoder the noise mask file (eventually will be loaded from CCDB) and cluster patterns decoding dictionary (if the clusters were encoded with patterns IDs).
For example,
```
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --its-entropy-decoder ' --mask-noise' | ...
--ir-frames-files <root_file_with_IRFrames_to_select> --skip-skimmed-out-tf
```
will decode ITS and MFT data, decompose on the fly ITS clusters to digits, mask the noisy pixels with the provided masks, recluster remaining ITS digits and send the new clusters out, together with unchanged MFT clusters.
This option (used for skimming) allow to push to DPL only those TFs which overlap with selected BC-ranges provided via input root file (for various formats see `o2::utils::IRFrameSelector::loadIRFrames` method).
```
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --mft-digits --mft-entropy-decoder ' --mask-noise' | ...
--ir-frames-files <root_file_with_IRFrames_to_select>
```
will send decompose clusters to digits and send ben out after masking the noise for the MFT, while ITS clusters will be sent as decoded.
By default an exception will be thrown if detector is requested but missing in the CTF. To enable injection of the empty output in such case one should use option `--allow-missing-detectors`.
This option allows to push to DPL only those TFs which overlap with the `<runnumber> <range-min> <range-max>` (separators can be any whitespace, comma or semicolon) records provided via text file (assuming that there are some entries for a given run, otherwise the option is ignored).
Multiple ranges per run and multiple runs can be mentioned in a single input file. The range limits can be indicated either as a UNIX timestamp in `ms` or as an orbit number (in the fill the run belongs to).
In case an option
```
--ctf-data-subspec arg (=0)
--invert-irframe-selection
```
allows to alter the `subSpecification` used to send the CTFDATA from the reader to decoders. Non-0 value must be used in case the data extracted by the CTF-reader should be processed and stored in new CTFs (in order to avoid clash of CTFDATA messages of the reader and writer).
is provided, the selections above are inverted: TFs matching some of the provided ranges will be discarded, while the rest will be pushed to the DPL
At the end of the processing the `ctf-writer` will create a local file `ctf_read_ntf.txt` containing only the number of TFs pushed to the DPL.
In case no TF passed the selections above, this file will contain 0.
## Support for externally provided encoding dictionaries
Expand Down Expand Up @@ -201,3 +212,18 @@ Additionally, one may throttle on the free SHM by providing an option to the rea
Note that by default the reader reads into the memory the CTF data and prepares all output messages but injects them only once the rate-limiter allows that.
With the option `--limit-tf-before-reading` set also the preparation of the data to inject will be conditioned by the green light from the rate-limiter.
## Modifying ITS/MFT CTF output
For the ITS and MFT entropy decoding one can request either to decompose clusters to digits and send them instead of clusters (via `o2-ctf-reader-workflow` global options `--its-digits` and `--mft-digits` respectively)
or to apply the noise mask to decoded clusters (or decoded digits). If the masking (e.g. via option `--its-entropy-decoder " --mask-noise "`) is requested, user should provide to the entropy decoder the noise mask file (eventually will be loaded from CCDB) and cluster patterns decoding dictionary (if the clusters were encoded with patterns IDs).
For example,
```
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --its-entropy-decoder ' --mask-noise' | ...
```
will decode ITS and MFT data, decompose on the fly ITS clusters to digits, mask the noisy pixels with the provided masks, recluster remaining ITS digits and send the new clusters out, together with unchanged MFT clusters.
```
o2-ctf-reader-workflow --ctf-input <ctfFiles> --onlyDet ITS,MFT --mft-digits --mft-entropy-decoder ' --mask-noise' | ...
```
will send decompose clusters to digits and send ben out after masking the noise for the MFT, while ITS clusters will be sent as decoded.
2 changes: 2 additions & 0 deletions Detectors/CTF/workflow/include/CTFWorkflow/CTFReaderSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ struct CTFReaderInp {
std::string remoteRegex{};
std::string metricChannel{};
std::string fileIRFrames{};
std::string fileRunTimeSpans{};
std::vector<int> ctfIDs{};
bool skipSkimmedOutTF = false;
bool invertIRFramesSelection = false;
bool allowMissingDetectors = false;
bool checkTFLimitBeforeReading = false;
bool sup0xccdb = false;
Expand Down
157 changes: 145 additions & 12 deletions Detectors/CTF/workflow/src/CTFReaderSpec.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@
#include "DataFormatsZDC/CTF.h"
#include "DataFormatsHMP/CTF.h"
#include "DataFormatsCTP/CTF.h"
#include "DataFormatsParameters/AggregatedRunInfo.h"
#include "CCDB/BasicCCDBManager.h"
#include "CommonConstants/LHCConstants.h"
#include "Algorithm/RangeTokenizer.h"
#include <TStopwatch.h>
#include <fairmq/Device.h>
Expand Down Expand Up @@ -81,6 +84,8 @@ class CTFReaderSpec : public o2::framework::Task
void run(o2::framework::ProcessingContext& pc) final;

private:
void runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo);
void loadRunTimeSpans(const std::string& flname);
void openCTFFile(const std::string& flname);
bool processTF(ProcessingContext& pc);
void checkTreeEntries();
Expand All @@ -91,16 +96,20 @@ class CTFReaderSpec : public o2::framework::Task
void tryToFixCTFHeader(CTFHeader& ctfHeader) const;
CTFReaderInp mInput{};
o2::utils::IRFrameSelector mIRFrameSelector; // optional IR frames selector
std::map<int, std::vector<std::pair<long, long>>> mRunTimeRanges;
std::unique_ptr<o2::utils::FileFetcher> mFileFetcher;
std::unique_ptr<TFile> mCTFFile;
std::unique_ptr<TTree> mCTFTree;
bool mRunning = false;
bool mUseLocalTFCounter = false;
int mConvRunTimeRangesToOrbits = -1; // not defined yet
int mCTFCounter = 0;
int mCTFCounterAcc = 0;
int mNFailedFiles = 0;
int mFilesRead = 0;
int mTFLength = 128;
int mNWaits = 0;
int mRunNumberPrev = -1;
long mTotalWaitTime = 0;
long mLastSendTime = 0L;
long mCurrTreeEntry = 0L;
Expand Down Expand Up @@ -129,8 +138,8 @@ void CTFReaderSpec::stopReader()
return;
}
LOGP(info, "CTFReader stops processing, {} files read, {} files failed", mFilesRead - mNFailedFiles, mNFailedFiles);
LOGP(info, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs in {} loops, spent {:.2} s in {} data waiting states",
mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
LOGP(info, "CTF reading total timing: Cpu: {:.3f} Real: {:.3f} s for {} TFs ({} accepted) in {} loops, spent {:.2} s in {} data waiting states",
mTimer.CpuTime(), mTimer.RealTime(), mCTFCounter, mCTFCounterAcc, mFileFetcher->getNLoops(), 1e-6 * mTotalWaitTime, mNWaits);
mRunning = false;
mFileFetcher->stop();
mFileFetcher.reset();
Expand Down Expand Up @@ -164,6 +173,111 @@ void CTFReaderSpec::init(InitContext& ic)
mTFLength = hbfu.nHBFPerTF;
LOGP(info, "IRFrames will be selected from {}, assumed TF length: {} HBF", mInput.fileIRFrames, mTFLength);
}
if (!mInput.fileRunTimeSpans.empty()) {
loadRunTimeSpans(mInput.fileRunTimeSpans);
}
}

void CTFReaderSpec::runTimeRangesToIRFrameSelector(const o2::framework::TimingInfo& timingInfo)
{
// convert entries in the runTimeRanges to IRFrameSelector, if needed, convert time to orbit
mIRFrameSelector.clear();
auto ent = mRunTimeRanges.find(timingInfo.runNumber);
if (ent == mRunTimeRanges.end()) {
LOGP(info, "RunTimeRanges selection was provided but run {} has no entries, all TFs will be processed", timingInfo.runNumber);
return;
}
o2::parameters::AggregatedRunInfo rinfo;
auto& ccdb = o2::ccdb::BasicCCDBManager::instance();
rinfo = o2::parameters::AggregatedRunInfo::buildAggregatedRunInfo(ccdb, timingInfo.runNumber);
if (rinfo.runNumber != timingInfo.runNumber || rinfo.orbitsPerTF < 1) {
LOGP(fatal, "failed to extract AggregatedRunInfo for run {}", timingInfo.runNumber);
}
mTFLength = rinfo.orbitsPerTF;
std::vector<o2::dataformats::IRFrame> frames;
for (const auto& rng : ent->second) {
long orbMin = 0, orbMax = 0;
if (mConvRunTimeRangesToOrbits > 0) {
orbMin = rinfo.orbitSOR + (rng.first - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
orbMax = rinfo.orbitSOR + (rng.second - rinfo.sor) / (o2::constants::lhc::LHCOrbitMUS * 0.001);
} else {
orbMin = rng.first;
orbMax = rng.second;
}
if (orbMin < 0) {
orbMin = 0;
}
if (orbMax < 0) {
orbMax = 0;
}
if (timingInfo.runNumber > 523897) {
orbMin = (orbMin / rinfo.orbitsPerTF) * rinfo.orbitsPerTF;
orbMax = (orbMax / rinfo.orbitsPerTF + 1) * rinfo.orbitsPerTF - 1;
}
LOGP(info, "TFs overlapping with orbits {}:{} will be {}", orbMin, orbMax, mInput.invertIRFramesSelection ? "rejected" : "selected");
frames.emplace_back(InteractionRecord{0, uint32_t(orbMin)}, InteractionRecord{o2::constants::lhc::LHCMaxBunches, uint32_t(orbMax)});
}
mIRFrameSelector.setOwnList(frames, true);
}

void CTFReaderSpec::loadRunTimeSpans(const std::string& flname)
{
std::ifstream inputFile(flname);
if (!inputFile) {
LOGP(fatal, "Failed to open selected run/timespans file {}", mInput.fileRunTimeSpans);
}
std::string line;
size_t cntl = 0, cntr = 0;
while (std::getline(inputFile, line)) {
cntl++;
for (char& ch : line) { // Replace semicolons and tabs with spaces for uniform processing
if (ch == ';' || ch == '\t' || ch == ',') {
ch = ' ';
}
}
o2::utils::Str::trim(line);
if (line.size() < 1 || line[0] == '#') {
continue;
}
auto tokens = o2::utils::Str::tokenize(line, ' ');
auto logError = [&cntl, &line]() { LOGP(error, "Expected format for selection is tripplet <run> <range_min> <range_max>, failed on line#{}: {}", cntl, line); };
if (tokens.size() >= 3) {
int run = 0;
long rmin, rmax;
try {
run = std::stoi(tokens[0]);
rmin = std::stol(tokens[1]);
rmax = std::stol(tokens[2]);
} catch (...) {
logError();
continue;
}

constexpr long ISTimeStamp = 1514761200000L;
int convmn = rmin > ISTimeStamp ? 1 : 0, convmx = rmax > ISTimeStamp ? 1 : 0; // values above ISTimeStamp are timestamps (need to be converted to orbits)
if (rmin > rmax) {
LOGP(fatal, "Provided range limits are not in increasing order, entry is {}", line);
}
if (mConvRunTimeRangesToOrbits == -1) {
if (convmn != convmx) {
LOGP(fatal, "Provided range limits should be both consistent either with orbit number or with unix timestamp in ms, entry is {}", line);
}
mConvRunTimeRangesToOrbits = convmn; // need to convert to orbit if time
LOGP(info, "Interpret selected time-spans input as {}", mConvRunTimeRangesToOrbits == 1 ? "timstamps(ms)" : "orbits");
} else {
if (mConvRunTimeRangesToOrbits != convmn || mConvRunTimeRangesToOrbits != convmx) {
LOGP(fatal, "Provided range limits should are not consistent with previously determined {} input, entry is {}", mConvRunTimeRangesToOrbits == 1 ? "timestamps" : "orbits", line);
}
}

mRunTimeRanges[run].emplace_back(rmin, rmax);
cntr++;
} else {
logError();
}
}
LOGP(info, "Read {} time-spans for {} runs from {}", cntr, mRunTimeRanges.size(), mInput.fileRunTimeSpans);
inputFile.close();
}

///_______________________________________
Expand Down Expand Up @@ -256,6 +370,17 @@ void CTFReaderSpec::run(ProcessingContext& pc)
pc.services().get<ControlService>().endOfStream();
pc.services().get<ControlService>().readyToQuit(QuitRequest::Me);
stopReader();
const std::string dummy{"ctf_read_ntf.txt"};
if (mCTFCounterAcc == 0) {
LOGP(warn, "No TF passed selection, writing a 0 to file {}", dummy);
}
try {
std::ofstream outfile;
outfile.open(dummy, std::ios::out | std::ios::trunc);
outfile << mCTFCounterAcc << std::endl;
} catch (...) {
LOGP(error, "Failed to write {}", dummy);
}
}
}

Expand All @@ -278,7 +403,7 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
}

if (mUseLocalTFCounter) {
ctfHeader.tfCounter = mCTFCounter;
ctfHeader.tfCounter = mCTFCounterAcc;
}

LOG(info) << ctfHeader;
Expand All @@ -289,19 +414,26 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
timingInfo.tfCounter = ctfHeader.tfCounter;
timingInfo.runNumber = ctfHeader.run;

if (mRunTimeRanges.size() && timingInfo.runNumber != mRunNumberPrev) {
runTimeRangesToIRFrameSelector(timingInfo);
}
mRunNumberPrev = timingInfo.runNumber;

if (mIRFrameSelector.isSet()) {
o2::InteractionRecord ir0(0, timingInfo.firstTForbit);
// we cannot have GRPECS via DPL CCDB fetcher in the CTFReader, so we use mTFLength extracted from the HBFUtils
o2::InteractionRecord ir1(o2::constants::lhc::LHCMaxBunches - 1, timingInfo.firstTForbit < 0xffffffff - (mTFLength - 1) ? timingInfo.firstTForbit + (mTFLength - 1) : 0xffffffff);
auto irSpan = mIRFrameSelector.getMatchingFrames({ir0, ir1});
if (irSpan.size() == 0 && mInput.skipSkimmedOutTF) {
LOGP(info, "Skimming did not define any selection for TF [{}] : [{}]", ir0.asString(), ir1.asString());
bool acc = true;
if (mInput.skipSkimmedOutTF) {
acc = (irSpan.size() > 0) ? !mInput.invertIRFramesSelection : mInput.invertIRFramesSelection;
LOGP(info, "IRFrame selection contains {} frames for TF [{}] : [{}]: {}use this TF (selection inversion mode is {})",
irSpan.size(), ir0.asString(), ir1.asString(), acc ? "" : "do not ", mInput.invertIRFramesSelection ? "ON" : "OFF");
}
if (!acc) {
return false;
} else {
if (mInput.checkTFLimitBeforeReading) {
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
}
LOGP(info, "{} IR-Frames are selected for TF [{}] : [{}]", irSpan.size(), ir0.asString(), ir1.asString());
}
if (mInput.checkTFLimitBeforeReading) {
limiter.check(pc, mInput.tfRateLimit, mInput.minSHM);
}
auto outVec = pc.outputs().make<std::vector<o2::dataformats::IRFrame>>(OutputRef{"selIRFrames"}, irSpan.begin(), irSpan.end());
} else {
Expand Down Expand Up @@ -329,6 +461,7 @@ bool CTFReaderSpec::processTF(ProcessingContext& pc)
processDetector<o2::cpv::CTF>(DetID::CPV, ctfHeader, pc);
processDetector<o2::zdc::CTF>(DetID::ZDC, ctfHeader, pc);
processDetector<o2::ctp::CTF>(DetID::CTP, ctfHeader, pc);
mCTFCounterAcc++;

// send sTF acknowledge message
if (!mInput.sup0xccdb) {
Expand Down Expand Up @@ -466,7 +599,7 @@ DataProcessorSpec getCTFReaderSpec(const CTFReaderInp& inp)
outputs.emplace_back(OutputLabel{det.getName()}, det.getDataOrigin(), "CTFDATA", inp.subspec, Lifetime::Timeframe);
}
}
if (!inp.fileIRFrames.empty()) {
if (!inp.fileIRFrames.empty() || !inp.fileRunTimeSpans.empty()) {
outputs.emplace_back(OutputLabel{"selIRFrames"}, "CTF", "SELIRFRAMES", 0, Lifetime::Timeframe);
}
if (!inp.sup0xccdb) {
Expand Down
Loading

0 comments on commit 4bffbfa

Please sign in to comment.