Skip to content

Commit

Permalink
DPL: allow configuring compression for the AOD writer (#13659)
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf authored Nov 7, 2024
1 parent fec521f commit 2475cbc
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 15 deletions.
2 changes: 1 addition & 1 deletion Framework/Core/include/Framework/DataOutputDirector.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ struct DataOutputDirector {
std::vector<DataOutputDescriptor*> getDataOutputDescriptors(InputSpec spec);

// get the matching TFile
FileAndFolder getFileFolder(DataOutputDescriptor* dodesc, uint64_t folderNumber, std::string parentFileName);
FileAndFolder getFileFolder(DataOutputDescriptor* dodesc, uint64_t folderNumber, std::string parentFileName, int compression);

// check file sizes
bool checkFileSizes();
Expand Down
18 changes: 9 additions & 9 deletions Framework/Core/src/AnalysisSupportHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -318,10 +318,10 @@ DataProcessorSpec AnalysisSupportHelpers::getOutputObjHistSink(std::vector<Outpu
// add sink for the AODs
DataProcessorSpec
AnalysisSupportHelpers::getGlobalAODSink(std::shared_ptr<DataOutputDirector> dod,
std::vector<InputSpec> const& outputInputs)
std::vector<InputSpec> const& outputInputs, int compressionLevel)
{

auto writerFunction = [dod, outputInputs](InitContext& ic) -> std::function<void(ProcessingContext&)> {
auto writerFunction = [dod, outputInputs, compressionLevel](InitContext& ic) -> std::function<void(ProcessingContext&)> {
LOGP(debug, "======== getGlobalAODSink::Init ==========");

// find out if any table needs to be saved
Expand Down Expand Up @@ -363,7 +363,7 @@ DataProcessorSpec
std::vector<TString> aodMetaDataVals;

// this functor is called once per time frame
return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals](ProcessingContext& pc) mutable -> void {
return [dod, tfNumbers, tfFilenames, aodMetaDataKeys, aodMetaDataVals, compressionLevel](ProcessingContext& pc) mutable -> void {
LOGP(debug, "======== getGlobalAODSink::processing ==========");
LOGP(debug, " processing data set with {} entries", pc.inputs().size());

Expand Down Expand Up @@ -457,7 +457,7 @@ DataProcessorSpec
// a table can be saved in multiple ways
// e.g. different selections of columns to different files
for (auto d : ds) {
auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile);
auto fileAndFolder = dod->getFileFolder(d, tfNumber, aodInputFile, compressionLevel);
auto treename = fileAndFolder.folderName + "/" + d->treename;
TableToTree ta2tr(table,
fileAndFolder.file,
Expand Down Expand Up @@ -495,11 +495,11 @@ DataProcessorSpec
// the command line options relevant for the writer are global
// see runDataProcessing.h
DataProcessorSpec spec{
"internal-dpl-aod-writer",
outputInputs,
Outputs{},
AlgorithmSpec(writerFunction),
{}};
.name = "internal-dpl-aod-writer",
.inputs = outputInputs,
.outputs = {},
.algorithm = AlgorithmSpec{writerFunction},
};

return spec;
}
Expand Down
2 changes: 1 addition & 1 deletion Framework/Core/src/AnalysisSupportHelpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ struct AnalysisSupportHelpers {
std::vector<OutputTaskInfo> const& tskmap);
/// writes inputs of kind AOD to file
static DataProcessorSpec getGlobalAODSink(std::shared_ptr<DataOutputDirector> dod,
std::vector<InputSpec> const& outputInputs);
std::vector<InputSpec> const& outputInputs, int compression);
};

}; // namespace o2::framework
Expand Down
7 changes: 6 additions & 1 deletion Framework/Core/src/ArrowSupport.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "Headers/DataHeader.h"
#include "Headers/DataHeaderHelpers.h"

#include <RtypesCore.h>
#include <fairmq/ProgOptions.h>

#include <uv.h>
Expand Down Expand Up @@ -536,7 +537,11 @@ o2::framework::ServiceSpec ArrowSupport::arrowBackendSpec()
// add TFNumber and TFFilename as input to the writer
outputsInputsAOD.emplace_back("tfn", "TFN", "TFNumber");
outputsInputsAOD.emplace_back("tff", "TFF", "TFFilename");
workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD));
int compression = 505;
if (ctx.options().hasOption("aod-writer-compression")) {
compression = ctx.options().get<int>("aod-writer-compression");
}
workflow.push_back(AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compression));
}
// Move the dummy sink at the end, if needed
for (size_t i = 0; i < workflow.size(); ++i) {
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/ConfigParamDiscovery.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ std::vector<ConfigParamSpec> ConfigParamDiscovery::discover(ConfigParamRegistry&
std::vector<char const*> capabilitiesSpecs = {
"O2Framework:DiscoverMetadataInAODCapability",
"O2Framework:DiscoverMetadataInCommandLineCapability",
"O2Framework:DiscoverAODOptionsInCommandLineCapability",
};

// Load all the requested plugins and discover what we can do.
Expand Down
4 changes: 2 additions & 2 deletions Framework/Core/src/DataOutputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ std::vector<DataOutputDescriptor*> DataOutputDirector::getDataOutputDescriptors(
return result;
}

FileAndFolder DataOutputDirector::getFileFolder(DataOutputDescriptor* dodesc, uint64_t folderNumber, std::string parentFileName)
FileAndFolder DataOutputDirector::getFileFolder(DataOutputDescriptor* dodesc, uint64_t folderNumber, std::string parentFileName, int compression)
{
// initialisation
FileAndFolder fileAndFolder;
Expand Down Expand Up @@ -488,7 +488,7 @@ FileAndFolder DataOutputDirector::getFileFolder(DataOutputDescriptor* dodesc, ui
auto fn = resdirname + "/" + mfilenameBases[ind] + ".root";
delete mfilePtrs[ind];
mParentMaps[ind]->Clear();
mfilePtrs[ind] = TFile::Open(fn.c_str(), mfileMode.c_str(), "", 505);
mfilePtrs[ind] = TFile::Open(fn.c_str(), mfileMode.c_str(), "", compression);
}
fileAndFolder.file = mfilePtrs[ind];

Expand Down
61 changes: 61 additions & 0 deletions Framework/Core/src/Plugin.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "Framework/Capability.h"
#include "Framework/Signpost.h"
#include "Framework/VariantJSONHelpers.h"
#include <cstddef>
#include <string_view>

O2_DECLARE_DYNAMIC_LOG(capabilities);
Expand Down Expand Up @@ -47,6 +48,19 @@ auto lookForCommandLineOptions = [](ConfigParamRegistry& registry, int argc, cha
return false;
};

/// Capability check for the AOD writer options.
///
/// Scans the command line and reports whether at least one argument begins
/// with "--aod-writer-".
///
/// @param registry not used by this check
/// @param argc     number of entries in @a argv
/// @param argv     command line arguments
/// @return true as soon as one matching argument is found, false otherwise
auto lookForCommandLineAODOptions = [](ConfigParamRegistry& registry, int argc, char** argv) -> bool {
  O2_SIGNPOST_ID_GENERATE(sid, capabilities);
  // If one of the options for aod-writer is specified, we should allow configuring compression.
  // The index has the same (signed) type as argc, so the loop condition never
  // relies on an implicit signed -> unsigned conversion.
  for (int i = 0; i < argc; i++) {
    std::string_view arg = argv[i];
    if (arg.starts_with("--aod-writer-")) {
      O2_SIGNPOST_EVENT_EMIT(capabilities, sid, "DiscoverAODOptionsInCommandLineCapability", "AOD options found in arguments. Populating from them.");
      return true;
    }
  }
  return false;
};

struct DiscoverMetadataInAODCapability : o2::framework::CapabilityPlugin {
Capability* create() override
{
Expand All @@ -68,6 +82,16 @@ struct DiscoverMetadataInCommandLineCapability : o2::framework::CapabilityPlugin
}
};

/// Capability plugin named "DiscoverAODOptionsInCommandLineCapability".
/// Its need-check is lookForCommandLineAODOptions and the plugin it requires
/// is "O2Framework:DiscoverAODOptionsInCommandLine".
struct DiscoverAODOptionsInCommandLineCapability : o2::framework::CapabilityPlugin {
  Capability* create() override
  {
    // A new heap-allocated Capability is handed back on every call.
    auto* capability = new Capability{
      .name = "DiscoverAODOptionsInCommandLineCapability",
      .checkIfNeeded = lookForCommandLineAODOptions,
      .requiredPlugin = "O2Framework:DiscoverAODOptionsInCommandLine"};
    return capability;
  }
};

struct DiscoverMetadataInCommandLine : o2::framework::ConfigDiscoveryPlugin {
ConfigDiscovery* create() override
{
Expand Down Expand Up @@ -99,9 +123,46 @@ struct DiscoverMetadataInCommandLine : o2::framework::ConfigDiscoveryPlugin {
}};
}
};

/// Config discovery plugin for the "--aod-writer-*" command line options.
///
/// The discover callback walks the command line and always returns exactly one
/// "aod-writer-compression" integer ConfigParamSpec per matching occurrence,
/// or a single one with the default value 505 when the option was not given.
struct DiscoverAODOptionsInCommandLine : o2::framework::ConfigDiscoveryPlugin {
  ConfigDiscovery* create() override
  {
    return new ConfigDiscovery{
      .init = []() {},
      .discover = [](ConfigParamRegistry& registry, int argc, char** argv) -> std::vector<ConfigParamSpec> {
        O2_SIGNPOST_ID_GENERATE(sid, capabilities);
        O2_SIGNPOST_EVENT_EMIT(capabilities, sid, "DiscoverAODOptionsInCommandLine",
                               "Discovering AOD handling related options in commandline arguments.");
        std::vector<ConfigParamSpec> results;
        bool injectOption = true;
        // int index: same signedness as argc.
        for (int i = 0; i < argc; i++) {
          std::string_view arg = argv[i];
          if (!arg.starts_with("--aod-writer-")) {
            continue;
          }
          // Drop the leading "--" to obtain the option name.
          arg.remove_prefix(2);
          std::string key;
          std::string value;
          if (auto const separator = arg.find('='); separator != std::string_view::npos) {
            // "--key=value" spelling: the value is carried by the same argument.
            // NOTE(review): assumes the downstream option parser accepts this
            // spelling as well - confirm.
            key = std::string{arg.substr(0, separator)};
            value = std::string{arg.substr(separator + 1)};
          } else if (i + 1 < argc) {
            // "--key value" spelling: the value is the following argument.
            key = std::string{arg};
            value = argv[i + 1];
          } else {
            // The option is the last argument and has no value. Reading
            // argv[i + 1] here would construct a std::string from a null
            // pointer, so the option is skipped and the default is used.
            key = std::string{arg};
            O2_SIGNPOST_EVENT_EMIT(capabilities, sid, "DiscoverAODOptionsInCommandLine",
                                   "Found %{public}s without a value. Ignoring it.", key.c_str());
            continue;
          }
          O2_SIGNPOST_EVENT_EMIT(capabilities, sid, "DiscoverAODOptionsInCommandLine",
                                 "Found %{public}s with value %{public}s.", key.c_str(), value.c_str());
          if (key == "aod-writer-compression") {
            // std::stoi throws if the value is not a valid integer; this is
            // left to propagate, as before.
            int numericValue = std::stoi(value);
            results.push_back(ConfigParamSpec{"aod-writer-compression", VariantType::Int, numericValue, {"AOD Compression options"}});
            injectOption = false;
          }
        }
        if (injectOption) {
          // Option not given (or given without a value): register it with the default.
          results.push_back(ConfigParamSpec{"aod-writer-compression", VariantType::Int, 505, {"AOD Compression options"}});
        }
        return results;
      }};
  }
};

DEFINE_DPL_PLUGINS_BEGIN
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInAODCapability, Capability);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInCommandLineCapability, Capability);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverAODOptionsInCommandLineCapability, Capability);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverMetadataInCommandLine, ConfigDiscovery);
DEFINE_DPL_PLUGIN_INSTANCE(DiscoverAODOptionsInCommandLine, ConfigDiscovery);
DEFINE_DPL_PLUGINS_END
} // namespace o2::framework
6 changes: 5 additions & 1 deletion Framework/Core/src/WorkflowHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -606,7 +606,11 @@ void WorkflowHelpers::injectServiceDevices(WorkflowSpec& workflow, ConfigContext
// add TFNumber and TFFilename as input to the writer
outputsInputsAOD.emplace_back(InputSpec{"tfn", "TFN", "TFNumber"});
outputsInputsAOD.emplace_back(InputSpec{"tff", "TFF", "TFFilename"});
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD);
int compressionLevel = 505;
if (ctx.options().hasOption("aod-writer-compression")) {
compressionLevel = ctx.options().get<int>("aod-writer-compression");
}
auto fileSink = AnalysisSupportHelpers::getGlobalAODSink(dod, outputsInputsAOD, compressionLevel);
extraSpecs.push_back(fileSink);

auto it = std::find_if(outputsInputs.begin(), outputsInputs.end(), [](InputSpec& spec) -> bool {
Expand Down
17 changes: 17 additions & 0 deletions Framework/TestWorkflows/src/o2TestHistograms.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,43 @@ using namespace o2;
using namespace o2::framework;
using namespace o2::framework::expressions;

// Example table declared for this test workflow. It is filled by the two
// histogram tasks in this file, which both call skimEx(track.pt(), track.eta()).
namespace o2::aod
{
// Columns of the example table; both are declared with type float.
namespace skimmedExampleTrack
{
DECLARE_SOA_COLUMN(Pt, pt, float); //!
DECLARE_SOA_COLUMN(Eta, eta, float); //!
} // namespace skimmedExampleTrack

// Table made of the Pt and Eta columns, in that order.
// NOTE(review): "AOD" / "SKIMEXTRK" are presumably the origin and description
// under which the table is published - confirm against the SOA macro docs.
DECLARE_SOA_TABLE(SkimmedExampleTrack, "AOD", "SKIMEXTRK", //!
skimmedExampleTrack::Pt,
skimmedExampleTrack::Eta);
} // namespace o2::aod

/// Test task over aod::Tracks: fills the "eta_vs_pt" histogram and appends one
/// (pt, eta) row to SkimmedExampleTrack for every track.
struct EtaAndClsHistogramsSimple {
  OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
  Produces<o2::aod::SkimmedExampleTrack> skimEx;

  /// Called with the tracks table; logs once, then loops over all tracks.
  void process(aod::Tracks const& tracks)
  {
    LOGP(info, "Invoking the simple one");
    for (auto& trk : tracks) {
      // Read each quantity once and reuse it for both outputs.
      auto const eta = trk.eta();
      auto const pt = trk.pt();
      // Histogram entry with weight 0, exactly as before.
      etaClsH->Fill(eta, pt, 0);
      // Table row: pt first, then eta.
      skimEx(pt, eta);
    }
  }
};

struct EtaAndClsHistogramsIUSimple {
OutputObj<TH2F> etaClsH{TH2F("eta_vs_pt", "#eta vs pT", 102, -2.01, 2.01, 100, 0, 10)};
Produces<o2::aod::SkimmedExampleTrack> skimEx;

void process(aod::TracksIU const& tracks)
{
LOGP(info, "Invoking the simple one");
for (auto& track : tracks) {
etaClsH->Fill(track.eta(), track.pt(), 0);
skimEx(track.pt(), track.eta());
}
}
};
Expand Down

0 comments on commit 2475cbc

Please sign in to comment.