Skip to content

Commit

Permalink
fix(outputs): expose queue_capacity_outputs config for memory control
Browse files Browse the repository at this point in the history
Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
  • Loading branch information
incertum committed Aug 23, 2023
1 parent bc12e56 commit 3085aae
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 3 deletions.
22 changes: 22 additions & 0 deletions falco.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
# json_include_tags_property
# buffered_outputs
# outputs (throttling)
# queue_capacity_outputs
# Falco outputs channels
# stdout_output
# syslog_output
Expand Down Expand Up @@ -304,6 +305,27 @@ outputs:
# defined.
rule_matching: first

# [Experimental] `queue_capacity_outputs`
#
# Falco utilizes tbb::concurrent_bounded_queue for the outputs, and this parameter
# allows you to customize the capacity. Refer to the official documentation:
# https://oneapi-src.github.io/oneTBB/main/tbb_userguide/Concurrent_Queue_Classes.html.
# On a healthy system with tuned Falco rules, the queue should not fill up.
# If it does, it most likely happens if the entire event flow is too slow. This
# could indicate that the server is under heavy load.
#
# Lowering the number of items can prevent steadily increasing memory until the OOM
# killer stops the Falco process. We expose recovery actions to self-limit or self
# OOM kill earlier similar to how we expose the kernel buffer size as parameter.
# However, it will not address the root cause of the event pipe not holding up.
queue_capacity_outputs:
# number of max items in queue
items: 1000000
# continue: 0 (default)
# exit: 1
# empty queue then continue: 2
recovery: 0


##########################
# Falco outputs channels #
Expand Down
2 changes: 2 additions & 0 deletions userspace/falco/app/actions/init_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s)
s.config->m_json_include_tags_property,
s.config->m_output_timeout,
s.config->m_buffered_outputs,
s.config->m_queue_capacity_outputs_items,
s.config->m_queue_capacity_outputs_recovery,
s.config->m_time_format_iso_8601,
hostname));

Expand Down
5 changes: 5 additions & 0 deletions userspace/falco/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ limitations under the License.
#include "falco_utils.h"

#include "configuration.h"
#include "configuration_aux.h"
#include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used

Expand All @@ -40,6 +41,8 @@ falco_configuration::falco_configuration():
m_watch_config_files(true),
m_rule_matching(falco_common::rule_matching::FIRST),
m_buffered_outputs(false),
m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS),
m_queue_capacity_outputs_recovery(RECOVERY_DROP_CURRENT),
m_time_format_iso_8601(false),
m_output_timeout(2000),
m_grpc_enabled(false),
Expand Down Expand Up @@ -258,6 +261,8 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h
}

m_buffered_outputs = config.get_scalar<bool>("buffered_outputs", false);
m_queue_capacity_outputs_items = config.get_scalar<size_t>("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS);
m_queue_capacity_outputs_recovery = config.get_scalar<uint32_t>("queue_capacity_outputs.recovery", RECOVERY_DROP_CURRENT);
m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false);

falco_logger::log_stderr = config.get_scalar<bool>("log_stderr", false);
Expand Down
2 changes: 2 additions & 0 deletions userspace/falco/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class falco_configuration

bool m_watch_config_files;
bool m_buffered_outputs;
size_t m_queue_capacity_outputs_items;
uint32_t m_queue_capacity_outputs_recovery;
bool m_time_format_iso_8601;
uint32_t m_output_timeout;

Expand Down
22 changes: 22 additions & 0 deletions userspace/falco/configuration_aux.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
Copyright (C) 2023 The Falco Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

#pragma once

#define DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS 1000000UL

enum outputs_recovery_code {
RECOVERY_DROP_CURRENT = 0, /* queue_capacity_outputs recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */
};
20 changes: 17 additions & 3 deletions userspace/falco/falco_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ limitations under the License.
#endif

#include "falco_outputs.h"

#include "config_falco.h"
#include "configuration_aux.h"

#include "formats.h"
#include "logger.h"
Expand All @@ -47,6 +47,8 @@ falco_outputs::falco_outputs(
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t queue_capacity_outputs_items,
uint32_t queue_capacity_outputs_recovery,
bool time_format_iso_8601,
const std::string& hostname)
{
Expand All @@ -66,6 +68,8 @@ falco_outputs::falco_outputs(
}

m_worker_thread = std::thread(&falco_outputs::worker, this);
m_queue.set_capacity(queue_capacity_outputs_items);
m_recovery = queue_capacity_outputs_recovery;
}

falco_outputs::~falco_outputs()
Expand Down Expand Up @@ -268,8 +272,18 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
{
if (!m_queue.try_push(cmsg))
{
fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting.\n");
exit(EXIT_FAILURE);
switch (m_recovery)
{
case RECOVERY_EXIT:
fprintf(stderr, "Fatal error: Output queue reached maximum capacity. Exiting ... \n");
exit(EXIT_FAILURE);
case RECOVERY_EMPTY:
fprintf(stderr, "Output queue reached maximum capacity. Empty queue and continue ... \n");
m_queue.empty();
default:
fprintf(stderr, "Output queue reached maximum capacity. Continue on ... \n");
break;
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions userspace/falco/falco_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ class falco_outputs
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t queue_capacity_outputs_items,
uint32_t queue_capacity_outputs_recovery,
bool time_format_iso_8601,
const std::string& hostname);

Expand Down Expand Up @@ -108,6 +110,7 @@ class falco_outputs
typedef tbb::concurrent_bounded_queue<ctrl_msg> falco_outputs_cbq;

falco_outputs_cbq m_queue;
uint32_t m_recovery;

std::thread m_worker_thread;
inline void push(const ctrl_msg& cmsg);
Expand Down
3 changes: 3 additions & 0 deletions userspace/falco/stats_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ stats_writer::stats_writer(
: m_initialized(false), m_total_samples(0)
{
m_config = config;
// capacity and controls should not be relevant for stats outputs, adopt capacity
// for completeness, but do not implement config recovery strategies.
m_queue.set_capacity(config->m_queue_capacity_outputs_items);
if (config->m_metrics_enabled)
{
if (!config->m_metrics_output_file.empty())
Expand Down

0 comments on commit 3085aae

Please sign in to comment.