diff --git a/Framework/Core/include/Framework/GuiCallbackContext.h b/Framework/Core/include/Framework/GuiCallbackContext.h index 9c0f21f3aec9c..1dbb6ec30e849 100644 --- a/Framework/Core/include/Framework/GuiCallbackContext.h +++ b/Framework/Core/include/Framework/GuiCallbackContext.h @@ -44,6 +44,7 @@ struct GuiCallbackContext { void* window = nullptr; bool* guiQuitRequested = nullptr; bool* allChildrenGone = nullptr; + bool* guiTimerExpired = nullptr; std::function callback; std::set renderers; }; diff --git a/Framework/Core/src/DriverServerContext.h b/Framework/Core/src/DriverServerContext.h index c49ab460e4c6a..4d25c47bd172b 100644 --- a/Framework/Core/src/DriverServerContext.h +++ b/Framework/Core/src/DriverServerContext.h @@ -52,6 +52,7 @@ struct DriverServerContext { /// The handle to the server component of the /// driver. uv_tcp_t serverHandle; + uv_async_t* asyncLogProcessing = nullptr; }; } // namespace o2::framework diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index 64c034ebb8533..6b424ee6a4261 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -176,6 +176,7 @@ std::vector gDeviceMetricsInfos; bpo::options_description gHiddenDeviceOptions("Hidden child options"); O2_DECLARE_DYNAMIC_LOG(driver); +O2_DECLARE_DYNAMIC_LOG(gui); void doBoostException(boost::exception& e, const char*); void doDPLException(o2::framework::RuntimeErrorRef& ref, char const*); @@ -409,14 +410,14 @@ void spawnRemoteDevice(uv_loop_t* loop, struct DeviceLogContext { int fd; int index; - uv_loop_t* loop; - std::vector* infos; + DriverServerContext* serverContext; }; void log_callback(uv_poll_t* handle, int status, int events) { + O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop); auto* logContext = reinterpret_cast(handle->data); - std::vector* infos = logContext->infos; + std::vector* infos = logContext->serverContext->infos; DeviceInfo& info = infos->at(logContext->index); if (status < 0) { @@ -428,16 +429,26 @@ void log_callback(uv_poll_t* handle, int status, int events) if (events & UV_DISCONNECT) { info.active = false; } + O2_SIGNPOST_EVENT_EMIT(driver, sid, "loop", "log_callback invoked by poller for device %{xcode:pid}d which is %{public}s%{public}s", + info.pid, info.active ? "active" : "inactive", + info.active ? " and still has data to read." : "."); + if (info.active == false) { + uv_poll_stop(handle); + } + uv_async_send(logContext->serverContext->asyncLogProcessing); } void close_websocket(uv_handle_t* handle) { - LOG(debug) << "Handle is being closed"; + O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop); + O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "close_websocket"); delete (WSDPLHandler*)handle->data; } void websocket_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { + O2_SIGNPOST_ID_FROM_POINTER(sid, driver, stream->loop); + O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback"); auto* handler = (WSDPLHandler*)stream->data; if (nread == 0) { return; @@ -481,6 +492,8 @@ static void my_alloc_cb(uv_handle_t*, size_t suggested_size, uv_buf_t* buf) /// A callback for the rest engine void ws_connect_callback(uv_stream_t* server, int status) { + O2_SIGNPOST_ID_FROM_POINTER(sid, driver, server->loop); + O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback"); auto* serverContext = reinterpret_cast(server->data); if (status < 0) { LOGF(error, "New connection error %s\n", uv_strerror(status)); @@ -574,9 +587,8 @@ void handleSignals() } } -void handleChildrenStdio(uv_loop_t* loop, +void handleChildrenStdio(DriverServerContext* serverContext, std::string const& forwardedStdin, - std::vector& deviceInfos, std::vector& childFds, std::vector& handles) { @@ -586,7 +598,7 @@ void handleChildrenStdio(uv_loop_t* loop, auto* req = (uv_work_t*)malloc(sizeof(uv_work_t)); req->data = new StreamConfigContext{forwardedStdin, childstdin[1]}; - uv_queue_work(loop, req, stream_config, nullptr); + uv_queue_work(serverContext->loop, req, stream_config, nullptr); // Setting them to non-blocking to avoid haing the driver hang when // reading from child. @@ -596,16 +608,15 @@ void handleChildrenStdio(uv_loop_t* loop, } /// Add pollers for stdout and stderr - auto addPoller = [&handles, &deviceInfos, &loop](int index, int fd) { + auto addPoller = [&handles, &serverContext](int index, int fd) { auto* context = new DeviceLogContext{}; context->index = index; context->fd = fd; - context->loop = loop; - context->infos = &deviceInfos; + context->serverContext = serverContext; handles.push_back((uv_poll_t*)malloc(sizeof(uv_poll_t))); auto handle = handles.back(); handle->data = context; - uv_poll_init(loop, handle, fd); + uv_poll_init(serverContext->loop, handle, fd); uv_poll_start(handle, UV_READABLE, log_callback); }; @@ -810,7 +821,8 @@ void spawnDevice(uv_loop_t* loop, gDeviceMetricsInfos.emplace_back(DeviceMetricsInfo{}); } -void processChildrenOutput(DriverInfo& driverInfo, +void processChildrenOutput(uv_loop_t* loop, + DriverInfo& driverInfo, DeviceInfos& infos, DeviceSpecs const& specs, DeviceControls& controls) @@ -826,8 +838,8 @@ void processChildrenOutput(DriverInfo& driverInfo, std::match_results match; ParsedMetricMatch metricMatch; ParsedConfigMatch configMatch; - const std::string delimiter("\n"); + int processed = 0; for (size_t di = 0, de = infos.size(); di < de; ++di) { DeviceInfo& info = infos[di]; DeviceControl& control = controls[di]; @@ -837,6 +849,7 @@ void processChildrenOutput(DriverInfo& driverInfo, if (info.unprinted.empty()) { continue; } + processed++; O2_SIGNPOST_ID_FROM_POINTER(sid, driver, &info); O2_SIGNPOST_START(driver, sid, "bytes_processed", "bytes processed by %{xcode:pid}d", info.pid); @@ -846,7 +859,7 @@ void processChildrenOutput(DriverInfo& driverInfo, info.history.resize(info.historySize); info.historyLevel.resize(info.historySize); - while ((pos = s.find(delimiter)) != std::string::npos) { + while ((pos = s.find("\n")) != std::string::npos) { std::string_view token{s.substr(0, pos)}; auto logLevel = LogParsingHelpers::parseTokenLevel(token); @@ -882,13 +895,18 @@ void processChildrenOutput(DriverInfo& driverInfo, info.firstSevereError = token; } } - s.remove_prefix(pos + delimiter.length()); + // +1 is to skip the \n + s.remove_prefix(pos + 1); } size_t oldSize = info.unprinted.size(); info.unprinted = std::string(s); int64_t bytesProcessed = oldSize - info.unprinted.size(); O2_SIGNPOST_END(driver, sid, "bytes_processed", "bytes processed by %{xcode:network-size-in-bytes}" PRIi64, bytesProcessed); } + if (processed == 0) { + O2_SIGNPOST_ID_FROM_POINTER(lid, driver, loop); + O2_SIGNPOST_EVENT_EMIT(driver, lid, "mainloop", "processChildrenOutput invoked for nothing!"); + } } // Process all the sigchld which are pending @@ -1106,14 +1124,29 @@ void gui_callback(uv_timer_s* ctx) { auto* gui = reinterpret_cast(ctx->data); if (gui->plugin == nullptr) { + // The gui is not there. Why are we here? + O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop); + O2_SIGNPOST_EVENT_EMIT_ERROR(driver, sid, "gui", "GUI timer callback invoked without a GUI plugin."); + uv_timer_stop(ctx); return; } + *gui->guiTimerExpired = true; + static int counter = 0; + if ((counter++ % 6000) == 0) { + O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop); + O2_SIGNPOST_EVENT_EMIT(driver, sid, "gui", "The GUI callback got called %d times.", counter); + *gui->guiTimerExpired = false; + } + // One interval per GUI invocation, using the loop as anchor. + O2_SIGNPOST_ID_FROM_POINTER(sid, gui, ctx->loop); + O2_SIGNPOST_START(gui, sid, "gui", "gui_callback"); // New version which allows deferred closure of windows if (gui->plugin->supportsDeferredClose()) { // For now, there is nothing for which we want to defer the close // so if the flag is set, we simply exit if (*(gui->guiQuitRequested)) { + O2_SIGNPOST_END(gui, sid, "gui", "Quit requested by the GUI."); return; } void* draw_data = nullptr; @@ -1123,6 +1156,7 @@ void gui_callback(uv_timer_s* ctx) // if less than 15ms have passed reuse old frame if (frameLatency / 1000000 <= 15) { draw_data = gui->lastFrame; + O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame."); return; } // The result of the pollGUIPreRender is used to determine if we @@ -1148,6 +1182,7 @@ void gui_callback(uv_timer_s* ctx) if (frameLatency / 1000000 > 15) { if (!gui->plugin->pollGUIPreRender(gui->window, (float)frameLatency / 1000000000.0f)) { *(gui->guiQuitRequested) = true; + O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame."); return; } draw_data = gui->plugin->pollGUIRender(gui->callback); @@ -1163,6 +1198,7 @@ void gui_callback(uv_timer_s* ctx) gui->frameLast = frameStart; } } + O2_SIGNPOST_END(gui, sid, "gui", "Gui redrawn."); } /// Force single stepping of the children @@ -1429,12 +1465,14 @@ int runStateMachine(DataProcessorSpecs const& workflow, ServiceRegistryRef ref{serviceRegistry}; ref.registerService(ServiceRegistryHelpers::handleForService(devicesManager)); + bool guiTimerExpired = false; GuiCallbackContext guiContext; guiContext.plugin = debugGUI; guiContext.frameLast = uv_hrtime(); guiContext.frameLatency = &driverInfo.frameLatency; guiContext.frameCost = &driverInfo.frameCost; guiContext.guiQuitRequested = &guiQuitRequested; + guiContext.guiTimerExpired = &guiTimerExpired; // This is to make sure we can process metrics, commands, configuration // changes coming from websocket (or even via any standard uv_stream_t, I guess). @@ -1467,6 +1505,16 @@ int runStateMachine(DataProcessorSpecs const& workflow, metricDumpTimer.data = &serverContext; bool allChildrenGone = false; guiContext.allChildrenGone = &allChildrenGone; + O2_SIGNPOST_ID_FROM_POINTER(sid, driver, loop); + O2_SIGNPOST_START(driver, sid, "driver", "Starting driver loop"); + + // Async callback to process the output of the children, if needed. + serverContext.asyncLogProcessing = (uv_async_t*)malloc(sizeof(uv_async_t)); + serverContext.asyncLogProcessing->data = &serverContext; + uv_async_init(loop, serverContext.asyncLogProcessing, [](uv_async_t* handle) { + auto* context = (DriverServerContext*)handle->data; + processChildrenOutput(context->loop, *context->driver, *context->infos, *context->specs, *context->controls); + }); while (true) { // If control forced some transition on us, we push it to the queue. @@ -2046,7 +2094,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, } } handleSignals(); - handleChildrenStdio(loop, forwardedStdin.str(), infos, childFds, pollHandles); + handleChildrenStdio(&serverContext, forwardedStdin.str(), childFds, pollHandles); for (auto& callback : postScheduleCallbacks) { callback(serviceRegistry, {varmap}); } @@ -2067,6 +2115,12 @@ int runStateMachine(DataProcessorSpecs const& workflow, // any, so that we do not consume CPU time when the driver is // idle. devicesManager->flush(); + // We print the event loop for the gui only once every + // 6000 iterations (i.e. ~2 minutes). To avoid spamming, while still + // being able to see the event loop in case of a deadlock / systematic failure. + if (guiTimerExpired == false) { + O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "Entering event loop with %{public}s", once ? "UV_RUN_ONCE" : "UV_RUN_NOWAIT"); + } uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT); once = true; // Calculate what we should do next and eventually @@ -2099,9 +2153,6 @@ int runStateMachine(DataProcessorSpecs const& workflow, } else { driverInfo.states.push_back(DriverState::RUNNING); } - { - processChildrenOutput(driverInfo, infos, runningWorkflow.devices, controls); - } break; case DriverState::QUIT_REQUESTED: LOG(info) << "QUIT_REQUESTED"; @@ -2135,7 +2186,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, } sigchld_requested = false; driverInfo.sigchldRequested = false; - processChildrenOutput(driverInfo, infos, runningWorkflow.devices, controls); + processChildrenOutput(loop, driverInfo, infos, runningWorkflow.devices, controls); hasError = processSigChild(infos, runningWorkflow.devices); allChildrenGone = areAllChildrenGone(infos); bool canExit = checkIfCanExit(infos); @@ -2213,6 +2264,7 @@ int runStateMachine(DataProcessorSpecs const& workflow, driverInfo.states.push_back(DriverState::QUIT_REQUESTED); } } + O2_SIGNPOST_END(driver, sid, "driver", "End driver loop"); } // Print help