Skip to content

Commit

Permalink
DPL: Move DataInputDirector to arrow::Dataset API
Browse files Browse the repository at this point in the history
  • Loading branch information
ktf committed Dec 20, 2024
1 parent 4ebf849 commit 84ca389
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 85 deletions.
16 changes: 12 additions & 4 deletions Framework/AnalysisSupport/src/AODJAlienReaderHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@
// or submit itself to any jurisdiction.

#include "AODJAlienReaderHelpers.h"
#include <memory>
#include "Framework/TableTreeHelpers.h"
#include "Framework/AnalysisHelpers.h"
#include "Framework/DataProcessingStats.h"
#include "Framework/RootTableBuilderHelpers.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AlgorithmSpec.h"
#include "Framework/ConfigParamRegistry.h"
#include "Framework/ControlService.h"
Expand Down Expand Up @@ -41,6 +43,8 @@
#include <arrow/io/interfaces.h>
#include <arrow/table.h>
#include <arrow/util/key_value_metadata.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/file_base.h>

using namespace o2;
using namespace o2::aod;
Expand Down Expand Up @@ -272,11 +276,13 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
// Origin file name for derived output map
auto o2 = Output(TFFileNameHeader);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
std::string currentFilename(fileAndFolder.file->GetName());
if (strcmp(fileAndFolder.file->GetEndpointUrl()->GetProtocol(), "file") == 0 && fileAndFolder.file->GetEndpointUrl()->GetFile()[0] != '/') {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(fileAndFolder.filesystem());
auto* f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string currentFilename(f->GetFile()->GetName());
if (strcmp(f->GetEndpointUrl()->GetProtocol(), "file") == 0 && f->GetEndpointUrl()->GetFile()[0] != '/') {
// This is not an absolute local path. Make it absolute.
static std::string pwd = gSystem->pwd() + std::string("/");
currentFilename = pwd + std::string(fileAndFolder.file->GetName());
currentFilename = pwd + std::string(f->GetName());
}
outputs.make<std::string>(o2) = currentFilename;
}
Expand Down Expand Up @@ -312,7 +318,9 @@ AlgorithmSpec AODJAlienReaderHelpers::rootFileReaderCallback(ConfigContext const
auto concrete = DataSpecUtils::asConcreteDataMatcher(firstRoute.matcher);
auto dh = header::DataHeader(concrete.description, concrete.origin, concrete.subSpec);
auto fileAndFolder = didir->getFileFolder(dh, fcnt, ntf);
if (!fileAndFolder.file) {

// In case the filesource is empty, move to the next one.
if (fileAndFolder.path().empty()) {
fcnt += 1;
ntf = 0;
if (didir->atEnd(fcnt)) {
Expand Down
184 changes: 120 additions & 64 deletions Framework/AnalysisSupport/src/DataInputDirector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
#include "DataInputDirector.h"
#include "Framework/DataDescriptorQueryBuilder.h"
#include "Framework/Logger.h"
#include "Framework/PluginManager.h"
#include "Framework/RootArrowFilesystem.h"
#include "Framework/AnalysisDataModelHelpers.h"
#include "Framework/Output.h"
#include "Headers/DataHeader.h"
Expand All @@ -26,8 +28,12 @@
#include "TGrid.h"
#include "TObjString.h"
#include "TMap.h"
#include "TFile.h"

#include <arrow/dataset/file_base.h>
#include <arrow/dataset/dataset.h>
#include <uv.h>
#include <memory>

#if __has_include(<TJAlienFile.h>)
#include <TJAlienFile.h>
Expand All @@ -47,12 +53,27 @@ FileNameHolder* makeFileNameHolder(std::string fileName)
return fileNameHolder;
}

DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement) : mAlienSupport(alienSupport),
mMonitoring(monitoring),
mAllowedParentLevel(allowedParentLevel),
mParentFileReplacement(std::move(parentFileReplacement)),
mLevel(level)
DataInputDescriptor::DataInputDescriptor(bool alienSupport, int level, o2::monitoring::Monitoring* monitoring, int allowedParentLevel, std::string parentFileReplacement)
: mAlienSupport(alienSupport),
mMonitoring(monitoring),
mAllowedParentLevel(allowedParentLevel),
mParentFileReplacement(std::move(parentFileReplacement)),
mLevel(level)
{
std::vector<char const*> capabilitiesSpecs = {
"O2Framework:RNTupleObjectReadingCapability",
"O2Framework:TTreeObjectReadingCapability",
};

std::vector<LoadablePlugin> plugins;
for (auto spec : capabilitiesSpecs) {
auto morePlugins = PluginManager::parsePluginSpecString(spec);
for (auto& extra : morePlugins) {
plugins.push_back(extra);
}
}

PluginManager::loadFromPlugin<RootObjectReadingCapability, RootObjectReadingCapabilityPlugin>(plugins, mFactory.capabilities);
}

void DataInputDescriptor::printOut()
Expand Down Expand Up @@ -108,20 +129,22 @@ bool DataInputDescriptor::setFile(int counter)

// open file
auto filename = mfilenames[counter]->fileName;
if (mcurrentFile) {
if (mcurrentFile->GetName() == filename) {
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
if (rootFS.get()) {
if (rootFS->GetFile()->GetName() == filename) {
return true;
}
closeInputFile();
}
mcurrentFile = TFile::Open(filename.c_str());
if (!mcurrentFile) {

mCurrentFilesystem = std::make_shared<TFileFileSystem>(TFile::Open(filename.c_str()), 50 * 1024 * 1024, mFactory);
if (!mCurrentFilesystem.get()) {
throw std::runtime_error(fmt::format("Couldn't open file \"{}\"!", filename));
}
mcurrentFile->SetReadaheadSize(50 * 1024 * 1024);
rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);

// get the parent file map if exists
mParentFileMap = (TMap*)mcurrentFile->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
mParentFileMap = (TMap*)rootFS->GetFile()->Get("parentFiles"); // folder name (DF_XXX) --> parent file (absolute path)
if (mParentFileMap && !mParentFileReplacement.empty()) {
auto pos = mParentFileReplacement.find(';');
if (pos == std::string::npos) {
Expand All @@ -140,16 +163,28 @@ bool DataInputDescriptor::setFile(int counter)

// get the directory names
if (mfilenames[counter]->numberOfTimeFrames <= 0) {
std::regex TFRegex = std::regex("DF_[0-9]+");
TList* keyList = mcurrentFile->GetListOfKeys();
const std::regex TFRegex = std::regex("/?DF_([0-9]+)(|-.*)$");
TList* keyList = rootFS->GetFile()->GetListOfKeys();
std::vector<std::string> finalList;

// extract TF numbers and sort accordingly
// We use an extra seen set to make sure we preserve the order in which
// we insert things in the final list and to make sure we do not have duplicates.
// Multiple folder numbers can happen if we use a flat structure /DF_<df>-<tablename>
std::unordered_set<size_t> seen;
for (auto key : *keyList) {
if (std::regex_match(((TObjString*)key)->GetString().Data(), TFRegex)) {
auto folderNumber = std::stoul(std::string(((TObjString*)key)->GetString().Data()).substr(3));
mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
std::smatch matchResult;
std::string keyName = ((TObjString*)key)->GetString().Data();
bool match = std::regex_match(keyName, matchResult, TFRegex);
if (match) {
auto folderNumber = std::stoul(matchResult[1].str());
if (seen.find(folderNumber) == seen.end()) {
seen.insert(folderNumber);
mfilenames[counter]->listOfTimeFrameNumbers.emplace_back(folderNumber);
}
}
}

if (mParentFileMap != nullptr) {
// If we have a parent map, we should not process in DF alphabetical order but according to parent file to avoid swapping between files
std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end(),
Expand All @@ -162,12 +197,8 @@ bool DataInputDescriptor::setFile(int counter)
std::sort(mfilenames[counter]->listOfTimeFrameNumbers.begin(), mfilenames[counter]->listOfTimeFrameNumbers.end());
}

for (auto folderNumber : mfilenames[counter]->listOfTimeFrameNumbers) {
auto folderName = "DF_" + std::to_string(folderNumber);
mfilenames[counter]->listOfTimeFrameKeys.emplace_back(folderName);
mfilenames[counter]->alreadyRead.emplace_back(false);
}
mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameKeys.size();
mfilenames[counter]->alreadyRead.resize(mfilenames[counter]->alreadyRead.size() + mfilenames[counter]->listOfTimeFrameNumbers.size(), false);
mfilenames[counter]->numberOfTimeFrames = mfilenames[counter]->listOfTimeFrameNumbers.size();
}

mCurrentFileID = counter;
Expand All @@ -193,26 +224,21 @@ uint64_t DataInputDescriptor::getTimeFrameNumber(int counter, int numTF)
return (mfilenames[counter]->listOfTimeFrameNumbers)[numTF];
}

FileAndFolder DataInputDescriptor::getFileFolder(int counter, int numTF)
arrow::dataset::FileSource DataInputDescriptor::getFileFolder(int counter, int numTF)
{
FileAndFolder fileAndFolder;

// open file
if (!setFile(counter)) {
return fileAndFolder;
return {};
}

// no TF left
if (mfilenames[counter]->numberOfTimeFrames > 0 && numTF >= mfilenames[counter]->numberOfTimeFrames) {
return fileAndFolder;
return {};
}

fileAndFolder.file = mcurrentFile;
fileAndFolder.folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];

mfilenames[counter]->alreadyRead[numTF] = true;

return fileAndFolder;
return {fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]), mCurrentFilesystem};
}

DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF, std::string treename)
Expand All @@ -221,17 +247,19 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
// This file has no parent map
return nullptr;
}
auto folderName = (mfilenames[counter]->listOfTimeFrameKeys)[numTF];
auto folderName = fmt::format("DF_{}", mfilenames[counter]->listOfTimeFrameNumbers[numTF]);
auto parentFileName = (TObjString*)mParentFileMap->GetValue(folderName.c_str());
// The current DF is not found in the parent map (this should not happen and is a fatal error)
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
if (!parentFileName) {
// The current DF is not found in the parent map (this should not happen and is a fatal error)
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), mcurrentFile->GetName()));
throw std::runtime_error(fmt::format(R"(parent file map exists but does not contain the current DF "{}" in file "{}")", folderName.c_str(), rootFS->GetFile()->GetName()));
return nullptr;
}

if (mParentFile) {
// Does the cached parent descriptor still correspond to the correct file?
if (parentFileName->GetString().CompareTo(mParentFile->mcurrentFile->GetName()) == 0) {
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(mParentFile->mCurrentFilesystem);
if (parentFileName->GetString().CompareTo(parentRootFS->GetFile()->GetName()) == 0) {
return mParentFile;
} else {
mParentFile->closeInputFile();
Expand All @@ -241,7 +269,8 @@ DataInputDescriptor* DataInputDescriptor::getParentFile(int counter, int numTF,
}

if (mLevel == mAllowedParentLevel) {
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(), mcurrentFile->GetName()));
throw std::runtime_error(fmt::format(R"(while looking for tree "{}", the parent file was requested but we are already at level {} of maximal allowed level {} for DF "{}" in file "{}")", treename.c_str(), mLevel, mAllowedParentLevel, folderName.c_str(),
rootFS->GetFile()->GetName()));
}

LOGP(info, "Opening parent file {} for DF {}", parentFileName->GetString().Data(), folderName.c_str());
Expand Down Expand Up @@ -270,11 +299,13 @@ void DataInputDescriptor::printFileStatistics()
if (wait_time < 0) {
wait_time = 0;
}
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", mcurrentFile->GetName(),
mcurrentFile->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), mcurrentFile->GetBytesRead(), mcurrentFile->GetReadCalls(),
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
auto f = dynamic_cast<TFile*>(rootFS->GetFile());
std::string monitoringInfo(fmt::format("lfn={},size={},total_df={},read_df={},read_bytes={},read_calls={},io_time={:.1f},wait_time={:.1f},level={}", f->GetName(),
f->GetSize(), getTimeFramesInFile(mCurrentFileID), getReadTimeFramesInFile(mCurrentFileID), f->GetBytesRead(), f->GetReadCalls(),
((float)mIOTime / 1e9), ((float)wait_time / 1e9), mLevel));
#if __has_include(<TJAlienFile.h>)
auto alienFile = dynamic_cast<TJAlienFile*>(mcurrentFile);
auto alienFile = dynamic_cast<TJAlienFile*>(f);
if (alienFile) {
monitoringInfo += fmt::format(",se={},open_time={:.1f}", alienFile->GetSE(), alienFile->GetElapsed());
}
Expand All @@ -285,7 +316,7 @@ void DataInputDescriptor::printFileStatistics()

void DataInputDescriptor::closeInputFile()
{
if (mcurrentFile) {
if (mCurrentFilesystem.get()) {
if (mParentFile) {
mParentFile->closeInputFile();
delete mParentFile;
Expand All @@ -296,9 +327,7 @@ void DataInputDescriptor::closeInputFile()
mParentFileMap = nullptr;

printFileStatistics();
mcurrentFile->Close();
delete mcurrentFile;
mcurrentFile = nullptr;
mCurrentFilesystem.reset();
}
}

Expand Down Expand Up @@ -346,8 +375,8 @@ int DataInputDescriptor::fillInputfiles()

int DataInputDescriptor::findDFNumber(int file, std::string dfName)
{
auto dfList = mfilenames[file]->listOfTimeFrameKeys;
auto it = std::find(dfList.begin(), dfList.end(), dfName);
auto dfList = mfilenames[file]->listOfTimeFrameNumbers;
auto it = std::find_if(dfList.begin(), dfList.end(), [dfName](size_t i) { return fmt::format("DF_{}", i) == dfName; });
if (it == dfList.end()) {
return -1;
}
Expand All @@ -358,40 +387,67 @@ bool DataInputDescriptor::readTree(DataAllocator& outputs, header::DataHeader dh
{
auto ioStart = uv_hrtime();

auto fileAndFolder = getFileFolder(counter, numTF);
if (!fileAndFolder.file) {
auto folder = getFileFolder(counter, numTF);
if (!folder.filesystem()) {
return false;
}

auto fullpath = fileAndFolder.folderName + "/" + treename;
auto tree = (TTree*)fileAndFolder.file->Get(fullpath.c_str());
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(folder.filesystem());

if (!rootFS) {
throw std::runtime_error(fmt::format(R"(Not a TFile filesystem!)"));
}
// FIXME: Ugly. We should detect the format from the treename, good enough for now.
std::shared_ptr<arrow::dataset::FileFormat> format;

auto fullpath = arrow::dataset::FileSource{folder.path() + "/" + treename, folder.filesystem()};

for (auto& capability : mFactory.capabilities) {
auto objectPath = capability.lfn2objectPath(fullpath.path());
void* handle = capability.getHandle(rootFS->GetFile(), objectPath);
if (handle) {
format = capability.factory().format();
break;
}
}

if (!tree) {
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.c_str());
if (!format) {
throw std::runtime_error(fmt::format(R"(Cannot find a viable format for object {}!)", fullpath.path()));
}

auto schemaOpt = format->Inspect(fullpath);
auto schema = *schemaOpt;

auto fragment = format->MakeFragment(fullpath, {}, schema);

if (!fragment.ok()) {
LOGP(debug, "Could not find tree {}. Trying in parent file.", fullpath.path());
auto parentFile = getParentFile(counter, numTF, treename);
if (parentFile != nullptr) {
int parentNumTF = parentFile->findDFNumber(0, fileAndFolder.folderName);
int parentNumTF = parentFile->findDFNumber(0, folder.path());
if (parentNumTF == -1) {
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", fileAndFolder.folderName, parentFile->mcurrentFile->GetName()));
auto parentRootFS = std::dynamic_pointer_cast<TFileFileSystem>(parentFile->mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(DF {} listed in parent file map but not found in the corresponding file "{}")", folder.path(), parentRootFS->GetFile()->GetName()));
}
// first argument is 0 as the parent file object contains only 1 file
return parentFile->readTree(outputs, dh, 0, parentNumTF, treename, totalSizeCompressed, totalSizeUncompressed);
}
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fileAndFolder.folderName + "/" + treename, fileAndFolder.file->GetName()));
auto rootFS = std::dynamic_pointer_cast<TFileFileSystem>(mCurrentFilesystem);
throw std::runtime_error(fmt::format(R"(Couldn't get TTree "{}" from "{}". Please check https://aliceo2group.github.io/analysis-framework/docs/troubleshooting/#tree-not-found for more information.)", fullpath.path(), rootFS->GetFile()->GetName()));
}

// create table output
auto o = Output(dh);
auto t2t = outputs.make<TreeToTable>(o);

// add branches to read
// fill the table
t2t->setLabel(tree->GetName());
totalSizeCompressed += tree->GetZipBytes();
totalSizeUncompressed += tree->GetTotBytes();
t2t->addAllColumns(tree);
t2t->fill(tree);
delete tree;
// FIXME: This should allow me to create a memory pool
// which I can then use to scan the dataset.
//
auto f2b = outputs.make<FragmentToBatch>(o);

//// add branches to read
//// fill the table
f2b->setLabel(treename.c_str());
f2b->fill(*fragment, schema, format);

mIOTime += (uv_hrtime() - ioStart);

Expand Down Expand Up @@ -693,7 +749,7 @@ DataInputDescriptor* DataInputDirector::getDataInputDescriptor(header::DataHeade
return result;
}

FileAndFolder DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
arrow::dataset::FileSource DataInputDirector::getFileFolder(header::DataHeader dh, int counter, int numTF)
{
auto didesc = getDataInputDescriptor(dh);
// if NOT match then use defaultDataInputDescriptor
Expand Down
Loading

0 comments on commit 84ca389

Please sign in to comment.