Skip to content

Commit

Permalink
[df] Add initial snapshotting for RNTuple
Browse files Browse the repository at this point in the history
  • Loading branch information
enirolf committed Jun 4, 2024
1 parent 7dab2fe commit 9097b0a
Show file tree
Hide file tree
Showing 6 changed files with 1,549 additions and 8 deletions.
134 changes: 130 additions & 4 deletions tree/dataframe/inc/ROOT/RDF/ActionHelpers.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <string_view>
#include "ROOT/RVec.hxx"
#include "ROOT/TBufferMerger.hxx" // for SnapshotHelper
#include "ROOT/RField.hxx" // for SnapshotRNTupleHelper
#include "ROOT/RNTupleWriter.hxx" // for SnapshotRNTupleHelper
#include "ROOT/RDF/RCutFlowReport.hxx"
#include "ROOT/RDF/RSampleInfo.hxx"
#include "ROOT/RDF/Utils.hxx"
Expand All @@ -46,6 +48,8 @@
#include "TStatistic.h"
#include "ROOT/RDF/RActionImpl.hxx"
#include "ROOT/RDF/RMergeableValue.hxx"
#include "ROOT/RDF/RLoopManager.hxx"
#include "ROOT/RNTupleDS.hxx"

#include <algorithm>
#include <functional>
Expand Down Expand Up @@ -1509,9 +1513,10 @@ void SetBranchesHelper(TTree *inputTree, TTree &outputTree, const std::string &i

void ValidateSnapshotOutput(const RSnapshotOptions &opts, const std::string &treeName, const std::string &fileName);

/// Helper object for a single-thread Snapshot action
/// Helper object for a single-thread TTree-based Snapshot action
template <typename... ColTypes>
class R__CLING_PTRCHECK(off) SnapshotHelper : public RActionImpl<SnapshotHelper<ColTypes...>> {
class R__CLING_PTRCHECK(off) SnapshotHelper
: public RActionImpl<SnapshotHelper<ColTypes...>> { // TODO(fdegeus) rename SnapshotTTreeHelper
std::string fFileName;
std::string fDirName;
std::string fTreeName;
Expand Down Expand Up @@ -1658,9 +1663,10 @@ public:
}
};

/// Helper object for a multi-thread Snapshot action
/// Helper object for a multi-thread TTree-based Snapshot action
template <typename... ColTypes>
class R__CLING_PTRCHECK(off) SnapshotHelperMT : public RActionImpl<SnapshotHelperMT<ColTypes...>> {
class R__CLING_PTRCHECK(off) SnapshotHelperMT
: public RActionImpl<SnapshotHelperMT<ColTypes...>> { // TODO(fdegeus) rename SnapshotTTreeHelper
unsigned int fNSlots;
std::unique_ptr<ROOT::TBufferMerger> fMerger; // must use a ptr because TBufferMerger is not movable
std::vector<std::shared_ptr<ROOT::TBufferMergerFile>> fOutputFiles;
Expand Down Expand Up @@ -1844,6 +1850,126 @@ public:
}
};

/// Helper function to update the value of an RNTuple's field in the provided entry.
template <typename T>
void SetFieldsHelper(T value, std::string_view fieldName, ROOT::Experimental::REntry *entry)
{
entry->BindValue(fieldName, std::make_shared<T>(value));
}

/// Helper object for a single-thread RNTuple-based Snapshot action
template <typename... ColTypes>
class R__CLING_PTRCHECK(off) SnapshotRNTupleHelper : public RActionImpl<SnapshotRNTupleHelper<ColTypes...>> {
std::string fFileName;
std::string fNTupleName;
RSnapshotOptions fOptions;
ROOT::Detail::RDF::RLoopManager *fLoopManager;
ColumnNames_t fInputFieldNames; // This contains the resolved aliases
ColumnNames_t fOutputFieldNames;
std::unique_ptr<ROOT::Experimental::RNTupleWriter> fWriter{nullptr};

ROOT::Experimental::REntry *fOutputEntry;

std::vector<bool> fIsDefine;

public:
using ColumnTypes_t = TypeList<ColTypes...>;
SnapshotRNTupleHelper(std::string_view filename, std::string_view ntuplename, const ColumnNames_t &vfnames,
const ColumnNames_t &fnames, const RSnapshotOptions &options,
ROOT::Detail::RDF::RLoopManager *lm, std::vector<bool> &&isDefine)
: fFileName(filename),
fNTupleName(ntuplename),
fOptions(options),
fLoopManager(lm),
fInputFieldNames(vfnames),
fOutputFieldNames(ReplaceDotWithUnderscore(fnames)),
fIsDefine(std::move(isDefine))
{
// TODO(fdegeus) add ValidateSnapshotRNTupleOutput(fOptions, fTreeName, fFileName);
}

SnapshotRNTupleHelper(const SnapshotRNTupleHelper &) = delete;
SnapshotRNTupleHelper(SnapshotRNTupleHelper &&) = default;
~SnapshotRNTupleHelper()
{
if (!fNTupleName.empty() /* TODO(fdegeus) Check if LM has DS */ && fOptions.fLazy)
Warning("SnapshotRNTuple", "A lazy Snapshot action was booked but never triggered.");
}

void InitTask(TTreeReader *, unsigned int /* slot */) {}

void Exec(unsigned int /* slot */, ColTypes &...values)
{
using ind_t = std::index_sequence_for<ColTypes...>;

SetFields(values..., ind_t{});
fWriter->Fill();
}

template <std::size_t... S>
void SetFields(ColTypes &...values, std::index_sequence<S...> /*dummy*/)
{
int expander[] = {(SetFieldsHelper(values, fOutputFieldNames[S], fOutputEntry), 0)..., 0};
(void)expander; // avoid unused variable warnings for older compilers such as gcc 4.9
}

void Initialize()
{
using ind_t = std::index_sequence_for<ColTypes...>;

auto model = ROOT::Experimental::RNTupleModel::Create();
MakeFields(*model, ind_t{});
fOutputEntry = &model->GetDefaultEntry();

fWriter = ROOT::Experimental::RNTupleWriter::Recreate(std::move(model), fNTupleName, fFileName);
}

template <std::size_t... S>
void MakeFields(ROOT::Experimental::RNTupleModel &model, std::index_sequence<S...> /*dummy*/)
{
int expander[] = {(model.MakeField<ColTypes>(fOutputFieldNames[S]), 0)..., 0};
(void)expander;
}

void Finalize()
{
{
fWriter.reset();
}
fLoopManager->SetDataSource(std::make_unique<ROOT::Experimental::RNTupleDS>(fNTupleName, fFileName));
}

std::string GetActionName() { return "Snapshot"; }

ROOT::RDF::SampleCallback_t GetSampleCallback() final
{
return [](unsigned int, const RSampleInfo &) mutable {};
}

/**
* @brief Create a new SnapshotRNTupleHelper with a different output file name
*
* @param newName A type-erased string with the output file name
* @return SnapshotRNTupleHelper
*
* This MakeNew implementation is tied to the cloning feature of actions
* of the computation graph. In particular, cloning a Snapshot node usually
* also involves changing the name of the output file, otherwise the cloned
* Snapshot would overwrite the same file.
*/
SnapshotRNTupleHelper MakeNew(void *newName)
{
const std::string finalName = *reinterpret_cast<const std::string *>(newName);
return SnapshotRNTupleHelper{finalName,
fNTupleName,
fInputFieldNames,
fOutputFieldNames,
fOptions,
fLoopManager,
std::vector<bool>(fIsDefine)};
}
};

template <typename Acc, typename Merge, typename R, typename T, typename U,
bool MustCopyAssign = std::is_same<R, U>::value>
class R__CLING_PTRCHECK(off) AggregateHelper
Expand Down
46 changes: 45 additions & 1 deletion tree/dataframe/inc/ROOT/RDF/InterfaceUtils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ struct Fill{};
struct StdDev{};
struct Display{};
struct Snapshot{};
struct SnapshotRNTuple{};
struct Book{};
}
// clang-format on
Expand Down Expand Up @@ -250,9 +251,10 @@ struct SnapshotHelperArgs {
std::string fTreeName;
std::vector<std::string> fOutputColNames;
ROOT::RDF::RSnapshotOptions fOptions;
RDFDetail::RLoopManager *fLoopManager;
};

// Snapshot action
// Snapshot action TODO(fdegeus) SnapshotTTree
template <typename... ColTypes, typename PrevNodeType>
std::unique_ptr<RActionBase>
BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperArgs> &snapHelperArgs,
Expand Down Expand Up @@ -293,6 +295,48 @@ BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperA
return actionPtr;
}

// SnapshotRNTuple action
template <typename... ColTypes, typename PrevNodeType>
std::unique_ptr<RActionBase>
BuildAction(const ColumnNames_t &colNames, const std::shared_ptr<SnapshotHelperArgs> &snapHelperArgs,
const unsigned int /*nSlots*/, std::shared_ptr<PrevNodeType> prevNode, ActionTags::SnapshotRNTuple,
const RColumnRegister &colRegister)
{
const auto &filename = snapHelperArgs->fFileName;
const auto &ntuplename = snapHelperArgs->fTreeName;
const auto &outputColNames = snapHelperArgs->fOutputColNames;
const auto &options = snapHelperArgs->fOptions;
auto loopManager = snapHelperArgs->fLoopManager;

auto makeIsDefine = [&] {
std::vector<bool> isDef;
isDef.reserve(sizeof...(ColTypes));
for (auto i = 0u; i < sizeof...(ColTypes); ++i)
isDef.push_back(colRegister.IsDefineOrAlias(colNames[i]));
return isDef;
};
std::vector<bool> isDefine = makeIsDefine();

std::unique_ptr<RActionBase> actionPtr;
if (!ROOT::IsImplicitMTEnabled()) {
// single-thread snapshot
using Helper_t = SnapshotRNTupleHelper<ColTypes...>;
using Action_t = RAction<Helper_t, PrevNodeType>;
actionPtr.reset(new Action_t(
Helper_t(filename, ntuplename, colNames, outputColNames, options, loopManager, std::move(isDefine)), colNames,
prevNode, colRegister));
} else {
// TODO(fdegeus)
// // multi-thread snapshot
// using Helper_t = SnapshotHelperMT<ColTypes...>;
// using Action_t = RAction<Helper_t, PrevNodeType>;
// actionPtr.reset(new Action_t(
// Helper_t(nSlots, filename, dirname, treename, colNames, outputColNames, options, std::move(isDefine)),
// colNames, prevNode, colRegister));
}
return actionPtr;
}

// Book with custom helper type
template <typename... ColTypes, typename PrevNodeType, typename Helper_t>
std::unique_ptr<RActionBase>
Expand Down
92 changes: 89 additions & 3 deletions tree/dataframe/inc/ROOT/RDF/RInterface.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -1169,7 +1169,7 @@ public:

auto snapHelperArgs = std::make_shared<RDFInternal::SnapshotHelperArgs>(
RDFInternal::SnapshotHelperArgs{std::string(filename), std::string(dirname), std::string(treename),
colListWithAliasesAndSizeBranches, options});
colListWithAliasesAndSizeBranches, options, nullptr});

::TDirectory::TContext ctxt;

Expand All @@ -1190,6 +1190,56 @@ public:
return resPtr;
}

#ifdef R__HAS_ROOT7
/// TODO(fdegeus) docs
template <typename... ColumnTypes>
RResultPtr<RInterface<RLoopManager>>
SnapshotRNTuple(std::string_view ntuplename, std::string_view filename, const ColumnNames_t &columnList,
const RSnapshotOptions &options = RSnapshotOptions())
{
return SnapshotRNTupleImpl<ColumnTypes...>(ntuplename, filename, columnList, options);
}

RResultPtr<RInterface<RLoopManager>> SnapshotRNTuple(std::string_view ntuplename, std::string_view filename,
const ColumnNames_t &columnList,
const RSnapshotOptions &options = RSnapshotOptions())
{
// like columnList but with `#var` columns removed
auto colListNoPoundSizes = RDFInternal::FilterArraySizeColNames(columnList, "SnapshotRNTuple");
// like columnListWithoutSizeColumns but with aliases resolved
auto colListNoAliases = GetValidatedColumnNames(colListNoPoundSizes.size(), colListNoPoundSizes);
RDFInternal::CheckForDuplicateSnapshotColumns(colListNoAliases);
// like validCols but with missing size branches required by array branches added in the right positions
const auto pairOfColumnLists =
RDFInternal::AddSizeBranches(fLoopManager->GetBranchNames(), fLoopManager->GetTree(),
std::move(colListNoAliases), std::move(colListNoPoundSizes));
const auto &colListNoAliasesWithSizeBranches = pairOfColumnLists.first;
const auto &colListWithAliasesAndSizeBranches = pairOfColumnLists.second;

const auto colTypeList = GetColumnTypeNamesList(colListNoAliasesWithSizeBranches);

const auto fullNTupleName = ntuplename;
const auto parsedNTupleName = RDFInternal::ParseTreePath(fullNTupleName);
ntuplename = parsedNTupleName.fTreeName;

::TDirectory::TContext ctxt;

auto newRDF = std::make_shared<RInterface<RLoopManager>>(std::make_shared<RLoopManager>(0));

auto snapHelperArgs = std::make_shared<RDFInternal::SnapshotHelperArgs>(
RDFInternal::SnapshotHelperArgs{std::string(filename), "", std::string(ntuplename),
colListWithAliasesAndSizeBranches, options, newRDF->GetLoopManager()});

auto resPtr = CreateAction<RDFInternal::ActionTags::SnapshotRNTuple, RDFDetail::RInferredType>(
colListNoAliasesWithSizeBranches, newRDF, snapHelperArgs, fProxiedPtr,
colListNoAliasesWithSizeBranches.size());

if (!options.fLazy)
*resPtr;
return resPtr;
}
#endif

// clang-format off
////////////////////////////////////////////////////////////////////////////
/// \brief Save selected columns to disk, in a new TTree `treename` in file `filename`.
Expand Down Expand Up @@ -3002,8 +3052,9 @@ private:
const auto &treename = parsedTreePath.fTreeName;
const auto &dirname = parsedTreePath.fDirName;

auto snapHelperArgs = std::make_shared<RDFInternal::SnapshotHelperArgs>(RDFInternal::SnapshotHelperArgs{
std::string(filename), std::string(dirname), std::string(treename), columnListWithoutSizeColumns, options});
auto snapHelperArgs = std::make_shared<RDFInternal::SnapshotHelperArgs>(
RDFInternal::SnapshotHelperArgs{std::string(filename), std::string(dirname), std::string(treename),
columnListWithoutSizeColumns, options, nullptr});

::TDirectory::TContext ctxt;

Expand All @@ -3025,6 +3076,41 @@ private:
return resPtr;
}

// TODO(fdegeus) Docs
template <typename... ColumnTypes>
RResultPtr<RInterface<RLoopManager>>
SnapshotRNTupleImpl(std::string_view fullNTupleName, std::string_view filename, const ColumnNames_t &columnList,
const RSnapshotOptions &options)
{
const auto columnListWithoutSizeColumns = RDFInternal::FilterArraySizeColNames(columnList, "SnapshotRNTuple");

RDFInternal::CheckTypesAndPars(sizeof...(ColumnTypes), columnListWithoutSizeColumns.size());
// validCols has aliases resolved, while columnListWithoutSizeColumns still has aliases in it.
const auto validCols = GetValidatedColumnNames(columnListWithoutSizeColumns.size(), columnListWithoutSizeColumns);
RDFInternal::CheckForDuplicateSnapshotColumns(validCols);
CheckAndFillDSColumns(validCols, TTraits::TypeList<ColumnTypes...>());

const auto parsedNTuplePath = RDFInternal::ParseTreePath(fullNTupleName);
const auto &ntuplename = parsedNTuplePath.fTreeName;

::TDirectory::TContext ctxt;

auto newRDF = std::make_shared<RInterface<RLoopManager>>(std::make_shared<RLoopManager>(0));

auto snapHelperArgs = std::make_shared<RDFInternal::SnapshotHelperArgs>(
RDFInternal::SnapshotHelperArgs{std::string(filename), "", std::string(ntuplename),
columnListWithoutSizeColumns, options, newRDF->GetLoopManager()});

// The Snapshot helper will use validCols (with aliases resolved) as input columns, and
// columnListWithoutSizeColumns (still with aliases in it, passed through snapHelperArgs) as output column names.
auto resPtr = CreateAction<RDFInternal::ActionTags::SnapshotRNTuple, ColumnTypes...>(validCols, newRDF,
snapHelperArgs, fProxiedPtr);

if (!options.fLazy)
*resPtr;
return resPtr;
}

////////////////////////////////////////////////////////////////////////////
/// \brief Implementation of cache.
template <typename... ColTypes, std::size_t... S>
Expand Down
3 changes: 3 additions & 0 deletions tree/dataframe/inc/ROOT/RDF/RLoopManager.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#include "ROOT/RDF/RNodeBase.hxx"
#include "ROOT/RDF/RNewSampleNotifier.hxx"
#include "ROOT/RDF/RSampleInfo.hxx"
#include "ROOT/RDataSource.hxx"

#include "ROOT/RNTupleWriter.hxx"

#include <functional>
#include <limits>
Expand Down
1 change: 1 addition & 0 deletions tree/dataframe/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ endif()

if(root7)
ROOT_ADD_GTEST(datasource_ntuple datasource_ntuple.cxx LIBRARIES ROOTDataFrame)
ROOT_ADD_GTEST(dataframe_snapshot_ntuple dataframe_snapshot_ntuple.cxx LIBRARIES ROOTDataFrame)

ROOT_STANDARD_LIBRARY_PACKAGE(NTupleStruct
NO_INSTALL_HEADERS
Expand Down
Loading

0 comments on commit 9097b0a

Please sign in to comment.