Skip to content

Commit

Permalink
cleanup(outputs): adopt different style for outputs_queue params enco…
Browse files Browse the repository at this point in the history
…dings

Co-authored-by: Leonardo Grasso <me@leonardograsso.com>
Signed-off-by: Melissa Kilby <melissa.kilby.oss@gmail.com>
  • Loading branch information
incertum and leogr committed Aug 3, 2023
1 parent 66c6c99 commit 7924e36
Show file tree
Hide file tree
Showing 10 changed files with 61 additions and 51 deletions.
15 changes: 7 additions & 8 deletions falco.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -306,18 +306,17 @@ outputs:
# as a parameter.
# However, it will not address the root cause of the event pipe not keeping up.
#
# `items`: the maximum number of items allowed in the queue, defaulting to 0. This means that
# the queue is unbounded.
# `capacity`: the maximum number of items allowed in the queue, defaulting to 0. This means that
# the queue remains unbounded aka this setting is disabled.
# You can experiment with values greater or smaller than the anchor value 1000000.
#
# `recovery`: the strategy to follow when the queue becomes filled up. This also applies when
# the queue is unbounded, and all available memory on the system is consumed.
# recovery: 0 means continue.
# recovery: 1 means simply exit (default behavior).
# recovery: 2 means empty the queue and then continue.
queue_capacity_outputs:
items: 0
recovery: 1
# `exit` is default, `continue` does nothing special and `empty` empties the queue and then
# continues.
outputs_queue:
capacity: 0
recovery: exit


##########################
Expand Down
19 changes: 19 additions & 0 deletions userspace/engine/falco_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ static std::vector<std::string> priority_names = {
"Debug"
};

static std::vector<std::string> outputs_recovery_names = {
"continue",
"exit",
"empty",
};

bool falco_common::parse_priority(std::string v, priority_type& out)
{
for (size_t i = 0; i < priority_names.size(); i++)
Expand Down Expand Up @@ -54,6 +60,19 @@ falco_common::priority_type falco_common::parse_priority(std::string v)
return out;
}

bool falco_common::parse_recovery(std::string v, outputs_recovery_type& out)
{
for (size_t i = 0; i < outputs_recovery_names.size(); i++)
{
if (!strcasecmp(v.c_str(), outputs_recovery_names[i].c_str()))
{
out = (outputs_recovery_type) i;
return true;
}
}
return false;
}

bool falco_common::format_priority(priority_type v, std::string& out, bool shortfmt)
{
if ((size_t) v < priority_names.size())
Expand Down
10 changes: 10 additions & 0 deletions userspace/engine/falco_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ limitations under the License.
#include <mutex>
#include <sinsp.h>

#define DEFAULT_OUTPUTS_QUEUE_CAPACITY 0

//
// Most falco_* classes can throw exceptions. Unless directly related
// to low-level failures like inability to open file, etc, they will
Expand Down Expand Up @@ -52,6 +54,13 @@ struct falco_exception : std::exception

namespace falco_common
{

enum outputs_recovery_type {
RECOVERY_CONTINUE = 0, /* queue_capacity_outputs recovery strategy of continuing on. */
RECOVERY_EXIT = 1, /* queue_capacity_outputs recovery strategy of exiting, self OOM kill. */
RECOVERY_EMPTY = 2, /* queue_capacity_outputs recovery strategy of emptying queue then continuing. */
};

const std::string syscall_source = sinsp_syscall_event_source_name;

// Same as numbers/indices into the above vector
Expand All @@ -69,6 +78,7 @@ namespace falco_common

bool parse_priority(std::string v, priority_type& out);
priority_type parse_priority(std::string v);
bool parse_recovery(std::string v, outputs_recovery_type& out);
bool format_priority(priority_type v, std::string& out, bool shortfmt=false);
std::string format_priority(priority_type v, bool shortfmt=false);
};
4 changes: 2 additions & 2 deletions userspace/falco/app/actions/init_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ falco::app::run_result falco::app::actions::init_outputs(falco::app::state& s)
s.config->m_json_include_tags_property,
s.config->m_output_timeout,
s.config->m_buffered_outputs,
s.config->m_queue_capacity_outputs_items,
s.config->m_queue_capacity_outputs_recovery,
s.config->m_outputs_queue_capacity,
s.config->m_outputs_queue_recovery,
s.config->m_time_format_iso_8601,
hostname));

Expand Down
14 changes: 9 additions & 5 deletions userspace/falco/configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ limitations under the License.
#include "falco_utils.h"

#include "configuration.h"
#include "configuration_aux.h"
#include "logger.h"
#include "banned.h" // This raises a compilation error when certain functions are used

Expand All @@ -40,8 +39,8 @@ falco_configuration::falco_configuration():
m_notifications_max_burst(1000),
m_watch_config_files(true),
m_buffered_outputs(false),
m_queue_capacity_outputs_items(DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS),
m_queue_capacity_outputs_recovery(RECOVERY_EXIT),
m_outputs_queue_capacity(DEFAULT_OUTPUTS_QUEUE_CAPACITY),
m_outputs_queue_recovery(falco_common::RECOVERY_EXIT),
m_time_format_iso_8601(false),
m_output_timeout(2000),
m_grpc_enabled(false),
Expand Down Expand Up @@ -254,8 +253,13 @@ void falco_configuration::load_yaml(const std::string& config_name, const yaml_h
}

m_buffered_outputs = config.get_scalar<bool>("buffered_outputs", false);
m_queue_capacity_outputs_items = config.get_scalar<size_t>("queue_capacity_outputs.items", DEFAULT_ITEMS_QUEUE_CAPAXITY_OUTPUTS);
m_queue_capacity_outputs_recovery = config.get_scalar<uint32_t>("queue_capacity_outputs.recovery", RECOVERY_EXIT);
m_outputs_queue_capacity = config.get_scalar<size_t>("outputs_queue.capacity", DEFAULT_OUTPUTS_QUEUE_CAPACITY);
std::string recovery = config.get_scalar<std::string>("outputs_queue.recovery", "exit");
if (!falco_common::parse_recovery(recovery, m_outputs_queue_recovery))
{
throw std::logic_error("Unknown recovery \"" + recovery + "\"--must be one of exit, continue, empty");
}

m_time_format_iso_8601 = config.get_scalar<bool>("time_format_iso_8601", false);

falco_logger::log_stderr = config.get_scalar<bool>("log_stderr", false);
Expand Down
4 changes: 2 additions & 2 deletions userspace/falco/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class falco_configuration

bool m_watch_config_files;
bool m_buffered_outputs;
size_t m_queue_capacity_outputs_items;
uint32_t m_queue_capacity_outputs_recovery;
size_t m_outputs_queue_capacity;
falco_common::outputs_recovery_type m_outputs_queue_recovery;
bool m_time_format_iso_8601;
uint32_t m_output_timeout;

Expand Down
22 changes: 0 additions & 22 deletions userspace/falco/configuration_aux.h

This file was deleted.

16 changes: 8 additions & 8 deletions userspace/falco/falco_outputs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ limitations under the License.

#include "falco_outputs.h"
#include "config_falco.h"
#include "configuration_aux.h"

#include "formats.h"
#include "logger.h"
Expand All @@ -47,8 +46,8 @@ falco_outputs::falco_outputs(
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t queue_capacity_outputs_items,
uint32_t queue_capacity_outputs_recovery,
size_t outputs_queue_capacity,
falco_common::outputs_recovery_type outputs_queue_recovery,
bool time_format_iso_8601,
const std::string& hostname)
{
Expand All @@ -68,12 +67,12 @@ falco_outputs::falco_outputs(
}

m_worker_thread = std::thread(&falco_outputs::worker, this);
if (queue_capacity_outputs_items > 0)
if (outputs_queue_capacity > 0)
{
m_queue.set_capacity(queue_capacity_outputs_items);
m_queue.set_capacity(outputs_queue_capacity);
}

m_recovery = queue_capacity_outputs_recovery;
m_recovery = outputs_queue_recovery;
}

falco_outputs::~falco_outputs()
Expand Down Expand Up @@ -278,12 +277,13 @@ inline void falco_outputs::push(const ctrl_msg& cmsg)
{
switch (m_recovery)
{
case RECOVERY_EXIT:
case falco_common::RECOVERY_EXIT:
fprintf(stderr, "Fatal error: Output queue out of memory. Exiting ... \n");
exit(EXIT_FAILURE);
case RECOVERY_EMPTY:
case falco_common::RECOVERY_EMPTY:
fprintf(stderr, "Output queue out of memory. Empty queue and continue ... \n");
m_queue.empty();
break;
default:
fprintf(stderr, "Output queue out of memory. Continue on ... \n");
break;
Expand Down
4 changes: 2 additions & 2 deletions userspace/falco/falco_outputs.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ class falco_outputs
bool json_include_tags_property,
uint32_t timeout,
bool buffered,
size_t queue_capacity_outputs_items,
uint32_t queue_capacity_outputs_recovery,
size_t outputs_queue_capacity,
falco_common::outputs_recovery_type outputs_queue_recovery,
bool time_format_iso_8601,
const std::string& hostname);

Expand Down
4 changes: 2 additions & 2 deletions userspace/falco/stats_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ stats_writer::stats_writer(
m_config = config;
// capacity and controls should not be relevant for stats outputs, adopt capacity
// for completeness, but do not implement config recovery strategies.
if (config->m_queue_capacity_outputs_items > 0)
if (config->m_outputs_queue_capacity > 0)
{
m_queue.set_capacity(config->m_queue_capacity_outputs_items);
m_queue.set_capacity(config->m_outputs_queue_capacity);
}
if (config->m_metrics_enabled)
{
Expand Down

0 comments on commit 7924e36

Please sign in to comment.