Skip to content

Commit

Permalink
DPL: improve processing of logs from devices
Browse files Browse the repository at this point in the history
- Log processing now happens in an asynchronous action which is performed only
when some logs are actually read. All logs are read in any case, because
libuv will coalesce those callbacks into one.
- Improved signposts to print out what the driver event loop is doing.
- Improved signposts to debug the GUI with a separate stream.
  • Loading branch information
ktf committed Mar 6, 2024
1 parent fb9ed5d commit 02d80f8
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 20 deletions.
1 change: 1 addition & 0 deletions Framework/Core/include/Framework/GuiCallbackContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct GuiCallbackContext {
void* window = nullptr;
bool* guiQuitRequested = nullptr;
bool* allChildrenGone = nullptr;
bool* guiTimerExpired = nullptr;
std::function<void(void)> callback;
std::set<GuiRenderer*> renderers;
};
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/DriverServerContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ struct DriverServerContext {
/// The handle to the server component of the
/// driver.
uv_tcp_t serverHandle;
uv_async_t* asyncLogProcessing = nullptr;
};
} // namespace o2::framework

Expand Down
92 changes: 72 additions & 20 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ std::vector<DeviceMetricsInfo> gDeviceMetricsInfos;
bpo::options_description gHiddenDeviceOptions("Hidden child options");

O2_DECLARE_DYNAMIC_LOG(driver);
O2_DECLARE_DYNAMIC_LOG(gui);

void doBoostException(boost::exception& e, const char*);
void doDPLException(o2::framework::RuntimeErrorRef& ref, char const*);
Expand Down Expand Up @@ -409,14 +410,14 @@ void spawnRemoteDevice(uv_loop_t* loop,
struct DeviceLogContext {
int fd;
int index;
uv_loop_t* loop;
std::vector<DeviceInfo>* infos;
DriverServerContext* serverContext;
};

void log_callback(uv_poll_t* handle, int status, int events)
{
O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop);
auto* logContext = reinterpret_cast<DeviceLogContext*>(handle->data);
std::vector<DeviceInfo>* infos = logContext->infos;
std::vector<DeviceInfo>* infos = logContext->serverContext->infos;
DeviceInfo& info = infos->at(logContext->index);

if (status < 0) {
Expand All @@ -428,16 +429,26 @@ void log_callback(uv_poll_t* handle, int status, int events)
if (events & UV_DISCONNECT) {
info.active = false;
}
O2_SIGNPOST_EVENT_EMIT(driver, sid, "loop", "log_callback invoked by poller for device %{xcode:pid}d which is %{public}s%{public}s",
info.pid, info.active ? "active" : "inactive",
info.active ? " and still has data to read." : ".");
if (info.active == false) {
uv_poll_stop(handle);
}
uv_async_send(logContext->serverContext->asyncLogProcessing);
}

/// Releases the websocket handler attached to a handle which is being closed.
/// NOTE(review): the signature matches a libuv close callback; the place where
/// it is registered is not visible here — confirm against the caller.
/// @param handle the handle being closed. Its `data` member is cast to a
///        `WSDPLHandler*` and deleted.
void close_websocket(uv_handle_t* handle)
{
LOG(debug) << "Handle is being closed";
// The signpost id is derived from the loop the handle belongs to.
O2_SIGNPOST_ID_FROM_POINTER(sid, driver, handle->loop);
O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "close_websocket");
// NOTE(review): assumes handle->data was allocated with `new` as a
// WSDPLHandler — confirm at the site which sets it.
delete (WSDPLHandler*)handle->data;
}

void websocket_callback(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf)
{
O2_SIGNPOST_ID_FROM_POINTER(sid, driver, stream->loop);
O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback");
auto* handler = (WSDPLHandler*)stream->data;
if (nread == 0) {
return;
Expand Down Expand Up @@ -481,6 +492,8 @@ static void my_alloc_cb(uv_handle_t*, size_t suggested_size, uv_buf_t* buf)
/// A callback for the rest engine
void ws_connect_callback(uv_stream_t* server, int status)
{
O2_SIGNPOST_ID_FROM_POINTER(sid, driver, server->loop);
O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "websocket_callback");
auto* serverContext = reinterpret_cast<DriverServerContext*>(server->data);
if (status < 0) {
LOGF(error, "New connection error %s\n", uv_strerror(status));
Expand Down Expand Up @@ -574,9 +587,8 @@ void handleSignals()
}
}

void handleChildrenStdio(uv_loop_t* loop,
void handleChildrenStdio(DriverServerContext* serverContext,
std::string const& forwardedStdin,
std::vector<DeviceInfo>& deviceInfos,
std::vector<DeviceStdioContext>& childFds,
std::vector<uv_poll_t*>& handles)
{
Expand All @@ -586,7 +598,7 @@ void handleChildrenStdio(uv_loop_t* loop,

auto* req = (uv_work_t*)malloc(sizeof(uv_work_t));
req->data = new StreamConfigContext{forwardedStdin, childstdin[1]};
uv_queue_work(loop, req, stream_config, nullptr);
uv_queue_work(serverContext->loop, req, stream_config, nullptr);

// Setting them to non-blocking to avoid having the driver hang when
// reading from child.
Expand All @@ -596,16 +608,15 @@ void handleChildrenStdio(uv_loop_t* loop,
}

/// Add pollers for stdout and stderr
auto addPoller = [&handles, &deviceInfos, &loop](int index, int fd) {
auto addPoller = [&handles, &serverContext](int index, int fd) {
auto* context = new DeviceLogContext{};
context->index = index;
context->fd = fd;
context->loop = loop;
context->infos = &deviceInfos;
context->serverContext = serverContext;
handles.push_back((uv_poll_t*)malloc(sizeof(uv_poll_t)));
auto handle = handles.back();
handle->data = context;
uv_poll_init(loop, handle, fd);
uv_poll_init(serverContext->loop, handle, fd);
uv_poll_start(handle, UV_READABLE, log_callback);
};

Expand Down Expand Up @@ -810,7 +821,8 @@ void spawnDevice(uv_loop_t* loop,
gDeviceMetricsInfos.emplace_back(DeviceMetricsInfo{});
}

void processChildrenOutput(DriverInfo& driverInfo,
void processChildrenOutput(uv_loop_t* loop,
DriverInfo& driverInfo,
DeviceInfos& infos,
DeviceSpecs const& specs,
DeviceControls& controls)
Expand All @@ -826,8 +838,8 @@ void processChildrenOutput(DriverInfo& driverInfo,
std::match_results<std::string_view::const_iterator> match;
ParsedMetricMatch metricMatch;
ParsedConfigMatch configMatch;
const std::string delimiter("\n");

int processed = 0;
for (size_t di = 0, de = infos.size(); di < de; ++di) {
DeviceInfo& info = infos[di];
DeviceControl& control = controls[di];
Expand All @@ -837,6 +849,7 @@ void processChildrenOutput(DriverInfo& driverInfo,
if (info.unprinted.empty()) {
continue;
}
processed++;

O2_SIGNPOST_ID_FROM_POINTER(sid, driver, &info);
O2_SIGNPOST_START(driver, sid, "bytes_processed", "bytes processed by %{xcode:pid}d", info.pid);
Expand All @@ -846,7 +859,7 @@ void processChildrenOutput(DriverInfo& driverInfo,
info.history.resize(info.historySize);
info.historyLevel.resize(info.historySize);

while ((pos = s.find(delimiter)) != std::string::npos) {
while ((pos = s.find("\n")) != std::string::npos) {
std::string_view token{s.substr(0, pos)};
auto logLevel = LogParsingHelpers::parseTokenLevel(token);

Expand Down Expand Up @@ -882,13 +895,18 @@ void processChildrenOutput(DriverInfo& driverInfo,
info.firstSevereError = token;
}
}
s.remove_prefix(pos + delimiter.length());
// +1 is to skip the \n
s.remove_prefix(pos + 1);
}
size_t oldSize = info.unprinted.size();
info.unprinted = std::string(s);
int64_t bytesProcessed = oldSize - info.unprinted.size();
O2_SIGNPOST_END(driver, sid, "bytes_processed", "bytes processed by %{xcode:network-size-in-bytes}" PRIi64, bytesProcessed);
}
if (processed == 0) {
O2_SIGNPOST_ID_FROM_POINTER(lid, driver, loop);
O2_SIGNPOST_EVENT_EMIT(driver, lid, "mainloop", "processChildrenOutput invoked for nothing!");
}
}

// Process all the sigchld which are pending
Expand Down Expand Up @@ -1106,14 +1124,29 @@ void gui_callback(uv_timer_s* ctx)
{
auto* gui = reinterpret_cast<GuiCallbackContext*>(ctx->data);
if (gui->plugin == nullptr) {
// The gui is not there. Why are we here?
O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop);
O2_SIGNPOST_EVENT_EMIT_ERROR(driver, sid, "gui", "GUI timer callback invoked without a GUI plugin.");
uv_timer_stop(ctx);
return;
}
*gui->guiTimerExpired = true;
static int counter = 0;
if ((counter++ % 6000) == 0) {
O2_SIGNPOST_ID_FROM_POINTER(sid, driver, ctx->loop);
O2_SIGNPOST_EVENT_EMIT(driver, sid, "gui", "The GUI callback got called %d times.", counter);
*gui->guiTimerExpired = false;
}
// One interval per GUI invocation, using the loop as anchor.
O2_SIGNPOST_ID_FROM_POINTER(sid, gui, ctx->loop);
O2_SIGNPOST_START(gui, sid, "gui", "gui_callback");

// New version which allows deferred closure of windows
if (gui->plugin->supportsDeferredClose()) {
// For now, there is nothing for which we want to defer the close
// so if the flag is set, we simply exit
if (*(gui->guiQuitRequested)) {
O2_SIGNPOST_END(gui, sid, "gui", "Quit requested by the GUI.");
return;
}
void* draw_data = nullptr;
Expand All @@ -1123,6 +1156,7 @@ void gui_callback(uv_timer_s* ctx)
// if less than 15ms have passed reuse old frame
if (frameLatency / 1000000 <= 15) {
draw_data = gui->lastFrame;
O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame.");
return;
}
// The result of the pollGUIPreRender is used to determine if we
Expand All @@ -1148,6 +1182,7 @@ void gui_callback(uv_timer_s* ctx)
if (frameLatency / 1000000 > 15) {
if (!gui->plugin->pollGUIPreRender(gui->window, (float)frameLatency / 1000000000.0f)) {
*(gui->guiQuitRequested) = true;
O2_SIGNPOST_END(gui, sid, "gui", "Reusing old frame.");
return;
}
draw_data = gui->plugin->pollGUIRender(gui->callback);
Expand All @@ -1163,6 +1198,7 @@ void gui_callback(uv_timer_s* ctx)
gui->frameLast = frameStart;
}
}
O2_SIGNPOST_END(gui, sid, "gui", "Gui redrawn.");
}

/// Force single stepping of the children
Expand Down Expand Up @@ -1429,12 +1465,14 @@ int runStateMachine(DataProcessorSpecs const& workflow,
ServiceRegistryRef ref{serviceRegistry};
ref.registerService(ServiceRegistryHelpers::handleForService<DevicesManager>(devicesManager));

bool guiTimerExpired = false;
GuiCallbackContext guiContext;
guiContext.plugin = debugGUI;
guiContext.frameLast = uv_hrtime();
guiContext.frameLatency = &driverInfo.frameLatency;
guiContext.frameCost = &driverInfo.frameCost;
guiContext.guiQuitRequested = &guiQuitRequested;
guiContext.guiTimerExpired = &guiTimerExpired;

// This is to make sure we can process metrics, commands, configuration
// changes coming from websocket (or even via any standard uv_stream_t, I guess).
Expand Down Expand Up @@ -1467,6 +1505,16 @@ int runStateMachine(DataProcessorSpecs const& workflow,
metricDumpTimer.data = &serverContext;
bool allChildrenGone = false;
guiContext.allChildrenGone = &allChildrenGone;
O2_SIGNPOST_ID_FROM_POINTER(sid, driver, loop);
O2_SIGNPOST_START(driver, sid, "driver", "Starting driver loop");

// Async callback to process the output of the children, if needed.
serverContext.asyncLogProcessing = (uv_async_t*)malloc(sizeof(uv_async_t));
serverContext.asyncLogProcessing->data = &serverContext;
uv_async_init(loop, serverContext.asyncLogProcessing, [](uv_async_t* handle) {
auto* context = (DriverServerContext*)handle->data;
processChildrenOutput(context->loop, *context->driver, *context->infos, *context->specs, *context->controls);
});

while (true) {
// If control forced some transition on us, we push it to the queue.
Expand Down Expand Up @@ -2046,7 +2094,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
}
}
handleSignals();
handleChildrenStdio(loop, forwardedStdin.str(), infos, childFds, pollHandles);
handleChildrenStdio(&serverContext, forwardedStdin.str(), childFds, pollHandles);
for (auto& callback : postScheduleCallbacks) {
callback(serviceRegistry, {varmap});
}
Expand All @@ -2067,6 +2115,12 @@ int runStateMachine(DataProcessorSpecs const& workflow,
// any, so that we do not consume CPU time when the driver is
// idle.
devicesManager->flush();
// We print the event loop for the gui only once every
// 6000 iterations (i.e. ~2 minutes), to avoid spamming while still
// being able to see the event loop in case of a deadlock / systematic failure.
if (guiTimerExpired == false) {
O2_SIGNPOST_EVENT_EMIT(driver, sid, "mainloop", "Entering event loop with %{public}s", once ? "UV_RUN_ONCE" : "UV_RUN_NOWAIT");
}
uv_run(loop, once ? UV_RUN_ONCE : UV_RUN_NOWAIT);
once = true;
// Calculate what we should do next and eventually
Expand Down Expand Up @@ -2099,9 +2153,6 @@ int runStateMachine(DataProcessorSpecs const& workflow,
} else {
driverInfo.states.push_back(DriverState::RUNNING);
}
{
processChildrenOutput(driverInfo, infos, runningWorkflow.devices, controls);
}
break;
case DriverState::QUIT_REQUESTED:
LOG(info) << "QUIT_REQUESTED";
Expand Down Expand Up @@ -2135,7 +2186,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
}
sigchld_requested = false;
driverInfo.sigchldRequested = false;
processChildrenOutput(driverInfo, infos, runningWorkflow.devices, controls);
processChildrenOutput(loop, driverInfo, infos, runningWorkflow.devices, controls);
hasError = processSigChild(infos, runningWorkflow.devices);
allChildrenGone = areAllChildrenGone(infos);
bool canExit = checkIfCanExit(infos);
Expand Down Expand Up @@ -2213,6 +2264,7 @@ int runStateMachine(DataProcessorSpecs const& workflow,
driverInfo.states.push_back(DriverState::QUIT_REQUESTED);
}
}
O2_SIGNPOST_END(driver, sid, "driver", "End driver loop");
}

// Print help
Expand Down

0 comments on commit 02d80f8

Please sign in to comment.