From 2a25cf3de38ec78f144dddbb91359501fb6d9424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Tich=C3=A1k?= Date: Mon, 19 Feb 2024 15:03:53 +0100 Subject: [PATCH] shm-metadata-msg-size parameter mapped with simple workflow to test it's values --- Framework/Core/src/DeviceSpecHelpers.cxx | 2 + Framework/TestWorkflows/CMakeLists.txt | 5 + .../src/o2FairMQHeaderSizeTest.cxx | 96 +++++++++++++++++++ 3 files changed, 103 insertions(+) create mode 100644 Framework/TestWorkflows/src/o2FairMQHeaderSizeTest.cxx diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index e9df320fe8b95..19c0e5ca92237 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1492,6 +1492,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, realOdesc.add_options()("shm-allocation", bpo::value()); realOdesc.add_options()("shm-no-cleanup", bpo::value()); realOdesc.add_options()("shmid", bpo::value()); + realOdesc.add_options()("shm-metadata-msg-size", bpo::value()); realOdesc.add_options()("shm-monitor", bpo::value()); realOdesc.add_options()("channel-prefix", bpo::value()); realOdesc.add_options()("network-interface", bpo::value()); @@ -1674,6 +1675,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("shm-allocation", bpo::value()->default_value("rbtree_best_fit"), "shm allocation method") // ("shm-no-cleanup", bpo::value()->default_value("false"), "no shm cleanup") // ("shmid", bpo::value(), "shmid") // + ("shm-metadata-msg-size", bpo::value()->default_value("0"), "numeric value in B used for padding FairMQ header, see FairMQ v.1.6.0") // ("environment", bpo::value(), "comma separated list of environment variables to set for the device") // ("stacktrace-on-signal", bpo::value()->default_value("simple"), // "dump stacktrace on specified signal(s) (any of `all`, `segv`, `bus`, `ill`, `abrt`, `fpe`, `sys`.)" // diff --git a/Framework/TestWorkflows/CMakeLists.txt b/Framework/TestWorkflows/CMakeLists.txt index c2c89e19b894e..6daac86276274 100644 --- a/Framework/TestWorkflows/CMakeLists.txt +++ b/Framework/TestWorkflows/CMakeLists.txt @@ -119,6 +119,11 @@ o2_add_dpl_workflow(test-ccdb-fetcher PUBLIC_LINK_LIBRARIES O2::DataFormatsTOF O2::Framework COMPONENT_NAME TestWorkflows) +o2_add_executable(fairmq-header-size-test + SOURCES src/o2FairMQHeaderSizeTest.cxx + PUBLIC_LINK_LIBRARIES O2::Framework + COMPONENT_NAME TestWorkflows) + add_executable(o2-test-deadlock src/o2DeadlockReproducer.cxx) target_link_libraries(o2-test-deadlock PUBLIC FairMQ::FairMQ) add_test(NAME o2-test-deadlock COMMAND o2-test-deadlock --channel-config "name=data,type=sub,method=bind,address=ipc://127.0.0.1,rateLogging=0" --id foo --transport zeromq --control static) diff --git a/Framework/TestWorkflows/src/o2FairMQHeaderSizeTest.cxx b/Framework/TestWorkflows/src/o2FairMQHeaderSizeTest.cxx new file mode 100644 index 0000000000000..7471da547297a --- /dev/null +++ b/Framework/TestWorkflows/src/o2FairMQHeaderSizeTest.cxx @@ -0,0 +1,96 @@ +// Copyright 2019-2020 CERN and copyright holders of ALICE O2. +// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders. +// All rights not expressly granted are reserved. +// +// This software is distributed under the terms of the GNU General Public +// License v3 (GPL Version 3), copied verbatim in the file "COPYING". +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// \file o2FairMQHeaderSizeTest.cxx +/// \brief Just a simple workflow to test how much messages can be stored internally, +/// when nothing is consumed. Used for tuning parameter shm-message-metadata-size. +/// +/// \author Michal Tichak, michal.tichak@cern.ch + +#include "Framework/ConfigParamSpec.h" +#include "Framework/ControlService.h" +#include "Framework/CallbackService.h" +#include "Framework/EndOfStreamContext.h" +#include "Framework/DeviceSpec.h" +#include "Framework/ControlService.h" +#include "Framework/runDataProcessing.h" + +#include +#include +#include +#include + +using namespace o2::framework; + +static std::random_device rd; +static std::mt19937 gen(rd()); + +std::string random_string(size_t length) +{ + static const char alphanum[] = + "0123456789" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + "abcdefghijklmnopqrstuvwxyz"; + + std::uniform_int_distribution<> dis(0, sizeof(alphanum) - 2); + + std::string randomString; + randomString.reserve(length); + + for (int i = 0; i < length; ++i) { + randomString.push_back(alphanum[dis(gen)]); + } + + return randomString; +} + +std::string filename() +{ + std::stringstream ss; + ss << "messages_count_" << random_string(10) << ".data"; + return std::move(ss).str(); +} + +WorkflowSpec defineDataProcessing(ConfigContext const& specs) +{ + return WorkflowSpec{ + {"A", + Inputs{}, + {OutputSpec{{"a"}, "TST", "A"}}, + AlgorithmSpec{ + [numberOfMessages = 0, filename = filename()](ProcessingContext& ctx) mutable { + using namespace std::chrono; + ++numberOfMessages; + // LOG(info) << "Generating message #" << ++numberOfMessages; + + { + auto file = std::ofstream(filename, std::ios_base::out | std::ios_base::trunc); + // file << duration_cast(system_clock::now().time_since_epoch()).count() << "," << numberOfMessages << "\n"; + file << numberOfMessages; + } + + auto& aData = ctx.outputs().make(Output{"TST", "A", 0}, 1); + aData[0] = 1; + }}}, + {"B", + {InputSpec{"x", "TST", "A"}}, + Outputs{}, + AlgorithmSpec{[](InitContext& ic) { + return [](ProcessingContext& ctx) { + while (true) { + std::this_thread::sleep_for(std::chrono::milliseconds{100}); + } + // auto& data = ctx.inputs().get("x"); + // LOG(info) << "Reading message: " << data; + }; + }}}, + }; +}