Skip to content

Commit

Permalink
shm-metadata-msg-size parameter mapped with simple workflow to test i…
Browse files Browse the repository at this point in the history
…t's values
  • Loading branch information
Michal Tichák committed Feb 29, 2024
1 parent 144a9aa commit 2a25cf3
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
realOdesc.add_options()("shm-allocation", bpo::value<std::string>());
realOdesc.add_options()("shm-no-cleanup", bpo::value<std::string>());
realOdesc.add_options()("shmid", bpo::value<std::string>());
realOdesc.add_options()("shm-metadata-msg-size", bpo::value<std::string>());
realOdesc.add_options()("shm-monitor", bpo::value<std::string>());
realOdesc.add_options()("channel-prefix", bpo::value<std::string>());
realOdesc.add_options()("network-interface", bpo::value<std::string>());
Expand Down Expand Up @@ -1674,6 +1675,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("shm-allocation", bpo::value<std::string>()->default_value("rbtree_best_fit"), "shm allocation method") //
("shm-no-cleanup", bpo::value<std::string>()->default_value("false"), "no shm cleanup") //
("shmid", bpo::value<std::string>(), "shmid") //
("shm-metadata-msg-size", bpo::value<std::string>()->default_value("0"), "numeric value in B used for padding FairMQ header, see FairMQ v.1.6.0") //
("environment", bpo::value<std::string>(), "comma separated list of environment variables to set for the device") //
("stacktrace-on-signal", bpo::value<std::string>()->default_value("simple"), //
"dump stacktrace on specified signal(s) (any of `all`, `segv`, `bus`, `ill`, `abrt`, `fpe`, `sys`.)" //
Expand Down
5 changes: 5 additions & 0 deletions Framework/TestWorkflows/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ o2_add_dpl_workflow(test-ccdb-fetcher
PUBLIC_LINK_LIBRARIES O2::DataFormatsTOF O2::Framework
COMPONENT_NAME TestWorkflows)

o2_add_executable(fairmq-header-size-test
SOURCES src/o2FairMQHeaderSizeTest.cxx
PUBLIC_LINK_LIBRARIES O2::Framework
COMPONENT_NAME TestWorkflows)

add_executable(o2-test-deadlock src/o2DeadlockReproducer.cxx)
target_link_libraries(o2-test-deadlock PUBLIC FairMQ::FairMQ)
add_test(NAME o2-test-deadlock COMMAND o2-test-deadlock --channel-config "name=data,type=sub,method=bind,address=ipc://127.0.0.1,rateLogging=0" --id foo --transport zeromq --control static)
96 changes: 96 additions & 0 deletions Framework/TestWorkflows/src/o2FairMQHeaderSizeTest.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.

/// \file o2FairMQHeaderSizeTest.cxx
/// \brief Just a simple workflow to test how much messages can be stored internally,
/// when nothing is consumed. Used for tuning parameter shm-message-metadata-size.
///
/// \author Michal Tichak, michal.tichak@cern.ch

#include "Framework/ConfigParamSpec.h"
#include "Framework/ControlService.h"
#include "Framework/CallbackService.h"
#include "Framework/EndOfStreamContext.h"
#include "Framework/DeviceSpec.h"
#include "Framework/ControlService.h"
#include "Framework/runDataProcessing.h"

#include <chrono>
#include <thread>
#include <vector>
#include <random>

using namespace o2::framework;

static std::random_device rd;
static std::mt19937 gen(rd());

std::string random_string(size_t length)
{
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";

std::uniform_int_distribution<> dis(0, sizeof(alphanum) - 2);

std::string randomString;
randomString.reserve(length);

for (int i = 0; i < length; ++i) {
randomString.push_back(alphanum[dis(gen)]);
}

return randomString;
}

std::string filename()
{
std::stringstream ss;
ss << "messages_count_" << random_string(10) << ".data";
return std::move(ss).str();
}

WorkflowSpec defineDataProcessing(ConfigContext const& specs)
{
return WorkflowSpec{
{"A",
Inputs{},
{OutputSpec{{"a"}, "TST", "A"}},
AlgorithmSpec{
[numberOfMessages = 0, filename = filename()](ProcessingContext& ctx) mutable {
using namespace std::chrono;
++numberOfMessages;
// LOG(info) << "Generating message #" << ++numberOfMessages;

{
auto file = std::ofstream(filename, std::ios_base::out | std::ios_base::trunc);
// file << duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count() << "," << numberOfMessages << "\n";
file << numberOfMessages;
}

auto& aData = ctx.outputs().make<int>(Output{"TST", "A", 0}, 1);
aData[0] = 1;
}}},
{"B",
{InputSpec{"x", "TST", "A"}},
Outputs{},
AlgorithmSpec{[](InitContext& ic) {
return [](ProcessingContext& ctx) {
while (true) {
std::this_thread::sleep_for(std::chrono::milliseconds{100});
}
// auto& data = ctx.inputs().get<int>("x");
// LOG(info) << "Reading message: " << data;
};
}}},
};
}

0 comments on commit 2a25cf3

Please sign in to comment.