Skip to content

Commit

Permalink
DPL: hide more stuff from runDataProcessing.h
Browse files Browse the repository at this point in the history
This avoids having a large mainNoCatch duplicated in each executable.
  • Loading branch information
ktf committed Jan 9, 2025
1 parent 43223a4 commit 5269f06
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 23 deletions.
41 changes: 18 additions & 23 deletions Framework/Core/include/Framework/runDataProcessing.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#define FRAMEWORK_RUN_DATA_PROCESSING_H

#include <fmt/format.h>
#include "Framework/ConfigParamSpec.h"
#include "Framework/ChannelConfigurationPolicy.h"
#include "Framework/CallbacksPolicy.h"
#include "Framework/CompletionPolicy.h"
Expand All @@ -22,17 +23,13 @@
#include "Framework/SendingPolicy.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/ConfigContext.h"
#include "Framework/BoostOptionsRetriever.h"
#include "Framework/CustomWorkflowTerminationHook.h"
#include "Framework/CommonServices.h"
#include "Framework/WorkflowCustomizationHelpers.h"
#include "Framework/ResourcePolicyHelpers.h"
#include "Framework/Logger.h"
#include "Framework/CheckTypes.h"
#include "Framework/StructToTuple.h"
#include "Framework/ConfigParamDiscovery.h"
#include "ResourcePolicy.h"
#include "ServiceRegistryRef.h"
#include <vector>

namespace o2::framework
Expand Down Expand Up @@ -120,7 +117,9 @@ struct UserCustomizationsHelper {
namespace o2::framework
{
class ConfigContext;
}
class ConfigParamRegistry;
class ConfigParamSpec;
} // namespace o2::framework
/// Helper used to customize a workflow pipelining options
void overridePipeline(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

Expand Down Expand Up @@ -155,10 +154,18 @@ std::vector<T> injectCustomizations()
return policies;
}

void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow);

o2::framework::ConfigContext createConfigContext(std::unique_ptr<o2::framework::ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv);

std::unique_ptr<o2::framework::ServiceRegistry> createRegistry();

int mainNoCatch(int argc, char** argv)
{
using namespace o2::framework;
using namespace boost::program_options;

std::vector<o2::framework::ConfigParamSpec> workflowOptions;
UserCustomizationsHelper::userDefinedCustomization(workflowOptions);
Expand All @@ -171,24 +178,13 @@ int mainNoCatch(int argc, char** argv)
std::vector<CallbacksPolicy> callbacksPolicies = injectCustomizations<CallbacksPolicy>();
std::vector<SendingPolicy> sendingPolicies = injectCustomizations<SendingPolicy>();

std::vector<std::unique_ptr<ParamRetriever>> retrievers;
std::unique_ptr<ParamRetriever> retriever{new BoostOptionsRetriever(true, argc, argv)};
retrievers.emplace_back(std::move(retriever));
auto workflowOptionsStore = std::make_unique<ConfigParamStore>(workflowOptions, std::move(retrievers));
workflowOptionsStore->preload();
workflowOptionsStore->activate();
ConfigParamRegistry workflowOptionsRegistry(std::move(workflowOptionsStore));
auto extraOptions = o2::framework::ConfigParamDiscovery::discover(workflowOptionsRegistry, argc, argv);
for (auto& extra : extraOptions) {
workflowOptions.push_back(extra);
}
std::unique_ptr<ServiceRegistry> configRegistry = createRegistry();
std::vector<ConfigParamSpec> extraOptions;
std::unique_ptr<ConfigParamRegistry> workflowOptionsRegistry{nullptr};
auto configContext = createConfigContext(workflowOptionsRegistry, *configRegistry, workflowOptions, extraOptions, argc, argv);

ServiceRegistry configRegistry;
ConfigContext configContext(workflowOptionsRegistry, ServiceRegistryRef{configRegistry}, argc, argv);
o2::framework::WorkflowSpec specs = defineDataProcessing(configContext);
overrideCloning(configContext, specs);
overridePipeline(configContext, specs);
overrideLabels(configContext, specs);
overrideAll(configContext, specs);
for (auto& spec : specs) {
UserCustomizationsHelper::userDefinedCustomization(spec.requiredServices);
}
Expand All @@ -207,7 +203,6 @@ char* getIdString(int argc, char** argv);
int main(int argc, char** argv)
{
using namespace o2::framework;
using namespace boost::program_options;

int result = callMain(argc, argv, mainNoCatch);

Expand Down
34 changes: 34 additions & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include <memory>
#define BOOST_BIND_GLOBAL_PLACEHOLDERS
#include <stdexcept>
#include "Framework/BoostOptionsRetriever.h"
Expand Down Expand Up @@ -69,6 +70,7 @@
#include "HTTPParser.h"
#include "DPLWebSocket.h"
#include "ArrowSupport.h"
#include "Framework/ConfigParamDiscovery.h"

#include "ComputingResourceHelpers.h"
#include "DataProcessingStatus.h"
Expand Down Expand Up @@ -2806,6 +2808,38 @@ void enableSignposts(std::string const& signpostsToEnable)
}
}

void overrideAll(o2::framework::ConfigContext& ctx, std::vector<o2::framework::DataProcessorSpec>& workflow)
{
overrideCloning(ctx, workflow);
overridePipeline(ctx, workflow);
overrideLabels(ctx, workflow);
}

o2::framework::ConfigContext createConfigContext(std::unique_ptr<ConfigParamRegistry>& workflowOptionsRegistry,
o2::framework::ServiceRegistry& configRegistry,
std::vector<o2::framework::ConfigParamSpec>& workflowOptions,
std::vector<o2::framework::ConfigParamSpec>& extraOptions, int argc, char** argv)
{
std::vector<std::unique_ptr<o2::framework::ParamRetriever>> retrievers;
std::unique_ptr<o2::framework::ParamRetriever> retriever{new o2::framework::BoostOptionsRetriever(true, argc, argv)};
retrievers.emplace_back(std::move(retriever));
auto workflowOptionsStore = std::make_unique<o2::framework::ConfigParamStore>(workflowOptions, std::move(retrievers));
workflowOptionsStore->preload();
workflowOptionsStore->activate();
workflowOptionsRegistry = std::make_unique<ConfigParamRegistry>(std::move(workflowOptionsStore));
extraOptions = o2::framework::ConfigParamDiscovery::discover(*workflowOptionsRegistry, argc, argv);
for (auto& extra : extraOptions) {
workflowOptions.push_back(extra);
}

return o2::framework::ConfigContext(*workflowOptionsRegistry, o2::framework::ServiceRegistryRef{configRegistry}, argc, argv);
}

std::unique_ptr<o2::framework::ServiceRegistry> createRegistry()
{
return std::make_unique<o2::framework::ServiceRegistry>();
}

// This is a toy executor for the workflow spec
// What it needs to do is:
//
Expand Down

0 comments on commit 5269f06

Please sign in to comment.