From 3085aae0c2b103e724b4f44f86ed98fd9a462947 Mon Sep 17 00:00:00 2001 From: Melissa Kilby Date: Tue, 1 Aug 2023 05:02:45 +0000 Subject: [PATCH] fix(outputs): expose queue_capacity_outputs config for memory control Signed-off-by: Melissa Kilby --- falco.yaml | 22 ++++++++++++++++++++ userspace/falco/app/actions/init_outputs.cpp | 2 ++ userspace/falco/configuration.cpp | 5 +++++ userspace/falco/configuration.h | 2 ++ userspace/falco/configuration_aux.h | 22 ++++++++++++++++++++ userspace/falco/falco_outputs.cpp | 20 +++++++++++++++--- userspace/falco/falco_outputs.h | 3 +++ userspace/falco/stats_writer.cpp | 3 +++ 8 files changed, 76 insertions(+), 3 deletions(-) create mode 100644 userspace/falco/configuration_aux.h diff --git a/falco.yaml b/falco.yaml index 0406c83f5ff..1899fe23220 100644 --- a/falco.yaml +++ b/falco.yaml @@ -39,6 +39,7 @@ # json_include_tags_property # buffered_outputs # outputs (throttling) +# queue_capacity_outputs # Falco outputs channels # stdout_output # syslog_output @@ -304,6 +305,27 @@ outputs: # defined. rule_matching: first +# [Experimental] `queue_capacity_outputs` +# +# Falco utilizes tbb::concurrent_bounded_queue for the outputs, and this parameter +# allows you to customize the capacity. Refer to the official documentation: +# https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html +# On a healthy system with tuned Falco rules, the queue should not fill up. +# If it does, the most likely cause is that the entire event flow is too slow, +# which could indicate that the server is under heavy load. +# +# Lowering the number of items can prevent memory from steadily increasing until the +# OOM killer stops the Falco process. We expose recovery actions to self-limit or exit +# early (self OOM kill), similar to how the kernel buffer size is exposed as a parameter. +# However, this does not address the root cause of the event pipeline not keeping up. 
+queue_capacity_outputs: + # maximum number of items in the queue + items: 1000000 + # drop the current event and continue: 0 (default) + # exit: 1 + # empty the queue, then continue: 2 + recovery: 0 + ########################## # Falco outputs channels # diff --git a/userspace/falco/app/actions/init_outputs.cpp b/userspace/falco/app/actions/init_outputs.cpp index b4a60751871..13ce1460229 100644 --- a/userspace/falco/app/actions/init_outputs.cpp +++ b/userspace/falco/app/actions/init_outputs.cpp @@ -63,6 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s) s.config->m_json_include_tags_property, s.config->m_output_timeout, s.config->m_buffered_outputs, + s.config->m_queue_capacity_outputs_items, + s.config->m_queue_capacity_outputs_recovery, s.config->m_time_format_iso_8601, hostname)); diff --git a/userspace/falco/configuration.cpp b/userspace/falco/configuration.cpp index a059d321cbf..d94ccf77486 100644 --- a/userspace/falco/configuration.cpp +++ b/userspace/falco/configuration.cpp @@ -28,6 +28,7 @@ limitations under the License. 
#include "falco_utils.h" #include "configuration.h" +#include "configuration_aux.h" #include "logger.h" #include "banned.h" // This raises a compilation error when certain functions are used @@ -40,6 +41,8 @@ falco_configuration::falco_configuration(): m_watch_config_files(true), m_rule_matching(falco_common::rule_matching::FIRST), m_buffered_outputs(false), + m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPACITY_OUTPUTS), + m_queue_capacity_outputs_recovery(RECOVERY_DROP_CURRENT), m_time_format_iso_8601(false), m_output_timeout(2000), m_grpc_enabled(false), @@ -258,6 +261,8 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h } m_buffered_outputs = config.get_scalar("buffered_outputs", false); + m_queue_capacity_outputs_items = config.get_scalar("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPACITY_OUTPUTS); + m_queue_capacity_outputs_recovery = config.get_scalar("queue_capacity_outputs.recovery", RECOVERY_DROP_CURRENT); m_time_format_iso_8601 = config.get_scalar("time_format_iso_8601", false); falco_logger::log_stderr = config.get_scalar("log_stderr", false); diff --git a/userspace/falco/configuration.h b/userspace/falco/configuration.h index 1b3bf7f1a3e..1f0579c2aaa 100644 --- a/userspace/falco/configuration.h +++ b/userspace/falco/configuration.h @@ -72,6 +72,8 @@ class falco_configuration bool m_watch_config_files; bool m_buffered_outputs; + size_t m_queue_capacity_outputs_items; /* capacity handed to the outputs queue */ + uint32_t m_queue_capacity_outputs_recovery; /* an outputs_recovery_code value */ bool m_time_format_iso_8601; uint32_t m_output_timeout; diff --git a/userspace/falco/configuration_aux.h b/userspace/falco/configuration_aux.h new file mode 100644 index 00000000000..eb9344a6d31 --- /dev/null +++ b/userspace/falco/configuration_aux.h @@ -0,0 +1,22 @@ +/* +Copyright (C) 2023 The Falco Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#pragma once + +#define DEFAULT_ITEMS_QUEUE_CAPACITY_OUTPUTS 1000000UL /* default for queue_capacity_outputs.items */ + +enum outputs_recovery_code { + RECOVERY_DROP_CURRENT = 0, /* queue_capacity_outputs recovery strategy of continuing on. */ + RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */ + RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */ +}; diff --git a/userspace/falco/falco_outputs.cpp b/userspace/falco/falco_outputs.cpp index a862a7073d7..de1610b35e9 100644 --- a/userspace/falco/falco_outputs.cpp +++ b/userspace/falco/falco_outputs.cpp @@ -19,8 +19,8 @@ limitations under the License. #endif #include "falco_outputs.h" - #include "config_falco.h" +#include "configuration_aux.h" #include "formats.h" #include "logger.h" @@ -47,6 +47,8 @@ falco_outputs::falco_outputs( bool json_include_tags_property, uint32_t timeout, bool buffered, + size_t queue_capacity_outputs_items, + uint32_t queue_capacity_outputs_recovery, bool time_format_iso_8601, const std::string& hostname) { @@ -66,6 +68,8 @@ falco_outputs::falco_outputs( } + m_queue.set_capacity(queue_capacity_outputs_items); /* configure the queue before the worker thread is started */ + m_recovery = queue_capacity_outputs_recovery; m_worker_thread = std::thread(&falco_outputs::worker, this); } falco_outputs::~falco_outputs() @@ -268,8 +272,18 @@ inline void falco_outputs::push(const ctrl_msg& cmsg) { if (!m_queue.try_push(cmsg)) { - fprintf(stderr, "Fatal error: Output queue reached maximum capacity. 
Exiting.\n"); - exit(EXIT_FAILURE); + switch (m_recovery) /* try_push failed: cmsg was not enqueued, apply the configured recovery */ + { + case RECOVERY_EXIT: + fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting ... \n"); + exit(EXIT_FAILURE); + case RECOVERY_EMPTY: + fprintf(stderr, "Output queue reached maximum capacity. Empty queue and continue ... \n"); + for (ctrl_msg dropped; m_queue.try_pop(dropped);) {} /* drain the queue; empty() only reports whether it is empty */ + break; + default: /* RECOVERY_DROP_CURRENT and any unrecognized value */ + fprintf(stderr, "Output queue reached maximum capacity. Continue on ... \n"); + } } } diff --git a/userspace/falco/falco_outputs.h b/userspace/falco/falco_outputs.h index c51726f9f64..cbee55d09fe 100644 --- a/userspace/falco/falco_outputs.h +++ b/userspace/falco/falco_outputs.h @@ -46,6 +46,8 @@ class falco_outputs bool json_include_tags_property, uint32_t timeout, bool buffered, + size_t queue_capacity_outputs_items, + uint32_t queue_capacity_outputs_recovery, bool time_format_iso_8601, const std::string& hostname); @@ -108,6 +110,7 @@ class falco_outputs typedef tbb::concurrent_bounded_queue falco_outputs_cbq; falco_outputs_cbq m_queue; + uint32_t m_recovery; /* outputs_recovery_code applied by push() when the queue is full */ std::thread m_worker_thread; inline void push(const ctrl_msg& cmsg); diff --git a/userspace/falco/stats_writer.cpp b/userspace/falco/stats_writer.cpp index e055d347563..4e8648dd99e 100644 --- a/userspace/falco/stats_writer.cpp +++ b/userspace/falco/stats_writer.cpp @@ -84,6 +84,9 @@ stats_writer::stats_writer( : m_initialized(false), m_total_samples(0) { m_config = config; + // capacity and controls should not be relevant for stats outputs, adopt capacity + // for completeness, but do not implement config recovery strategies. + m_queue.set_capacity(config->m_queue_capacity_outputs_items); if (config->m_metrics_enabled) { if (!config->m_metrics_output_file.empty())