Skip to content

Commit

Permalink
[df] Add initial RNTuple snapshot implementation
Browse files Browse the repository at this point in the history
This initial implemenation only supports single-threaded snapshotting.
When the original datasource is an RNTuple, the resulting snapshot will
be an RNTuple by default. This can be changed to TTree through the
RSnapshotOptions. Snapshotting from TTree to RNTuple is not yet
supported in this version, but will be added in the future. Snapshotting
from other data sources or dataframes created from scratch to RNTuple is
supported.
  • Loading branch information
enirolf committed Aug 19, 2024
1 parent 195cbe8 commit c701f1f
Show file tree
Hide file tree
Showing 10 changed files with 826 additions and 56 deletions.
191 changes: 179 additions & 12 deletions tree/dataframe/inc/ROOT/RDF/ActionHelpers.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
#include "TClassRef.h"
#include "TDirectory.h"
#include "TError.h" // for R__ASSERT, Warning
#include "TFile.h" // for SnapshotHelper
#include "TFile.h" // for SnapshotHelper
#include "TH1.h"
#include "TGraph.h"
#include "TGraphAsymmErrors.h"
Expand All @@ -46,6 +46,12 @@
#include "TStatistic.h"
#include "ROOT/RDF/RActionImpl.hxx"
#include "ROOT/RDF/RMergeableValue.hxx"
#include "ROOT/RDF/RLoopManager.hxx"

#ifdef R__HAS_ROOT7
#include "ROOT/RNTupleDS.hxx"
#include "ROOT/RNTupleWriter.hxx" // for SnapshotRNTupleHelper
#endif

#include <algorithm>
#include <functional>
Expand Down Expand Up @@ -1379,8 +1385,8 @@ void SetBranchesHelper(TTree *inputTree, TTree &outputTree, const std::string &i
branchAddress = nullptr;
}

/// Helper function for SnapshotHelper and SnapshotHelperMT. It creates new branches for the output TTree of a Snapshot.
/// This overload is called for columns of type `RVec<T>`. For RDF, these can represent:
/// Helper function for SnapshotHelper and SnapshotHelperMT. It creates new branches for the output TTree of a
/// Snapshot. This overload is called for columns of type `RVec<T>`. For RDF, these can represent:
/// 1. c-style arrays in ROOT files, so we are sure that there are input trees to which we can ask the correct branch
/// title
/// 2. RVecs coming from a custom column or the input file/data-source
Expand Down Expand Up @@ -1483,7 +1489,7 @@ void SetBranchesHelper(TTree *inputTree, TTree &outputTree, const std::string &i

void ValidateSnapshotOutput(const RSnapshotOptions &opts, const std::string &treeName, const std::string &fileName);

/// Helper object for a single-thread Snapshot action
/// Helper object for a single-thread TTree-based Snapshot action
template <typename... ColTypes>
class R__CLING_PTRCHECK(off) SnapshotHelper : public RActionImpl<SnapshotHelper<ColTypes...>> {
std::string fFileName;
Expand All @@ -1507,9 +1513,15 @@ public:
SnapshotHelper(std::string_view filename, std::string_view dirname, std::string_view treename,
const ColumnNames_t &vbnames, const ColumnNames_t &bnames, const RSnapshotOptions &options,
std::vector<bool> &&isDefine)
: fFileName(filename), fDirName(dirname), fTreeName(treename), fOptions(options), fInputBranchNames(vbnames),
fOutputBranchNames(ReplaceDotWithUnderscore(bnames)), fBranches(vbnames.size(), nullptr),
fBranchAddresses(vbnames.size(), nullptr), fIsDefine(std::move(isDefine))
: fFileName(filename),
fDirName(dirname),
fTreeName(treename),
fOptions(options),
fInputBranchNames(vbnames),
fOutputBranchNames(ReplaceDotWithUnderscore(bnames)),
fBranches(vbnames.size(), nullptr),
fBranchAddresses(vbnames.size(), nullptr),
fIsDefine(std::move(isDefine))
{
ValidateSnapshotOutput(fOptions, fTreeName, fFileName);
}
Expand Down Expand Up @@ -1632,7 +1644,7 @@ public:
}
};

/// Helper object for a multi-thread Snapshot action
/// Helper object for a multi-thread TTree-based Snapshot action
template <typename... ColTypes>
class R__CLING_PTRCHECK(off) SnapshotHelperMT : public RActionImpl<SnapshotHelperMT<ColTypes...>> {
unsigned int fNSlots;
Expand All @@ -1659,11 +1671,20 @@ public:
SnapshotHelperMT(const unsigned int nSlots, std::string_view filename, std::string_view dirname,
std::string_view treename, const ColumnNames_t &vbnames, const ColumnNames_t &bnames,
const RSnapshotOptions &options, std::vector<bool> &&isDefine)
: fNSlots(nSlots), fOutputFiles(fNSlots), fOutputTrees(fNSlots), fBranchAddressesNeedReset(fNSlots, 1),
fFileName(filename), fDirName(dirname), fTreeName(treename), fOptions(options), fInputBranchNames(vbnames),
fOutputBranchNames(ReplaceDotWithUnderscore(bnames)), fInputTrees(fNSlots),
: fNSlots(nSlots),
fOutputFiles(fNSlots),
fOutputTrees(fNSlots),
fBranchAddressesNeedReset(fNSlots, 1),
fFileName(filename),
fDirName(dirname),
fTreeName(treename),
fOptions(options),
fInputBranchNames(vbnames),
fOutputBranchNames(ReplaceDotWithUnderscore(bnames)),
fInputTrees(fNSlots),
fBranches(fNSlots, std::vector<TBranch *>(vbnames.size(), nullptr)),
fBranchAddresses(fNSlots, std::vector<void *>(vbnames.size(), nullptr)), fOutputBranches(fNSlots),
fBranchAddresses(fNSlots, std::vector<void *>(vbnames.size(), nullptr)),
fOutputBranches(fNSlots),
fIsDefine(std::move(isDefine))
{
ValidateSnapshotOutput(fOptions, fTreeName, fFileName);
Expand Down Expand Up @@ -1818,6 +1839,152 @@ public:
}
};

#ifdef R__HAS_ROOT7
void ValidateSnapshotRNTupleOutput(const RSnapshotOptions &opts, const std::string &ntupleName,
const std::string &fileName);

/// Helper function to update the value of an RNTuple's field in the provided entry.
template <typename T>
void SetFieldsHelper(T value, std::string_view fieldName, ROOT::Experimental::REntry *entry)
{
entry->BindValue(fieldName, std::make_shared<T>(value));
}

/// Helper object for a single-thread RNTuple-based Snapshot action
template <typename... ColTypes>
class R__CLING_PTRCHECK(off) SnapshotRNTupleHelper : public RActionImpl<SnapshotRNTupleHelper<ColTypes...>> {
std::string fFileName;
std::string fNTupleName;

std::unique_ptr<TFile> fOutFile = nullptr;

RSnapshotOptions fOptions;
ROOT::Detail::RDF::RLoopManager *fLoopManager;
ColumnNames_t fInputFieldNames; // This contains the resolved aliases
ColumnNames_t fOutputFieldNames;
std::unique_ptr<ROOT::Experimental::RNTupleWriter> fWriter{nullptr};

ROOT::Experimental::REntry *fOutputEntry;

std::vector<bool> fIsDefine;

public:
using ColumnTypes_t = TypeList<ColTypes...>;
SnapshotRNTupleHelper(std::string_view filename, std::string_view ntuplename, const ColumnNames_t &vfnames,
const ColumnNames_t &fnames, const RSnapshotOptions &options,
ROOT::Detail::RDF::RLoopManager *lm, std::vector<bool> &&isDefine)
: fFileName(filename),
fNTupleName(ntuplename),
fOptions(options),
fLoopManager(lm),
fInputFieldNames(vfnames),
fOutputFieldNames(ReplaceDotWithUnderscore(fnames)),
fIsDefine(std::move(isDefine))
{
ValidateSnapshotRNTupleOutput(fOptions, fNTupleName, fFileName);
}

SnapshotRNTupleHelper(const SnapshotRNTupleHelper &) = delete;
SnapshotRNTupleHelper(SnapshotRNTupleHelper &&) = default;
~SnapshotRNTupleHelper()
{
if (!fNTupleName.empty() && !fLoopManager->GetDataSource() && fOptions.fLazy)
Warning("Snapshot", "A lazy Snapshot action was booked but never triggered.");
}

void InitTask(TTreeReader *, unsigned int /* slot */) {}

void Exec(unsigned int /* slot */, ColTypes &...values)
{
using ind_t = std::index_sequence_for<ColTypes...>;

SetFields(values..., ind_t{});
fWriter->Fill();
}

template <std::size_t... S>
void SetFields(ColTypes &...values, std::index_sequence<S...> /*dummy*/)
{
int expander[] = {(SetFieldsHelper(values, fOutputFieldNames[S], fOutputEntry), 0)..., 0};
(void)expander; // avoid unused variable warnings for older compilers (gcc 14.1)
}

void Initialize()
{
using ind_t = std::index_sequence_for<ColTypes...>;

auto model = ROOT::Experimental::RNTupleModel::Create();
MakeFields(*model, ind_t{});
fOutputEntry = &model->GetDefaultEntry();

fOutFile = std::unique_ptr<TFile>(
TFile::Open(fFileName.c_str(), fOptions.fMode.c_str(), /*ftitle=*/fFileName.c_str() /* , cs */));
if (!fOutFile)
throw std::runtime_error("Snapshot: could not create output file " + fFileName);

ROOT::Experimental::RNTupleWriteOptions writeOptions;
writeOptions.SetCompression(fOptions.fCompressionAlgorithm, fOptions.fCompressionLevel);

TString checkupdate = fOptions.fMode;
checkupdate.ToLower();

if (checkupdate == "update") {
fWriter = ROOT::Experimental::RNTupleWriter::Append(std::move(model), fNTupleName, *fOutFile, writeOptions);
} else {
// If the output file is not closed before the RNTupleWriter is created, no actual data will be written and
// the file will be empty.
fOutFile->Close();
fWriter = ROOT::Experimental::RNTupleWriter::Recreate(std::move(model), fNTupleName, fFileName, writeOptions);
}
}

template <std::size_t... S>
void MakeFields(ROOT::Experimental::RNTupleModel &model, std::index_sequence<S...> /*dummy*/)
{
int expander[] = {(model.MakeField<ColTypes>(fOutputFieldNames[S]), 0)..., 0};
(void)expander; // avoid unused variable warnings for older compilers (gcc 14.1)
}

void Finalize()
{
{
fWriter.reset();
}
fLoopManager->SetDataSource(std::make_unique<ROOT::Experimental::RNTupleDS>(fNTupleName, fFileName));
}

std::string GetActionName() { return "Snapshot"; }

ROOT::RDF::SampleCallback_t GetSampleCallback() final
{
return [](unsigned int, const RSampleInfo &) mutable {};
}

/**
* @brief Create a new SnapshotRNTupleHelper with a different output file name
*
* @param newName A type-erased string with the output file name
* @return SnapshotRNTupleHelper
*
* This MakeNew implementation is tied to the cloning feature of actions
* of the computation graph. In particular, cloning a Snapshot node usually
* also involves changing the name of the output file, otherwise the cloned
* Snapshot would overwrite the same file.
*/
SnapshotRNTupleHelper MakeNew(void *newName)
{
const std::string finalName = *reinterpret_cast<const std::string *>(newName);
return SnapshotRNTupleHelper{finalName,
fNTupleName,
fInputFieldNames,
fOutputFieldNames,
fOptions,
fLoopManager,
std::vector<bool>(fIsDefine)};
}
};
#endif

template <typename Acc, typename Merge, typename R, typename T, typename U,
bool MustCopyAssign = std::is_same<R, U>::value>
class R__CLING_PTRCHECK(off) AggregateHelper
Expand Down
45 changes: 35 additions & 10 deletions tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,8 @@ struct SnapshotHelperArgs {
std::string fTreeName;
std::vector<std::string> fOutputColNames;
ROOT::RDF::RSnapshotOptions fOptions;
RDFDetail::RLoopManager *fLoopManager;
bool fToNTuple;
};

// Snapshot action
Expand All @@ -275,20 +277,39 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperA
std::vector<bool> isDefine = makeIsDefine();

std::unique_ptr<RActionBase> actionPtr;
if (!ROOT::IsImplicitMTEnabled()) {
// single-thread snapshot
using Helper_t = SnapshotHelper<ColTypes...>;
if (snapHelperArgs->fToNTuple) {
#ifdef R__HAS_ROOT7
// TODO(fdegeus) Add MT snapshotting
using Helper_t = SnapshotRNTupleHelper<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;

auto loopManager = snapHelperArgs->fLoopManager;

actionPtr.reset(
new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options, std::move(isDefine)),
new Action_t(Helper_t(filename, treename, colNames, outputColNames, options, loopManager, std::move(isDefine)),
colNames, prevNode, colRegister));

return actionPtr;
#else
throw std::runtime_error("Cannot snapshot to RNTuple: this installation of ROOT has not been build with ROOT7 "
"components enabled.");
#endif
} else {
// multi-thread snapshot
using Helper_t = SnapshotHelperMT<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;
actionPtr.reset(new Action_t(
Helper_t(nSlots, filename, dirname, treename, colNames, outputColNames, options, std::move(isDefine)),
colNames, prevNode, colRegister));
if (!ROOT::IsImplicitMTEnabled()) {
// single-thread snapshot
using Helper_t = SnapshotHelper<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;
actionPtr.reset(
new Action_t(Helper_t(filename, dirname, treename, colNames, outputColNames, options, std::move(isDefine)),
colNames, prevNode, colRegister));
} else {
// multi-thread snapshot
using Helper_t = SnapshotHelperMT<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;
actionPtr.reset(new Action_t(
Helper_t(nSlots, filename, dirname, treename, colNames, outputColNames, options, std::move(isDefine)),
colNames, prevNode, colRegister));
}
}
return actionPtr;
}
Expand Down Expand Up @@ -781,6 +802,10 @@ AddSizeBranches(const std::vector<std::string> &branches, TTree *tree, std::vect

void RemoveDuplicates(ColumnNames_t &columnNames);

#ifdef R__HAS_ROOT7
void RemoveRNTupleSubFields(ColumnNames_t &columnNames);
#endif

} // namespace RDF
} // namespace Internal

Expand Down
Loading

0 comments on commit c701f1f

Please sign in to comment.