From 0e1a1f6403af1a183fa938d8a83b81b9eb9ab536 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 15:39:34 +0200 Subject: [PATCH 01/13] new(cmake): add bs_threadpool dependency Signed-off-by: Gianmatteo Palmieri --- .github/install-deps.sh | 5 +++ .github/workflows/ci.yml | 2 +- cmake/modules/Findbs_threadpool.cmake | 31 +++++++++++++++++++ cmake/modules/bs_threadpool.cmake | 44 +++++++++++++++++++++++++++ cmake/modules/libsinsp.cmake | 4 +++ userspace/libsinsp/CMakeLists.txt | 4 +++ 6 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 cmake/modules/Findbs_threadpool.cmake create mode 100644 cmake/modules/bs_threadpool.cmake diff --git a/.github/install-deps.sh b/.github/install-deps.sh index 917d6d5337..8476667f6d 100755 --- a/.github/install-deps.sh +++ b/.github/install-deps.sh @@ -66,3 +66,8 @@ popd echo "=== Downloading uthash.h (1.9.8) ===" wget -P "/usr/include" "https://raw.githubusercontent.com/troydhanson/uthash/v1.9.8/src/uthash.h" + +# === BS_thread_pool === +echo "=== Downloading BS_thread_pool.h (4.1.0) ===" + +wget -P "/usr/include" "https://github.com/bshoshany/thread-pool/raw/v4.1.0/include/BS_thread_pool.hpp" \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d610549d4c..c2fce5cf5e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -174,7 +174,7 @@ jobs: - name: Build ๐Ÿ—๏ธ run: | mkdir -p build - cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test .. + cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DUSE_BUNDLED_BS_THREADPOOL=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test .. cmake --build . --config Release --parallel $(getconf _NPROCESSORS_ONLN) - name: Install diff --git a/cmake/modules/Findbs_threadpool.cmake b/cmake/modules/Findbs_threadpool.cmake new file mode 100644 index 0000000000..6d9bfd3605 --- /dev/null +++ b/cmake/modules/Findbs_threadpool.cmake @@ -0,0 +1,31 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2023 The Falco Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +find_path(BS_THREADPOOL_INCLUDE NAMES BS_thread_pool.hpp) + +if (BS_THREADPOOL_INCLUDE) + if (NOT bs_threadpool_FIND_QUIETLY) + message(STATUS "Found bs_threadpool: include: ${BS_THREADPOOL_INCLUDE}.") + endif() +else() + if (bs_threadpool_FIND_REQUIRED) + message(FATAL_ERROR "Required component bs_threadpool missing.") + endif() + if (NOT bs_threadpool_FIND_QUIETLY) + message(WARNING "bs_threadpool not found.") + endif() +endif() \ No newline at end of file diff --git a/cmake/modules/bs_threadpool.cmake b/cmake/modules/bs_threadpool.cmake new file mode 100644 index 0000000000..c493107a8e --- /dev/null +++ b/cmake/modules/bs_threadpool.cmake @@ -0,0 +1,44 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# Copyright (C) 2024 The Falco Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +# specific language governing permissions and limitations under the License. +# + +# +# bshoshany/thread-pool (https://github.com/bshoshany/thread-pool) +# + +option(USE_BUNDLED_BS_THREADPOOL "Enable building of the bundled bs_threadpool" ${USE_BUNDLED_DEPS}) + +if(BS_THREADPOOL_INCLUDE) + # we already have bs_threadpool +elseif(NOT USE_BUNDLED_BS_THREADPOOL) + find_package(bs_threadpool REQUIRED) +else() + set(BS_THREADPOOL_SRC "${PROJECT_BINARY_DIR}/bs_threadpool-prefix/src/bs_threadpool") + set(BS_THREADPOOL_INCLUDE "${BS_THREADPOOL_SRC}/include") + + message(STATUS "Using bundled bs_threadpool in '${BS_THREADPOOL_SRC}'") + + ExternalProject_Add(bs_threadpool + PREFIX "${PROJECT_BINARY_DIR}/bs_threadpool-prefix" + URL "https://github.com/bshoshany/thread-pool/archive/refs/tags/v4.1.0.tar.gz" + URL_HASH "SHA256=be7abecbc420bb87919eeef729b13ff7c29d5ce547bdae284923296c695415bd" + CONFIGURE_COMMAND "" + BUILD_COMMAND "" + INSTALL_COMMAND "") +endif() + +if(NOT TARGET bs_threadpool) + add_custom_target(bs_threadpool) +endif() + +include_directories("${BS_THREADPOOL_INCLUDE}") \ No newline at end of file diff --git a/cmake/modules/libsinsp.cmake b/cmake/modules/libsinsp.cmake index 6cc7db4892..4c3e5101e1 100644 --- a/cmake/modules/libsinsp.cmake +++ b/cmake/modules/libsinsp.cmake @@ -37,6 +37,7 @@ endif() include(jsoncpp) include(valijson) include(re2) +include(bs_threadpool) set(LIBSINSP_INCLUDE_DIRS ${LIBS_DIR} ${LIBS_DIR}/userspace ${LIBSCAP_INCLUDE_DIRS} ${DRIVER_CONFIG_DIR}) @@ -54,6 +55,9 @@ list(APPEND LIBSINSP_INCLUDE_DIRS ${VALIJSON_ABSOLUTE_INCLUDE_DIR}) get_filename_component(RE2_ABSOLUTE_INCLUDE_DIR ${RE2_INCLUDE} ABSOLUTE) list(APPEND LIBSINSP_INCLUDE_DIRS ${RE2_ABSOLUTE_INCLUDE_DIR}) +get_filename_component(BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR ${BS_THREADPOOL_INCLUDE} ABSOLUTE) +list(APPEND LIBSINSP_INCLUDE_DIRS ${BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR}) + if(NOT MINIMAL_BUILD AND NOT EMSCRIPTEN AND NOT APPLE) get_filename_component(CARES_ABSOLUTE_INCLUDE_DIR ${CARES_INCLUDE} ABSOLUTE) list(APPEND LIBSINSP_INCLUDE_DIRS ${CARES_ABSOLUTE_INCLUDE_DIR}) diff --git a/userspace/libsinsp/CMakeLists.txt b/userspace/libsinsp/CMakeLists.txt index 5d9ebacd9b..1a65a095e6 100644 --- a/userspace/libsinsp/CMakeLists.txt +++ b/userspace/libsinsp/CMakeLists.txt @@ -199,6 +199,10 @@ if(USE_BUNDLED_JSONCPP) add_dependencies(sinsp jsoncpp) endif() +if(USE_BUNDLED_BS_THREADPOOL) + add_dependencies(sinsp bs_threadpool) +endif() + function(prepare_cri_grpc api_version) configure_file(${CMAKE_CURRENT_SOURCE_DIR}/cri-${api_version}.proto ${CMAKE_CURRENT_BINARY_DIR}/cri-${api_version}.proto COPYONLY) add_custom_command(OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/cri-${api_version}.grpc.pb.cc From e7aec2f4dab042280e94dc4c886c72989b37bcf0 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:00:14 +0200 Subject: [PATCH 02/13] new(libsinsp): notify plugins of capture open/close Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/plugin.cpp | 34 ++++++++++++++++++++++++++++++++ userspace/libsinsp/plugin.h | 2 ++ userspace/libsinsp/sinsp.cpp | 12 +++++++++++ userspace/plugin/plugin_api.h | 25 +++++++++++++++++++++++ userspace/plugin/plugin_loader.c | 2 ++ 5 files changed, 75 insertions(+) diff --git a/userspace/libsinsp/plugin.cpp b/userspace/libsinsp/plugin.cpp index 3de7dcc657..33cf1f00e5 100755 --- a/userspace/libsinsp/plugin.cpp +++ b/userspace/libsinsp/plugin.cpp @@ -753,6 +753,40 @@ bool sinsp_plugin::set_config(const std::string& config) return m_handle->api.set_config(m_state, &input) == SS_PLUGIN_SUCCESS; } +void sinsp_plugin::capture_open() +{ + if(!m_inited) + { + throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); + } + + ss_plugin_routine_vtable routine_vtable; + routine_vtable.subscribe = &plugin_subscribe_routine; + routine_vtable.unsubscribe = &plugin_unsubscribe_routine; + + if(m_handle->api.capture_open) + { + m_handle->api.capture_open(m_state, routine_vtable); + } +} + +void sinsp_plugin::capture_close() +{ + if(!m_inited) + { + throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); + } + + ss_plugin_routine_vtable routine_vtable; + routine_vtable.subscribe = &plugin_subscribe_routine; + routine_vtable.unsubscribe = &plugin_unsubscribe_routine; + + if(m_handle->api.capture_close) + { + m_handle->api.capture_close(m_state, routine_vtable); + } +} + /** Event Source CAP **/ scap_source_plugin& sinsp_plugin::as_scap_source() diff --git a/userspace/libsinsp/plugin.h b/userspace/libsinsp/plugin.h index 4f8b546023..2f4b4a0feb 100755 --- a/userspace/libsinsp/plugin.h +++ b/userspace/libsinsp/plugin.h @@ -166,6 +166,8 @@ class sinsp_plugin std::string get_init_schema(ss_plugin_schema_type& schema_type) const; bool set_config(const std::string& config); std::vector get_metrics() const; + void capture_open(); + void capture_close(); /** Event Sourcing **/ inline uint32_t id() const diff --git a/userspace/libsinsp/sinsp.cpp b/userspace/libsinsp/sinsp.cpp index c07a54dc23..347a4f4458 100644 --- a/userspace/libsinsp/sinsp.cpp +++ b/userspace/libsinsp/sinsp.cpp @@ -392,6 +392,12 @@ void sinsp::open_common(scap_open_args* oargs, const scap_vtable* vtable, scap_p } } } + + // notify registered plugins of capture open + for (auto& p : m_plugin_manager->plugins()) + { + p->capture_open(); + } } void sinsp::mark_ppm_sc_of_interest(ppm_sc_code ppm_sc, bool enable) @@ -791,6 +797,12 @@ void sinsp::close() } } + // notify registered plugins of capture close + for (auto& p : m_plugin_manager->plugins()) + { + p->capture_close(); + } + m_mode = SINSP_MODE_NONE; } diff --git a/userspace/plugin/plugin_api.h b/userspace/plugin/plugin_api.h index d498fe57ad..c229cc2a92 100644 --- a/userspace/plugin/plugin_api.h +++ b/userspace/plugin/plugin_api.h @@ -977,6 +977,31 @@ typedef struct // 'num_metrics' must be set to the lenght of the array before returning // and it can be set to 0 if no metrics are provided. ss_plugin_metric* (*get_metrics)(ss_plugin_t* s, uint32_t* num_metrics); + + //Capture listening capability API + struct + { + // + // Called by the framework when the event capture opens. + // + // Required: no + // Arguments: + // - s: the plugin state, returned by init(). Can be NULL. + // - r: a vtable containing callback that can be used by the plugin + // for subscribing and unsubscribing routines to the framework's thread pool. + // This vtable can be retained and used for the rest of the plugin's life-cycle. + void (*capture_open)(ss_plugin_t* s, ss_plugin_routine_vtable r); + + // + // Called by the framework when the event capture closes. + // + // Required: no + // Arguments: + // - s: the plugin state, returned by init(). Can be NULL. + // - r: a vtable containing callback that can be used by the plugin + // for subscribing and unsubscribing routines to the framework's thread pool. + void (*capture_close)(ss_plugin_t* s, ss_plugin_routine_vtable r); + }; } plugin_api; #ifdef __cplusplus diff --git a/userspace/plugin/plugin_loader.c b/userspace/plugin/plugin_loader.c index 3f86ae3aff..2f22654306 100644 --- a/userspace/plugin/plugin_loader.c +++ b/userspace/plugin/plugin_loader.c @@ -137,6 +137,8 @@ plugin_handle_t* plugin_load(const char* path, char* err) SYM_RESOLVE(ret, set_async_event_handler); SYM_RESOLVE(ret, set_config); SYM_RESOLVE(ret, get_metrics); + SYM_RESOLVE(ret, capture_open); + SYM_RESOLVE(ret, capture_close); return ret; } From bbcdb4ef05d00c9a7154e06987a862e3f2887c96 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:11:59 +0200 Subject: [PATCH 03/13] new(libsinsp) add inspector thread pool Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/plugin.cpp | 6 ++++-- userspace/libsinsp/plugin.h | 12 ++++++++++-- userspace/libsinsp/sinsp.cpp | 9 +++++++-- userspace/libsinsp/sinsp.h | 4 ++++ userspace/libsinsp/test/plugin_manager.ut.cpp | 2 +- 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/userspace/libsinsp/plugin.cpp b/userspace/libsinsp/plugin.cpp index 33cf1f00e5..786c03d8fa 100755 --- a/userspace/libsinsp/plugin.cpp +++ b/userspace/libsinsp/plugin.cpp @@ -91,6 +91,7 @@ static void plugin_log_fn(ss_plugin_owner_t* o, const char* component, const cha std::shared_ptr sinsp_plugin::create( const plugin_api* api, const std::shared_ptr& treg, + const std::shared_ptr& tpool, std::string& errstr) { char loadererr[PLUGIN_MAX_ERRLEN]; @@ -101,7 +102,7 @@ std::shared_ptr sinsp_plugin::create( return nullptr; } - auto plugin = std::make_shared(handle, treg); + auto plugin = std::make_shared(handle, treg, tpool); if (!plugin->resolve_dylib_symbols(errstr)) { // plugin and handle get deleted here by shared_ptr @@ -114,6 +115,7 @@ std::shared_ptr sinsp_plugin::create( std::shared_ptr sinsp_plugin::create( const std::string &filepath, const std::shared_ptr& treg, + const std::shared_ptr& tpool, std::string& errstr) { char loadererr[PLUGIN_MAX_ERRLEN]; @@ -124,7 +126,7 @@ std::shared_ptr sinsp_plugin::create( return nullptr; } - auto plugin = std::make_shared(handle, treg); + auto plugin = std::make_shared(handle, treg, tpool); if (!plugin->resolve_dylib_symbols(errstr)) { // plugin and handle get deleted here by shared_ptr diff --git a/userspace/libsinsp/plugin.h b/userspace/libsinsp/plugin.h index 2f4b4a0feb..3bdd405c14 100755 --- a/userspace/libsinsp/plugin.h +++ b/userspace/libsinsp/plugin.h @@ -29,6 +29,7 @@ limitations under the License. #include #include #include +#include #include /** @@ -58,6 +59,7 @@ class sinsp_plugin static std::shared_ptr create( const std::string& path, const std::shared_ptr& treg, + const std::shared_ptr& tpool, std::string& errstr); /** @@ -67,6 +69,7 @@ class sinsp_plugin static std::shared_ptr create( const plugin_api* api, const std::shared_ptr& treg, + const std::shared_ptr& tpool, std::string& errstr); /** @@ -93,7 +96,9 @@ class sinsp_plugin } sinsp_plugin(plugin_handle_t* - handle, const std::shared_ptr& treg): + handle, + const std::shared_ptr& treg, + const std::shared_ptr& tpool): m_caps(CAP_NONE), m_name(), m_description(), @@ -124,7 +129,8 @@ class sinsp_plugin m_accessed_table_fields(), m_ephemeral_tables(), m_ephemeral_tables_clear(false), - m_accessed_entries_clear(false) { } + m_accessed_entries_clear(false), + m_thread_pool(tpool) { } virtual ~sinsp_plugin(); sinsp_plugin(const sinsp_plugin& s) = delete; sinsp_plugin& operator = (const sinsp_plugin& s) = delete; @@ -449,5 +455,7 @@ class sinsp_plugin static ss_plugin_table_t *table_api_get_table(ss_plugin_owner_t *o, const char *name, ss_plugin_state_type key_type); static ss_plugin_rc table_api_add_table(ss_plugin_owner_t *o, const ss_plugin_table_input* in); + std::shared_ptr m_thread_pool; + friend struct sinsp_table_wrapper; }; diff --git a/userspace/libsinsp/sinsp.cpp b/userspace/libsinsp/sinsp.cpp index 347a4f4458..5879ddc1b7 100644 --- a/userspace/libsinsp/sinsp.cpp +++ b/userspace/libsinsp/sinsp.cpp @@ -131,6 +131,8 @@ sinsp::sinsp(bool with_metrics) : // create state tables registry m_table_registry = std::make_shared(); m_table_registry->add_table(m_thread_manager.get()); + + m_thread_pool = std::make_shared(); } sinsp::~sinsp() @@ -803,6 +805,9 @@ void sinsp::close() p->capture_close(); } + // purge pending routines and wait for the running ones + m_thread_pool->purge(); + m_mode = SINSP_MODE_NONE; } @@ -1542,7 +1547,7 @@ void sinsp::set_statsd_port(const uint16_t port) std::shared_ptr sinsp::register_plugin(const std::string& filepath) { std::string errstr; - std::shared_ptr plugin = sinsp_plugin::create(filepath, m_table_registry, errstr); + std::shared_ptr plugin = sinsp_plugin::create(filepath, m_table_registry, m_thread_pool, errstr); if (!plugin) { throw sinsp_exception("cannot load plugin " + filepath + ": " + errstr.c_str()); @@ -1567,7 +1572,7 @@ std::shared_ptr sinsp::register_plugin(const std::string& filepath std::shared_ptr sinsp::register_plugin(const plugin_api* api) { std::string errstr; - std::shared_ptr plugin = sinsp_plugin::create(api, m_table_registry, errstr); + std::shared_ptr plugin = sinsp_plugin::create(api, m_table_registry, m_thread_pool, errstr); if (!plugin) { throw sinsp_exception("cannot load plugin with custom vtable: " + errstr); diff --git a/userspace/libsinsp/sinsp.h b/userspace/libsinsp/sinsp.h index d13489c10b..86752271bd 100644 --- a/userspace/libsinsp/sinsp.h +++ b/userspace/libsinsp/sinsp.h @@ -1404,6 +1404,10 @@ class SINSP_PUBLIC sinsp : public capture_stats_source // A registry that managers the state tables of this inspector std::shared_ptr m_table_registry; + // + // A thread pool capable of running non-blocking recurring routines + std::shared_ptr m_thread_pool; + sinsp_observer* m_observer{nullptr}; bool m_inited; diff --git a/userspace/libsinsp/test/plugin_manager.ut.cpp b/userspace/libsinsp/test/plugin_manager.ut.cpp index 128c850d0f..86352783ef 100644 --- a/userspace/libsinsp/test/plugin_manager.ut.cpp +++ b/userspace/libsinsp/test/plugin_manager.ut.cpp @@ -27,7 +27,7 @@ class mock_sinsp_plugin: public sinsp_plugin plugin_caps_t caps, const std::string& name, uint32_t id, - const std::string& source): sinsp_plugin(nullptr, nullptr) + const std::string& source): sinsp_plugin(nullptr, nullptr, nullptr) { m_caps = caps; m_name = name; From 4fc66be4e364cd4aeff458d3bfc0dff7ee4f657e Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:17:08 +0200 Subject: [PATCH 04/13] new(libsinsp): add thread pool interface Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/thread_pool.h | 61 ++++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 userspace/libsinsp/thread_pool.h diff --git a/userspace/libsinsp/thread_pool.h b/userspace/libsinsp/thread_pool.h new file mode 100644 index 0000000000..99e9831675 --- /dev/null +++ b/userspace/libsinsp/thread_pool.h @@ -0,0 +1,61 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +#include +#include +#include +#include +#include + +class thread_pool +{ +public: + using routine_id_t = std::function*; + + thread_pool() = default; + + virtual ~thread_pool() = default; + + /*! + * \brief Subscribes a routine to the thread pool. + * + * \param func The routine to be subscribed, represented by a function returning a bool value. + * Returning false causes the routine to be unsubscribed from the thread pool. + * + * \return An handle representing a specific routine. + * This can later be used to unsubscribe the routine. + */ + virtual routine_id_t subscribe(const std::function& func) = 0; + + /*! + * \brief Unsubscribes a routine from the thread pool. + * + * \param id A routine handle. + */ + virtual void unsubscribe(routine_id_t id) = 0; + + /*! + * \brief Unsubscribes all the subscribed routines and waits for the running ones to finish. + */ + virtual void purge() = 0; + + /*! + * \return The count of currently subscribed routines. + */ + virtual size_t routines_num() = 0; +}; \ No newline at end of file From db5a4f3d087ed0005620afc81dd42bc9db556f46 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:21:22 +0200 Subject: [PATCH 05/13] new(libsinsp): add thread pool implementation Signed-off-by: Gianmatteo Palmieri Co-authored-by: Jason Dellaluce --- userspace/libsinsp/CMakeLists.txt | 1 + userspace/libsinsp/thread_pool.cpp | 87 ++++++++++++++++++++++++++++++ userspace/libsinsp/thread_pool.h | 31 +++++++++++ 3 files changed, 119 insertions(+) create mode 100644 userspace/libsinsp/thread_pool.cpp diff --git a/userspace/libsinsp/CMakeLists.txt b/userspace/libsinsp/CMakeLists.txt index 1a65a095e6..9159e68a34 100644 --- a/userspace/libsinsp/CMakeLists.txt +++ b/userspace/libsinsp/CMakeLists.txt @@ -100,6 +100,7 @@ add_library(sinsp prefix_search.cpp sinsp_syslog.cpp threadinfo.cpp + thread_pool.cpp tuples.cpp sinsp.cpp token_bucket.cpp diff --git a/userspace/libsinsp/thread_pool.cpp b/userspace/libsinsp/thread_pool.cpp new file mode 100644 index 0000000000..729c03b837 --- /dev/null +++ b/userspace/libsinsp/thread_pool.cpp @@ -0,0 +1,87 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +#include + +#include + +void bs_thread_pool::default_bs_tp_deleter::operator()(BS::thread_pool* __ptr) const +{ + std::default_delete{}(__ptr); +} + +bs_thread_pool::bs_thread_pool(size_t num_workers): m_pool(nullptr), m_routines() +{ + if (num_workers == 0) + { + m_pool = std::unique_ptr(new BS::thread_pool()); + } + else + { + m_pool = std::unique_ptr(new BS::thread_pool(num_workers)); + } +} + +bs_thread_pool::routine_id_t bs_thread_pool::subscribe(const std::function& func) +{ + m_routines.push_back(std::make_shared>(func)); + auto& new_routine = m_routines.back(); + run_routine(new_routine); + + return static_cast(new_routine.get()); +} + +void bs_thread_pool::unsubscribe(bs_thread_pool::routine_id_t id) +{ + m_routines.remove_if([id](const std::shared_ptr>& v) + { + return v.get() == static_cast*>(id); + }); +} + +void bs_thread_pool::purge() +{ + m_routines.clear(); + + m_pool->purge(); + m_pool->wait(); +} + +size_t bs_thread_pool::routines_num() +{ + return m_routines.size(); +} + +void bs_thread_pool::run_routine(std::shared_ptr> routine) +{ + m_pool->detach_task([this, routine] + { + if (routine.use_count() <= 1) + { + return; + } + + if(!((*routine) && (*routine)())) + { + m_routines.remove(routine); + return; + } + + run_routine(routine); + }); +} \ No newline at end of file diff --git a/userspace/libsinsp/thread_pool.h b/userspace/libsinsp/thread_pool.h index 99e9831675..7c1556ba4d 100644 --- a/userspace/libsinsp/thread_pool.h +++ b/userspace/libsinsp/thread_pool.h @@ -58,4 +58,35 @@ class thread_pool * \return The count of currently subscribed routines. */ virtual size_t routines_num() = 0; +}; + +namespace BS { + class thread_pool; +}; + +class bs_thread_pool : public thread_pool +{ +public: + bs_thread_pool(size_t num_workers = 0); + + virtual ~bs_thread_pool() + { + purge(); + } + + thread_pool::routine_id_t subscribe(const std::function& func); + + void unsubscribe(thread_pool::routine_id_t id); + + void purge(); + + size_t routines_num(); + +private: + struct default_bs_tp_deleter { void operator()(BS::thread_pool* __ptr) const; }; + + void run_routine(std::shared_ptr> id); + + std::unique_ptr m_pool; + std::list>> m_routines; }; \ No newline at end of file From 148653d62643b1b639ca615317ddea437a5d53a5 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:38:40 +0200 Subject: [PATCH 06/13] new(libsinsp): add plugin api routine vtable Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/plugin.cpp | 164 ++++++++++++++++++++-------------- userspace/libsinsp/plugin.h | 2 + userspace/plugin/plugin_api.h | 41 +++++++++ 3 files changed, 140 insertions(+), 67 deletions(-) diff --git a/userspace/libsinsp/plugin.cpp b/userspace/libsinsp/plugin.cpp index 786c03d8fa..472833717c 100755 --- a/userspace/libsinsp/plugin.cpp +++ b/userspace/libsinsp/plugin.cpp @@ -755,6 +755,103 @@ bool sinsp_plugin::set_config(const std::string& config) return m_handle->api.set_config(m_state, &input) == SS_PLUGIN_SUCCESS; } +static void set_plugin_metric_value(metrics_v2& metric, metrics_v2_value_type type, ss_plugin_metric_value val) +{ + switch (type) + { + case METRIC_VALUE_TYPE_U32: + metric.value.u32 = val.u32; + break; + case METRIC_VALUE_TYPE_S32: + metric.value.s32 = val.s32; + break; + case METRIC_VALUE_TYPE_U64: + metric.value.u64 = val.u64; + break; + case METRIC_VALUE_TYPE_S64: + metric.value.s64 = val.s64; + break; + case METRIC_VALUE_TYPE_D: + metric.value.d = val.d; + break; + case METRIC_VALUE_TYPE_F: + metric.value.f = val.f; + break; + case METRIC_VALUE_TYPE_I: + metric.value.i = val.i; + break; + default: + break; + } +} + +std::vector sinsp_plugin::get_metrics() const +{ + if(!m_inited) + { + throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); + } + + std::vector metrics; + uint32_t num_metrics = 0; + + if(!m_handle->api.get_metrics) + { + return metrics; + } + + ss_plugin_metric *plugin_metrics = m_handle->api.get_metrics(m_state, &num_metrics); + for (uint32_t i = 0; i < num_metrics; i++) + { + ss_plugin_metric *plugin_metric = plugin_metrics + i; + + metrics_v2 metric; + + //copy plugin name + snprintf(metric.name, METRIC_NAME_MAX, "%s.%s", m_name.c_str(), plugin_metric->name); + + metric.flags = METRICS_V2_PLUGINS; + metric.unit = METRIC_VALUE_UNIT_COUNT; + metric.type = static_cast(plugin_metric->value_type); + metric.metric_type = static_cast(plugin_metric->type); + set_plugin_metric_value(metric, metric.type, plugin_metric->value); + + metrics.emplace_back(metric); + } + + return metrics; +} + +thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t routine_fn, ss_plugin_routine_state_t* routine_state) +{ + auto f = [this, routine_fn, routine_state]() -> bool { + return static_cast(routine_fn(m_state, routine_state)); + }; + + return m_thread_pool->subscribe(f); +} + +void sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id) +{ + m_thread_pool->unsubscribe(routine_id); +} + +ss_plugin_routine_t* plugin_subscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_fn_t r, ss_plugin_routine_state_t* s) +{ + auto t = static_cast(o); + auto res = t->subscribe_routine(r, s); + + return static_cast(res); +} + +void plugin_unsubscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_t* r) +{ + auto t = static_cast(o); + auto id = static_cast(r); + + t->unsubscribe_routine(id); +} + void sinsp_plugin::capture_open() { if(!m_inited) @@ -923,73 +1020,6 @@ std::vector sinsp_plugin::list_open_params() const return list; } -static void set_plugin_metric_value(metrics_v2& metric, metrics_v2_value_type type, ss_plugin_metric_value val) -{ - switch (type) - { - case METRIC_VALUE_TYPE_U32: - metric.value.u32 = val.u32; - break; - case METRIC_VALUE_TYPE_S32: - metric.value.s32 = val.s32; - break; - case METRIC_VALUE_TYPE_U64: - metric.value.u64 = val.u64; - break; - case METRIC_VALUE_TYPE_S64: - metric.value.s64 = val.s64; - break; - case METRIC_VALUE_TYPE_D: - metric.value.d = val.d; - break; - case METRIC_VALUE_TYPE_F: - metric.value.f = val.f; - break; - case METRIC_VALUE_TYPE_I: - metric.value.i = val.i; - break; - default: - break; - } -} - -std::vector sinsp_plugin::get_metrics() const -{ - if(!m_inited) - { - throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); - } - - std::vector metrics; - uint32_t num_metrics = 0; - - if(!m_handle->api.get_metrics) - { - return metrics; - } - - ss_plugin_metric *plugin_metrics = m_handle->api.get_metrics(m_state, &num_metrics); - for (uint32_t i = 0; i < num_metrics; i++) - { - ss_plugin_metric *plugin_metric = plugin_metrics + i; - - metrics_v2 metric; - - //copy plugin name - snprintf(metric.name, METRIC_NAME_MAX, "%s.%s", m_name.c_str(), plugin_metric->name); - - metric.flags = METRICS_V2_PLUGINS; - metric.unit = METRIC_VALUE_UNIT_COUNT; - metric.type = static_cast(plugin_metric->value_type); - metric.metric_type = static_cast(plugin_metric->type); - set_plugin_metric_value(metric, metric.type, plugin_metric->value); - - metrics.emplace_back(metric); - } - - return metrics; -} - /** End of Event Source CAP **/ /** Field Extraction CAP **/ diff --git a/userspace/libsinsp/plugin.h b/userspace/libsinsp/plugin.h index 3bdd405c14..838151f853 100755 --- a/userspace/libsinsp/plugin.h +++ b/userspace/libsinsp/plugin.h @@ -174,6 +174,8 @@ class sinsp_plugin std::vector get_metrics() const; void capture_open(); void capture_close(); + thread_pool::routine_id_t subscribe_routine(ss_plugin_routine_fn_t routine_fn, ss_plugin_routine_state_t* routine_state); + void unsubscribe_routine(thread_pool::routine_id_t routine_id); /** Event Sourcing **/ inline uint32_t id() const diff --git a/userspace/plugin/plugin_api.h b/userspace/plugin/plugin_api.h index c229cc2a92..0046db1e2b 100644 --- a/userspace/plugin/plugin_api.h +++ b/userspace/plugin/plugin_api.h @@ -387,6 +387,46 @@ typedef struct ss_plugin_set_config_input const char* config; } ss_plugin_set_config_input; +// +// An opaque pointer representing a routine subscribed in the framework-provided thread pool +typedef void ss_plugin_routine_t; + +// +// An opaque pointer representing the state of the routine on each iteration +typedef void ss_plugin_routine_state_t; + +// +// The function executed by the routine on each iteration. +// Arguments: +// - s: the plugin state, returned by init(). Can be NULL. +// - i: the routine state, provided by the plugin when the routine is subscribed +// +// Return value: Returning false causes the routine to be unsubcribed from the thread pool. +typedef ss_plugin_bool (*ss_plugin_routine_fn_t)(ss_plugin_t* s, ss_plugin_routine_state_t* i); + +// +// Vtable used by the plugin to subscribe and unsubscribe recurring loop-like routines +// to the framework-provide thread pool +typedef struct +{ + // + // Subscribes a routine to the framework-provided thread pool. + // Arguments: + // - o: the plugin's owner + // - f: the function executed by the routine on each iteration + // - i: the routine's state + // + // Return value: A routine handle that can be used to later unsubscribe the routine. + ss_plugin_routine_t* (*subscribe)(ss_plugin_owner_t* o, ss_plugin_routine_fn_t f, ss_plugin_routine_state_t* i); + + // + // Unsubscribes a routine from the framework-provided thread pool. + // Arguments: + // - o: the plugin's owner + // - r: the routine's handle + void (*unsubscribe)(ss_plugin_owner_t* o, ss_plugin_routine_t* r); +} ss_plugin_routine_vtable; + // // Function handler used by plugin for sending asynchronous events to the // Falcosecurity libs during a live event capture. The asynchronous events @@ -970,6 +1010,7 @@ typedef struct // // Return an updated set of metrics provided by this plugin. // Required: no + // Arguments: // - s: the plugin state, returned by init(). Can be NULL. // - num_metrics: lenght of the returned metrics array. // From 2423cd458759f57cb6f6da4f8fb74ae646781c70 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:39:26 +0200 Subject: [PATCH 07/13] chore: bump plugin api version Signed-off-by: Gianmatteo Palmieri --- userspace/plugin/plugin_api.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/userspace/plugin/plugin_api.h b/userspace/plugin/plugin_api.h index 0046db1e2b..38d72cbd31 100644 --- a/userspace/plugin/plugin_api.h +++ b/userspace/plugin/plugin_api.h @@ -30,7 +30,7 @@ extern "C" { // // todo(jasondellaluce): when/if major changes to v4, check and solve all todos #define PLUGIN_API_VERSION_MAJOR 3 -#define PLUGIN_API_VERSION_MINOR 6 +#define PLUGIN_API_VERSION_MINOR 7 #define PLUGIN_API_VERSION_PATCH 0 // From 19fd117ca4dfffcaccd33aa0f048710ad40a83cf Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:41:52 +0200 Subject: [PATCH 08/13] new(test): add thread pool unit test Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/test/CMakeLists.txt | 1 + userspace/libsinsp/test/thread_pool.ut.cpp | 73 ++++++++++++++++++++++ 2 files changed, 74 insertions(+) create mode 100644 userspace/libsinsp/test/thread_pool.ut.cpp diff --git a/userspace/libsinsp/test/CMakeLists.txt b/userspace/libsinsp/test/CMakeLists.txt index 3284c6e651..132165c938 100644 --- a/userspace/libsinsp/test/CMakeLists.txt +++ b/userspace/libsinsp/test/CMakeLists.txt @@ -120,6 +120,7 @@ set(LIBSINSP_UNIT_TESTS_SOURCES eventformatter.ut.cpp sinsp_metrics.ut.cpp thread_table.ut.cpp + thread_pool.ut.cpp ifinfo.ut.cpp public_sinsp_API/event_related.cpp public_sinsp_API/sinsp_logger.cpp diff --git a/userspace/libsinsp/test/thread_pool.ut.cpp b/userspace/libsinsp/test/thread_pool.ut.cpp new file mode 100644 index 0000000000..77a7fac933 --- /dev/null +++ b/userspace/libsinsp/test/thread_pool.ut.cpp @@ -0,0 +1,73 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +#include +#include + +TEST_F(sinsp_with_test_input, thread_pool) +{ + open_inspector(); + + auto& tp = m_inspector.m_thread_pool; + + ASSERT_NE(tp, nullptr); + ASSERT_EQ(tp->routines_num(), 0); + + // subscribe a routine that keeps running until unsubscribed + auto r = tp->subscribe([] + { + return true; + }); + + // check if the routine has been subscribed + ASSERT_NE(r, nullptr); + ASSERT_EQ(tp->routines_num(), 1); + + // check if the routine has been unsubscribed + tp->unsubscribe(r); + ASSERT_EQ(tp->routines_num(), 0); + + // subscribe a routine that keeps running until a condition is met (returns false) + int count = 0; + r = tp->subscribe([&count] + { + if(count >= 1000) + { + return false; + } + count++; + usleep(500); + return true; + }); + ASSERT_EQ(tp->routines_num(), 1); + + // the routine above keeps increasing a counter every 500 microsenconds, until the counter reaches 1000 + // we give the routine enough time to finish, then we check if it has been unsubscribed + sleep(1); + ASSERT_EQ(count, 1000); + ASSERT_EQ(tp->routines_num(), 0); + + // all the remaining routines should be unsubscribed when the inspector is closed + r = tp->subscribe([] + { + return true; + }); + ASSERT_EQ(tp->routines_num(), 1); + m_inspector.close(); + ASSERT_EQ(tp->routines_num(), 0); +} \ No newline at end of file From 9b23323ded5bb622713bc026789146a30a8cf834 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 5 Jul 2024 16:43:57 +0200 Subject: [PATCH 09/13] new(test): add plugin routines test Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/test/plugins.ut.cpp | 38 ++++ userspace/libsinsp/test/plugins/routines.cpp | 162 ++++++++++++++++++ .../libsinsp/test/plugins/test_plugins.h | 1 + userspace/libsinsp/test/thread_pool.ut.cpp | 9 +- 4 files changed, 205 insertions(+), 5 deletions(-) create mode 100644 userspace/libsinsp/test/plugins/routines.cpp diff --git a/userspace/libsinsp/test/plugins.ut.cpp b/userspace/libsinsp/test/plugins.ut.cpp index dd7258dc27..5ee62ba61d 100644 --- a/userspace/libsinsp/test/plugins.ut.cpp +++ b/userspace/libsinsp/test/plugins.ut.cpp @@ -929,3 +929,41 @@ TEST_F(sinsp_with_test_input, plugin_metrics) } #endif + +TEST_F(sinsp_with_test_input, plugin_routines) +{ + auto p = register_plugin(&m_inspector, get_plugin_api_sample_routines); + open_inspector(); + + // step #0: the plugins subscribes a routine on capture open + auto routines_num = m_inspector.m_thread_pool->routines_num(); + ASSERT_EQ(routines_num, 1); + + // step #1: the plugin subscribes another routine + add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); + routines_num = m_inspector.m_thread_pool->routines_num(); + ASSERT_EQ(routines_num, 2); + + // step #2: the plugin unsubscribes the previous routine + add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); + routines_num = m_inspector.m_thread_pool->routines_num(); + ASSERT_EQ(routines_num, 1); + + // step #3: the plugin subscribes another routine + add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); + routines_num = m_inspector.m_thread_pool->routines_num(); + ASSERT_EQ(routines_num, 2); + + // step #4: the plugin sets a flag that causes the previous routine to be unsubscibed + add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); + std::this_thread::sleep_for(std::chrono::nanoseconds(1000)); //wait for a bit to let routine finish + routines_num = m_inspector.m_thread_pool->routines_num(); + ASSERT_EQ(routines_num, 1); + + // step: #5: the plugin doesn't unsubscribe the last routine, but the thread pool shuould unsubscribe it on capture close + m_inspector.close(); + std::this_thread::sleep_for(std::chrono::nanoseconds(100));; //wait for a bit to let routine finish + routines_num = m_inspector.m_thread_pool->routines_num(); + ASSERT_EQ(routines_num, 0); + +} diff --git a/userspace/libsinsp/test/plugins/routines.cpp b/userspace/libsinsp/test/plugins/routines.cpp new file mode 100644 index 0000000000..0b1bdb928c --- /dev/null +++ b/userspace/libsinsp/test/plugins/routines.cpp @@ -0,0 +1,162 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +#include +#include "sample_table.h" +#include "test_plugins.h" + +struct plugin_state +{ + std::string lasterr; + ss_plugin_owner_t *owner; + ss_plugin_routine_vtable routine_vtable; + + uint8_t step = 1; + bool flag = true; + ss_plugin_routine_t *routine; +}; + +static const char* plugin_get_required_api_version() +{ + return PLUGIN_API_VERSION_STR; +} + +static const char* plugin_get_version() +{ + return "0.1.0"; +} + +static const char* plugin_get_name() +{ + return "sample_routines"; +} + +static const char* plugin_get_description() +{ + return "some desc"; +} + +static const char* plugin_get_contact() +{ + return "some contact"; +} + +static const char* plugin_get_parse_event_sources() +{ + return "[\"syscall\"]"; +} + +static uint16_t* plugin_get_parse_event_types(uint32_t* num_types, ss_plugin_t* s) +{ + static uint16_t types[] = { + PPME_SYSCALL_OPEN_E, + }; + *num_types = sizeof(types) / sizeof(uint16_t); + return &types[0]; +} + +static ss_plugin_t* plugin_init(const ss_plugin_init_input* in, ss_plugin_rc* rc) +{ + *rc = SS_PLUGIN_SUCCESS; + plugin_state *ret = new plugin_state(); + + ret->owner = in->owner; + + return ret; +} + +static void plugin_destroy(ss_plugin_t* s) +{ + delete ((plugin_state *) s); +} + +static const char* plugin_get_last_error(ss_plugin_t* s) +{ + return ((plugin_state *) s)->lasterr.c_str(); +} + +static ss_plugin_bool test_routine(ss_plugin_t *s, ss_plugin_routine_state_t *i) +{ + bool flag = *(bool*)i; + + //this routine keeps running while flag is true + return flag; +} + +static ss_plugin_bool do_nothing(ss_plugin_t *s, ss_plugin_routine_state_t *i) +{ + //this routine always keeps running + return true; +} + +static ss_plugin_rc plugin_parse_event(ss_plugin_t *s, const ss_plugin_event_input *ev, const ss_plugin_event_parse_input* in) +{ + plugin_state *ps = (plugin_state *) s; + + switch (ps->step) + { + case 1: + ps->routine = ps->routine_vtable.subscribe(ps->owner, do_nothing, (ss_plugin_routine_state_t*)&ps->flag); + break; + case 2: + ps->routine_vtable.unsubscribe(ps->owner, ps->routine); + break; + case 3: + ps->routine = ps->routine_vtable.subscribe(ps->owner, test_routine, (ss_plugin_routine_state_t*)&ps->flag); + break; + case 4: + ps->flag = false; + break; + default: + break; + } + + ps->step++; + + return SS_PLUGIN_SUCCESS; +} + +static void plugin_capture_open(ss_plugin_t* s, ss_plugin_routine_vtable r) +{ + plugin_state *ps = (plugin_state *) s; + ps->routine_vtable = r; + + ps->routine_vtable.subscribe(ps->owner, do_nothing, (ss_plugin_routine_state_t*)&ps->flag); +} + +static void plugin_capture_close(ss_plugin_t* s, ss_plugin_routine_vtable r) +{ + +} + +void get_plugin_api_sample_routines(plugin_api& out) +{ + memset(&out, 0, sizeof(plugin_api)); + out.get_required_api_version = plugin_get_required_api_version; + out.get_version = plugin_get_version; + out.get_description = plugin_get_description; + out.get_contact = plugin_get_contact; + out.get_name = plugin_get_name; + out.get_last_error = plugin_get_last_error; + out.init = plugin_init; + out.destroy = plugin_destroy; + out.get_parse_event_sources = plugin_get_parse_event_sources; + out.get_parse_event_types = plugin_get_parse_event_types; + out.parse_event = plugin_parse_event; + out.capture_open = plugin_capture_open; + out.capture_close = plugin_capture_close; +} \ No newline at end of file diff --git a/userspace/libsinsp/test/plugins/test_plugins.h b/userspace/libsinsp/test/plugins/test_plugins.h index 6d5c1ab7b0..76badedc95 100644 --- a/userspace/libsinsp/test/plugins/test_plugins.h +++ b/userspace/libsinsp/test/plugins/test_plugins.h @@ -29,3 +29,4 @@ void get_plugin_api_sample_syscall_tables(plugin_api& out); void get_plugin_api_sample_syscall_subtables(plugin_api& out); void get_plugin_api_sample_syscall_subtables_array(plugin_api& out); void get_plugin_api_sample_metrics(plugin_api& out); +void get_plugin_api_sample_routines(plugin_api& out); diff --git a/userspace/libsinsp/test/thread_pool.ut.cpp b/userspace/libsinsp/test/thread_pool.ut.cpp index 77a7fac933..0133012a73 100644 --- a/userspace/libsinsp/test/thread_pool.ut.cpp +++ b/userspace/libsinsp/test/thread_pool.ut.cpp @@ -46,20 +46,19 @@ TEST_F(sinsp_with_test_input, thread_pool) int count = 0; r = tp->subscribe([&count] { - if(count >= 1000) + if(count >= 1024) { return false; } count++; - usleep(500); return true; }); ASSERT_EQ(tp->routines_num(), 1); - // the routine above keeps increasing a counter every 500 microsenconds, until the counter reaches 1000 + // the routine above keeps increasing a counter, until the counter reaches 1024 // we give the routine enough time to finish, then we check if it has been unsubscribed - sleep(1); - ASSERT_EQ(count, 1000); + std::this_thread::sleep_for(std::chrono::seconds(1)); + ASSERT_EQ(count, 1024); ASSERT_EQ(tp->routines_num(), 0); // all the remaining routines should be unsubscribed when the inspector is closed From 4b087310076e78cfb43e430533ddcb46053259fd Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 12 Jul 2024 11:05:13 +0200 Subject: [PATCH 10/13] fix(test): mock plugin not initialized Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/test/events_plugin.ut.cpp | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/userspace/libsinsp/test/events_plugin.ut.cpp b/userspace/libsinsp/test/events_plugin.ut.cpp index 06eaa36efa..32b6939fdf 100644 --- a/userspace/libsinsp/test/events_plugin.ut.cpp +++ b/userspace/libsinsp/test/events_plugin.ut.cpp @@ -105,6 +105,20 @@ static void set_mock_plugin_api(plugin_api& api) api.next_batch = mock_plugin_next_batch; } +static std::shared_ptr register_plugin_api( + sinsp* i, + plugin_api& api, + const std::string& initcfg = "") +{ + std::string err; + auto pl = i->register_plugin(&api); + if (!pl->init(initcfg, err)) + { + throw sinsp_exception(err); + } + return pl; +} + TEST_F(sinsp_with_test_input, event_sources) { sinsp_evt* evt = NULL; @@ -119,7 +133,7 @@ TEST_F(sinsp_with_test_input, event_sources) // create and register a mock plugin plugin_api mock_api; set_mock_plugin_api(mock_api); - m_inspector.register_plugin(&mock_api); + ASSERT_NO_THROW(register_plugin_api(&m_inspector, mock_api)); // regular events have the "syscall" event source evt = add_event_advance_ts(increasing_ts(), 1, PPME_SYSCALL_OPEN_X, 6, (uint64_t)3, "/tmp/the_file", PPM_O_RDWR, 0, 5, (uint64_t)123); From 13ad718fc96e58270c9bc550d42387c52cc343fb Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Mon, 15 Jul 2024 11:14:49 +0200 Subject: [PATCH 11/13] new(libsinsp): capture listening capability Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/plugin.cpp | 32 ++++++++++++++++++-- userspace/libsinsp/test/plugins.ut.cpp | 1 - userspace/libsinsp/test/plugins/routines.cpp | 6 ++-- userspace/plugin/plugin_api.h | 32 +++++++++++++++----- userspace/plugin/plugin_loader.c | 10 ++++++ userspace/plugin/plugin_loader.h | 13 ++++---- 6 files changed, 74 insertions(+), 20 deletions(-) diff --git a/userspace/libsinsp/plugin.cpp b/userspace/libsinsp/plugin.cpp index 472833717c..17e0881db1 100755 --- a/userspace/libsinsp/plugin.cpp +++ b/userspace/libsinsp/plugin.cpp @@ -863,9 +863,23 @@ void sinsp_plugin::capture_open() routine_vtable.subscribe = &plugin_subscribe_routine; routine_vtable.unsubscribe = &plugin_unsubscribe_routine; + ss_plugin_capture_listen_input in; + ss_plugin_table_reader_vtable_ext table_reader_ext; + ss_plugin_table_writer_vtable_ext table_writer_ext; + ss_plugin_table_reader_vtable table_reader; + ss_plugin_table_writer_vtable table_writer; + + in.owner = (ss_plugin_owner_t *) this; + in.table_reader_ext = &table_reader_ext; + in.table_writer_ext = &table_writer_ext; + in.routine = routine_vtable; + + sinsp_plugin::table_read_api(table_reader, table_reader_ext); + sinsp_plugin::table_write_api(table_writer, table_writer_ext); + if(m_handle->api.capture_open) { - m_handle->api.capture_open(m_state, routine_vtable); + m_handle->api.capture_open(m_state, &in); } } @@ -880,9 +894,23 @@ void sinsp_plugin::capture_close() routine_vtable.subscribe = &plugin_subscribe_routine; routine_vtable.unsubscribe = &plugin_unsubscribe_routine; + ss_plugin_capture_listen_input in; + ss_plugin_table_reader_vtable_ext table_reader_ext; + ss_plugin_table_writer_vtable_ext table_writer_ext; + ss_plugin_table_reader_vtable table_reader; + ss_plugin_table_writer_vtable table_writer; + + in.owner = (ss_plugin_owner_t *) this; + in.table_reader_ext = &table_reader_ext; + in.table_writer_ext = &table_writer_ext; + in.routine = routine_vtable; + + sinsp_plugin::table_read_api(table_reader, table_reader_ext); + sinsp_plugin::table_write_api(table_writer, table_writer_ext); + if(m_handle->api.capture_close) { - m_handle->api.capture_close(m_state, routine_vtable); + m_handle->api.capture_close(m_state, &in); } } diff --git a/userspace/libsinsp/test/plugins.ut.cpp b/userspace/libsinsp/test/plugins.ut.cpp index 5ee62ba61d..e84b20f853 100644 --- a/userspace/libsinsp/test/plugins.ut.cpp +++ b/userspace/libsinsp/test/plugins.ut.cpp @@ -965,5 +965,4 @@ TEST_F(sinsp_with_test_input, plugin_routines) std::this_thread::sleep_for(std::chrono::nanoseconds(100));; //wait for a bit to let routine finish routines_num = m_inspector.m_thread_pool->routines_num(); ASSERT_EQ(routines_num, 0); - } diff --git a/userspace/libsinsp/test/plugins/routines.cpp b/userspace/libsinsp/test/plugins/routines.cpp index 0b1bdb928c..bb549c43b8 100644 --- a/userspace/libsinsp/test/plugins/routines.cpp +++ b/userspace/libsinsp/test/plugins/routines.cpp @@ -130,15 +130,15 @@ static ss_plugin_rc plugin_parse_event(ss_plugin_t *s, const ss_plugin_event_inp return SS_PLUGIN_SUCCESS; } -static void plugin_capture_open(ss_plugin_t* s, ss_plugin_routine_vtable r) +static void plugin_capture_open(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) { plugin_state *ps = (plugin_state *) s; - ps->routine_vtable = r; + ps->routine_vtable = i->routine; ps->routine_vtable.subscribe(ps->owner, do_nothing, (ss_plugin_routine_state_t*)&ps->flag); } -static void plugin_capture_close(ss_plugin_t* s, ss_plugin_routine_vtable r) +static void plugin_capture_close(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) { } diff --git a/userspace/plugin/plugin_api.h b/userspace/plugin/plugin_api.h index 38d72cbd31..b0b6a476fb 100644 --- a/userspace/plugin/plugin_api.h +++ b/userspace/plugin/plugin_api.h @@ -427,6 +427,25 @@ typedef struct void (*unsubscribe)(ss_plugin_owner_t* o, ss_plugin_routine_t* r); } ss_plugin_routine_vtable; +// Input passed to the plugin when the framework start and stops the capture. +typedef struct ss_plugin_capture_listen_input +{ + // + // The plugin's owner. Can be passed by the plugin to the callbacks available + // in this struct in order to invoke functions of its owner. + ss_plugin_owner_t* owner; + // + // Vtable containing callbacks that can be used by the plugin + // for subscribing and unsubscribing routines to the framework's thread pool. + ss_plugin_routine_vtable routine; + // + // Vtable for controlling a state table for read operations. + ss_plugin_table_reader_vtable_ext* table_reader_ext; + // + // Vtable for controlling a state table for write operations. + ss_plugin_table_writer_vtable_ext* table_writer_ext; +} ss_plugin_capture_listen_input; + // // Function handler used by plugin for sending asynchronous events to the // Falcosecurity libs during a live event capture. The asynchronous events @@ -1028,20 +1047,17 @@ typedef struct // Required: no // Arguments: // - s: the plugin state, returned by init(). Can be NULL. - // - r: a vtable containing callback that can be used by the plugin - // for subscribing and unsubscribing routines to the framework's thread pool. - // This vtable can be retained and used for the rest of the plugin's life-cycle. - void (*capture_open)(ss_plugin_t* s, ss_plugin_routine_vtable r); + // - i: input containing vtables for performing table operations and subscribe/unsubscribe async routines + void (*capture_open)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); // // Called by the framework when the event capture closes. // - // Required: no + // Required: yes if capture_open is defined // Arguments: // - s: the plugin state, returned by init(). Can be NULL. - // - r: a vtable containing callback that can be used by the plugin - // for subscribing and unsubscribing routines to the framework's thread pool. - void (*capture_close)(ss_plugin_t* s, ss_plugin_routine_vtable r); + // - i: input containing vtables for performing table operations and subscribe/unsubscribe async routines + void (*capture_close)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); }; } plugin_api; diff --git a/userspace/plugin/plugin_loader.c b/userspace/plugin/plugin_loader.c index 2f22654306..8361f121bb 100644 --- a/userspace/plugin/plugin_loader.c +++ b/userspace/plugin/plugin_loader.c @@ -296,6 +296,16 @@ plugin_caps_t plugin_get_capabilities(const plugin_handle_t* h, char* err) err_append(err, "must implement both 'plugin_get_async_events' and 'plugin_set_async_event_handler' (async events)", ", "); } + if (h->api.capture_open != NULL && h->api.capture_close != NULL) + { + caps = (plugin_caps_t)((uint32_t) caps | (uint32_t) CAP_CAPTURE_LISTENING); + } + else if (h->api.capture_open != NULL) + { + caps = (plugin_caps_t)((uint32_t) caps | (uint32_t) CAP_BROKEN); + err_append(err, "must implement both 'plugin_capture_open' and 'plugin_capture_close' (capture listening)", ", "); + } + return caps; } diff --git a/userspace/plugin/plugin_loader.h b/userspace/plugin/plugin_loader.h index 01a45bfe5c..f33b553bc1 100644 --- a/userspace/plugin/plugin_loader.h +++ b/userspace/plugin/plugin_loader.h @@ -39,12 +39,13 @@ extern "C" { */ typedef enum { - CAP_NONE = 0, - CAP_SOURCING = 1 << 0, - CAP_EXTRACTION = 1 << 1, - CAP_PARSING = 1 << 2, - CAP_ASYNC = 1 << 3, - CAP_BROKEN = 1 << 31, // used to report inconsistencies + CAP_NONE = 0, + CAP_SOURCING = 1 << 0, + CAP_EXTRACTION = 1 << 1, + CAP_PARSING = 1 << 2, + CAP_ASYNC = 1 << 3, + CAP_CAPTURE_LISTENING = 1 << 4, + CAP_BROKEN = 1 << 31, // used to report inconsistencies } plugin_caps_t; /*! From f33a73fd92e5984dafb3d9249c32aa2efe1f3628 Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Fri, 19 Jul 2024 15:56:36 +0200 Subject: [PATCH 12/13] fix(libsinsp): disable thread pool on webassembly Signed-off-by: Gianmatteo Palmieri --- userspace/libsinsp/plugin.cpp | 10 ++++++++++ userspace/libsinsp/sinsp.cpp | 7 ++++++- userspace/libsinsp/test/plugins.ut.cpp | 6 ++++++ userspace/libsinsp/test/thread_pool.ut.cpp | 4 +++- userspace/plugin/plugin_api.h | 2 +- 5 files changed, 26 insertions(+), 3 deletions(-) diff --git a/userspace/libsinsp/plugin.cpp b/userspace/libsinsp/plugin.cpp index 17e0881db1..7eaf4d1faf 100755 --- a/userspace/libsinsp/plugin.cpp +++ b/userspace/libsinsp/plugin.cpp @@ -824,6 +824,11 @@ std::vector sinsp_plugin::get_metrics() const thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t routine_fn, ss_plugin_routine_state_t* routine_state) { + if(!m_thread_pool) + { + return static_cast(nullptr); + } + auto f = [this, routine_fn, routine_state]() -> bool { return static_cast(routine_fn(m_state, routine_state)); }; @@ -833,6 +838,11 @@ thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t void sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id) { + if(!m_thread_pool || !routine_id) + { + return; + } + m_thread_pool->unsubscribe(routine_id); } diff --git a/userspace/libsinsp/sinsp.cpp b/userspace/libsinsp/sinsp.cpp index 5879ddc1b7..e119da5b4b 100644 --- a/userspace/libsinsp/sinsp.cpp +++ b/userspace/libsinsp/sinsp.cpp @@ -132,7 +132,9 @@ sinsp::sinsp(bool with_metrics) : m_table_registry = std::make_shared(); m_table_registry->add_table(m_thread_manager.get()); +#if !defined(__EMSCRIPTEN__) m_thread_pool = std::make_shared(); +#endif } sinsp::~sinsp() @@ -806,7 +808,10 @@ void sinsp::close() } // purge pending routines and wait for the running ones - m_thread_pool->purge(); + if(m_thread_pool) + { + m_thread_pool->purge(); + } m_mode = SINSP_MODE_NONE; } diff --git a/userspace/libsinsp/test/plugins.ut.cpp b/userspace/libsinsp/test/plugins.ut.cpp index e84b20f853..01143a9458 100644 --- a/userspace/libsinsp/test/plugins.ut.cpp +++ b/userspace/libsinsp/test/plugins.ut.cpp @@ -930,11 +930,15 @@ TEST_F(sinsp_with_test_input, plugin_metrics) #endif +#if !defined(__EMSCRIPTEN__) + TEST_F(sinsp_with_test_input, plugin_routines) { auto p = register_plugin(&m_inspector, get_plugin_api_sample_routines); open_inspector(); + ASSERT_NE(m_inspector.m_thread_pool, nullptr); + // step #0: the plugins subscribes a routine on capture open auto routines_num = m_inspector.m_thread_pool->routines_num(); ASSERT_EQ(routines_num, 1); @@ -966,3 +970,5 @@ TEST_F(sinsp_with_test_input, plugin_routines) routines_num = m_inspector.m_thread_pool->routines_num(); ASSERT_EQ(routines_num, 0); } + +#endif diff --git a/userspace/libsinsp/test/thread_pool.ut.cpp b/userspace/libsinsp/test/thread_pool.ut.cpp index 0133012a73..7373943c6b 100644 --- a/userspace/libsinsp/test/thread_pool.ut.cpp +++ b/userspace/libsinsp/test/thread_pool.ut.cpp @@ -19,6 +19,7 @@ limitations under the License. #include #include +#if !defined(__EMSCRIPTEN__) TEST_F(sinsp_with_test_input, thread_pool) { open_inspector(); @@ -69,4 +70,5 @@ TEST_F(sinsp_with_test_input, thread_pool) ASSERT_EQ(tp->routines_num(), 1); m_inspector.close(); ASSERT_EQ(tp->routines_num(), 0); -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/userspace/plugin/plugin_api.h b/userspace/plugin/plugin_api.h index b0b6a476fb..14a373e78b 100644 --- a/userspace/plugin/plugin_api.h +++ b/userspace/plugin/plugin_api.h @@ -416,7 +416,7 @@ typedef struct // - f: the function executed by the routine on each iteration // - i: the routine's state // - // Return value: A routine handle that can be used to later unsubscribe the routine. + // Return value: A routine handle that can be used to later unsubscribe the routine. Returns null in case of failure. ss_plugin_routine_t* (*subscribe)(ss_plugin_owner_t* o, ss_plugin_routine_fn_t f, ss_plugin_routine_state_t* i); // From ae81bb6135808cf892c5e806a2e504764288332b Mon Sep 17 00:00:00 2001 From: Gianmatteo Palmieri Date: Thu, 29 Aug 2024 09:34:22 +0200 Subject: [PATCH 13/13] fix(libsinsp): address reviewer thread pool suggestions Signed-off-by: Gianmatteo Palmieri Co-authored-by: Jason Dellaluce --- .github/workflows/ci.yml | 10 ++-- cmake/modules/CompilerFlags.cmake | 4 ++ cmake/modules/libsinsp.cmake | 12 +++-- userspace/libsinsp/CMakeLists.txt | 11 +++- userspace/libsinsp/plugin.cpp | 44 ++++++++-------- userspace/libsinsp/plugin.h | 13 +++-- userspace/libsinsp/sinsp.cpp | 23 ++++++++- userspace/libsinsp/sinsp.h | 9 ++-- userspace/libsinsp/test/plugins.ut.cpp | 20 ++++---- userspace/libsinsp/test/plugins/routines.cpp | 16 ++++-- userspace/libsinsp/test/thread_pool.ut.cpp | 26 +++++++--- userspace/libsinsp/thread_pool.h | 35 +------------ .../{thread_pool.cpp => thread_pool_bs.cpp} | 31 ++++++++---- userspace/libsinsp/thread_pool_bs.h | 50 +++++++++++++++++++ userspace/plugin/plugin_api.h | 14 ++++-- 15 files changed, 208 insertions(+), 110 deletions(-) rename userspace/libsinsp/{thread_pool.cpp => thread_pool_bs.cpp} (66%) create mode 100644 userspace/libsinsp/thread_pool_bs.h diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c2fce5cf5e..2957148a63 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -64,7 +64,7 @@ jobs: UBSAN_OPTIONS: print_stacktrace=1 run: | mkdir -p build - cd build && cmake ${{ matrix.cmake_opts }} ../ + cd build && cmake ${{ matrix.cmake_opts }} -DENABLE_THREAD_POOL=ON ../ KERNELDIR=/lib/modules/$(ls /lib/modules)/build make -j4 make run-unit-tests @@ -90,7 +90,7 @@ jobs: - name: Build and test ๐Ÿ—๏ธ๐Ÿงช run: | mkdir -p build - cd build && cmake -DBUILD_BPF=On -DBUILD_DRIVER=Off -DUSE_BUNDLED_DEPS=On -DUSE_BUNDLED_LIBELF=Off -DUSE_SHARED_LIBELF=Off -DBUILD_LIBSCAP_MODERN_BPF=ON -DMUSL_OPTIMIZED_BUILD=On ../ + cd build && cmake -DBUILD_BPF=On -DBUILD_DRIVER=Off -DUSE_BUNDLED_DEPS=On -DUSE_BUNDLED_LIBELF=Off -DUSE_SHARED_LIBELF=Off -DBUILD_LIBSCAP_MODERN_BPF=ON -DMUSL_OPTIMIZED_BUILD=On -DENABLE_THREAD_POOL=ON ../ make run-unit-tests -j4 build-shared-libs-linux-amd64: @@ -115,7 +115,7 @@ jobs: - name: Build and test ๐Ÿ—๏ธ๐Ÿงช run: | mkdir -p build - cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DMINIMAL_BUILD=True -DCMAKE_INSTALL_PREFIX=/tmp/libs-test ../ + cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DMINIMAL_BUILD=True -DCMAKE_INSTALL_PREFIX=/tmp/libs-test -DENABLE_THREAD_POOL=ON ../ make -j4 make run-unit-tests @@ -155,7 +155,7 @@ jobs: - name: Build and test ๐Ÿ—๏ธ๐Ÿงช run: | mkdir -p build - cd build && cmake -DUSE_BUNDLED_DEPS=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_MSVC_RUNTIME_LIBRARY=${{ matrix.crt }} -DCREATE_TEST_TARGETS=ON -DMINIMAL_BUILD=ON .. + cd build && cmake -DUSE_BUNDLED_DEPS=ON -DCMAKE_BUILD_TYPE=Release -DCMAKE_MSVC_RUNTIME_LIBRARY=${{ matrix.crt }} -DCREATE_TEST_TARGETS=ON -DMINIMAL_BUILD=ON -DENABLE_THREAD_POOL=ON .. cmake --build . --config Release --parallel 4 && make run-unit-tests || libsinsp\test\Release\unit-test-libsinsp.exe build-shared-libs-macos-amd64: @@ -174,7 +174,7 @@ jobs: - name: Build ๐Ÿ—๏ธ run: | mkdir -p build - cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DUSE_BUNDLED_BS_THREADPOOL=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test .. + cd build && cmake -DBUILD_SHARED_LIBS=True -DUSE_BUNDLED_DEPS=False -DUSE_BUNDLED_VALIJSON=ON -DUSE_BUNDLED_BS_THREADPOOL=ON -DENABLE_THREAD_POOL=ON -DCMAKE_BUILD_TYPE=Release -DCREATE_TEST_TARGETS=OFF -DMINIMAL_BUILD=ON -DCMAKE_INSTALL_PREFIX=/tmp/libs-test .. cmake --build . --config Release --parallel $(getconf _NPROCESSORS_ONLN) - name: Install diff --git a/cmake/modules/CompilerFlags.cmake b/cmake/modules/CompilerFlags.cmake index 9a1cdb9289..619874aaf7 100644 --- a/cmake/modules/CompilerFlags.cmake +++ b/cmake/modules/CompilerFlags.cmake @@ -77,6 +77,10 @@ if(NOT MSVC) set(FALCOSECURITY_LIBS_USERSPACE_LINK_FLAGS "${FALCOSECURITY_LIBS_USERSPACE_COMPILE_FLAGS};--coverage") endif() + if(ENABLE_THREAD_POOL) + set(FALCOSECURITY_LIBS_COMMON_FLAGS "${FALCOSECURITY_LIBS_COMMON_FLAGS} -DENABLE_THREAD_POOL") + endif() + set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FALCOSECURITY_LIBS_COMMON_FLAGS}") # we need also `-std=c++17` here beacuse `set(CMAKE_CXX_STANDARD 17)` is not enough to enforce c++17 # with some Cmake versions: https://github.com/falcosecurity/libs/pull/950 diff --git a/cmake/modules/libsinsp.cmake b/cmake/modules/libsinsp.cmake index 4c3e5101e1..815130de09 100644 --- a/cmake/modules/libsinsp.cmake +++ b/cmake/modules/libsinsp.cmake @@ -20,6 +20,7 @@ if(NOT LIBS_DIR) endif() option(USE_BUNDLED_DEPS "Enable bundled dependencies instead of using the system ones" ON) +option(ENABLE_THREAD_POOL "Enable inspector thread pool" OFF) if(DEFINED LIBSINSP_USER_AGENT) add_definitions(-DLIBSINSP_USER_AGENT="${LIBSINSP_USER_AGENT}") @@ -37,7 +38,10 @@ endif() include(jsoncpp) include(valijson) include(re2) -include(bs_threadpool) + +if(ENABLE_THREAD_POOL AND NOT EMSCRIPTEN) + include(bs_threadpool) +endif() set(LIBSINSP_INCLUDE_DIRS ${LIBS_DIR} ${LIBS_DIR}/userspace ${LIBSCAP_INCLUDE_DIRS} ${DRIVER_CONFIG_DIR}) @@ -55,8 +59,10 @@ list(APPEND LIBSINSP_INCLUDE_DIRS ${VALIJSON_ABSOLUTE_INCLUDE_DIR}) get_filename_component(RE2_ABSOLUTE_INCLUDE_DIR ${RE2_INCLUDE} ABSOLUTE) list(APPEND LIBSINSP_INCLUDE_DIRS ${RE2_ABSOLUTE_INCLUDE_DIR}) -get_filename_component(BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR ${BS_THREADPOOL_INCLUDE} ABSOLUTE) -list(APPEND LIBSINSP_INCLUDE_DIRS ${BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR}) +if (ENABLE_THREAD_POOL AND NOT EMSCRIPTEN) + get_filename_component(BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR ${BS_THREADPOOL_INCLUDE} ABSOLUTE) + list(APPEND LIBSINSP_INCLUDE_DIRS ${BS_THREADPOOL_ABSOLUTE_INCLUDE_DIR}) +endif() if(NOT MINIMAL_BUILD AND NOT EMSCRIPTEN AND NOT APPLE) get_filename_component(CARES_ABSOLUTE_INCLUDE_DIR ${CARES_INCLUDE} ABSOLUTE) diff --git a/userspace/libsinsp/CMakeLists.txt b/userspace/libsinsp/CMakeLists.txt index 9159e68a34..f701a539a3 100644 --- a/userspace/libsinsp/CMakeLists.txt +++ b/userspace/libsinsp/CMakeLists.txt @@ -16,6 +16,7 @@ # option(USE_BUNDLED_DEPS "Enable bundled dependencies instead of using the system ones" ON) +option(ENABLE_THREAD_POOL "Enable inspector thread pool" OFF) if(NOT MSVC) add_definitions(-DHAVE_PWD_H) @@ -100,7 +101,6 @@ add_library(sinsp prefix_search.cpp sinsp_syslog.cpp threadinfo.cpp - thread_pool.cpp tuples.cpp sinsp.cpp token_bucket.cpp @@ -113,6 +113,13 @@ add_library(sinsp events/sinsp_events_ppm_sc.cpp ) +if (ENABLE_THREAD_POOL AND NOT EMSCRIPTEN) + target_sources(sinsp + PRIVATE + thread_pool_bs.cpp + ) +endif() + if(NOT WIN32 AND NOT APPLE) target_sources(sinsp PRIVATE procfs_utils.cpp sinsp_cgroup.cpp) endif() @@ -200,7 +207,7 @@ if(USE_BUNDLED_JSONCPP) add_dependencies(sinsp jsoncpp) endif() -if(USE_BUNDLED_BS_THREADPOOL) +if(ENABLE_THREAD_POOL AND USE_BUNDLED_BS_THREADPOOL) add_dependencies(sinsp bs_threadpool) endif() diff --git a/userspace/libsinsp/plugin.cpp b/userspace/libsinsp/plugin.cpp index 7eaf4d1faf..1a777b8c0a 100755 --- a/userspace/libsinsp/plugin.cpp +++ b/userspace/libsinsp/plugin.cpp @@ -826,7 +826,7 @@ thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t { if(!m_thread_pool) { - return static_cast(nullptr); + return reinterpret_cast(nullptr); } auto f = [this, routine_fn, routine_state]() -> bool { @@ -836,14 +836,14 @@ thread_pool::routine_id_t sinsp_plugin::subscribe_routine(ss_plugin_routine_fn_t return m_thread_pool->subscribe(f); } -void sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id) +bool sinsp_plugin::unsubscribe_routine(thread_pool::routine_id_t routine_id) { if(!m_thread_pool || !routine_id) { - return; + return false; } - m_thread_pool->unsubscribe(routine_id); + return m_thread_pool->unsubscribe(routine_id); } ss_plugin_routine_t* plugin_subscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_fn_t r, ss_plugin_routine_state_t* s) @@ -851,24 +851,29 @@ ss_plugin_routine_t* plugin_subscribe_routine(ss_plugin_owner_t* o, ss_plugin_ro auto t = static_cast(o); auto res = t->subscribe_routine(r, s); - return static_cast(res); + return reinterpret_cast(res); } -void plugin_unsubscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_t* r) +ss_plugin_rc plugin_unsubscribe_routine(ss_plugin_owner_t* o, ss_plugin_routine_t* r) { auto t = static_cast(o); - auto id = static_cast(r); + auto id = reinterpret_cast(r); - t->unsubscribe_routine(id); + return t->unsubscribe_routine(id) ? SS_PLUGIN_SUCCESS : SS_PLUGIN_FAILURE; } -void sinsp_plugin::capture_open() +bool sinsp_plugin::capture_open() { if(!m_inited) { throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); } + if(!m_handle->api.capture_open) + { + return false; + } + ss_plugin_routine_vtable routine_vtable; routine_vtable.subscribe = &plugin_subscribe_routine; routine_vtable.unsubscribe = &plugin_unsubscribe_routine; @@ -882,24 +887,26 @@ void sinsp_plugin::capture_open() in.owner = (ss_plugin_owner_t *) this; in.table_reader_ext = &table_reader_ext; in.table_writer_ext = &table_writer_ext; - in.routine = routine_vtable; + in.routine = &routine_vtable; sinsp_plugin::table_read_api(table_reader, table_reader_ext); sinsp_plugin::table_write_api(table_writer, table_writer_ext); - if(m_handle->api.capture_open) - { - m_handle->api.capture_open(m_state, &in); - } + return m_handle->api.capture_open(m_state, &in) == SS_PLUGIN_SUCCESS; } -void sinsp_plugin::capture_close() +bool sinsp_plugin::capture_close() { if(!m_inited) { throw sinsp_exception(std::string(s_not_init_err) + ": " + m_name); } + if(!m_handle->api.capture_close) + { + return false; + } + ss_plugin_routine_vtable routine_vtable; routine_vtable.subscribe = &plugin_subscribe_routine; routine_vtable.unsubscribe = &plugin_unsubscribe_routine; @@ -913,15 +920,12 @@ void sinsp_plugin::capture_close() in.owner = (ss_plugin_owner_t *) this; in.table_reader_ext = &table_reader_ext; in.table_writer_ext = &table_writer_ext; - in.routine = routine_vtable; + in.routine = &routine_vtable; sinsp_plugin::table_read_api(table_reader, table_reader_ext); sinsp_plugin::table_write_api(table_writer, table_writer_ext); - if(m_handle->api.capture_close) - { - m_handle->api.capture_close(m_state, &in); - } + return m_handle->api.capture_close(m_state, &in) == SS_PLUGIN_SUCCESS; } /** Event Source CAP **/ diff --git a/userspace/libsinsp/plugin.h b/userspace/libsinsp/plugin.h index 838151f853..9976b3f957 100755 --- a/userspace/libsinsp/plugin.h +++ b/userspace/libsinsp/plugin.h @@ -29,9 +29,14 @@ limitations under the License. #include #include #include -#include #include +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) +#include +#else +#include +#endif + /** * @brief An object-oriented representation of a plugin. */ @@ -172,10 +177,10 @@ class sinsp_plugin std::string get_init_schema(ss_plugin_schema_type& schema_type) const; bool set_config(const std::string& config); std::vector get_metrics() const; - void capture_open(); - void capture_close(); + bool capture_open(); + bool capture_close(); thread_pool::routine_id_t subscribe_routine(ss_plugin_routine_fn_t routine_fn, ss_plugin_routine_state_t* routine_state); - void unsubscribe_routine(thread_pool::routine_id_t routine_id); + bool unsubscribe_routine(thread_pool::routine_id_t routine_id); /** Event Sourcing **/ inline uint32_t id() const diff --git a/userspace/libsinsp/sinsp.cpp b/userspace/libsinsp/sinsp.cpp index e119da5b4b..b5ac697fbd 100644 --- a/userspace/libsinsp/sinsp.cpp +++ b/userspace/libsinsp/sinsp.cpp @@ -132,9 +132,12 @@ sinsp::sinsp(bool with_metrics) : m_table_registry = std::make_shared(); m_table_registry->add_table(m_thread_manager.get()); -#if !defined(__EMSCRIPTEN__) - m_thread_pool = std::make_shared(); +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) + m_thread_pool = std::make_shared(); +#else + m_thread_pool = nullptr; #endif + } sinsp::~sinsp() @@ -2167,3 +2170,19 @@ void sinsp::set_track_connection_status(bool enabled) m_parser->set_track_connection_status(enabled); } +std::shared_ptr sinsp::get_thread_pool() +{ + return m_thread_pool; +} + +bool sinsp::set_thread_pool(std::shared_ptr tpool) +{ + if(!m_thread_pool) + { + m_thread_pool = tpool; + return true; + } + + return false; +} + diff --git a/userspace/libsinsp/sinsp.h b/userspace/libsinsp/sinsp.h index 86752271bd..a6b1b9c808 100644 --- a/userspace/libsinsp/sinsp.h +++ b/userspace/libsinsp/sinsp.h @@ -1018,6 +1018,9 @@ class SINSP_PUBLIC sinsp : public capture_stats_source bool get_track_connection_status() const; inline void set_track_connection_status(bool enabled); + std::shared_ptr get_thread_pool(); + bool set_thread_pool(std::shared_ptr tpool); + /** * \brief Get a new timestamp. * @@ -1203,6 +1206,8 @@ class SINSP_PUBLIC sinsp : public capture_stats_source int32_t m_quantization_interval = -1; + std::shared_ptr m_thread_pool; + public: std::unique_ptr m_thread_manager; @@ -1404,10 +1409,6 @@ class SINSP_PUBLIC sinsp : public capture_stats_source // A registry that managers the state tables of this inspector std::shared_ptr m_table_registry; - // - // A thread pool capable of running non-blocking recurring routines - std::shared_ptr m_thread_pool; - sinsp_observer* m_observer{nullptr}; bool m_inited; diff --git a/userspace/libsinsp/test/plugins.ut.cpp b/userspace/libsinsp/test/plugins.ut.cpp index 01143a9458..8d95c52131 100644 --- a/userspace/libsinsp/test/plugins.ut.cpp +++ b/userspace/libsinsp/test/plugins.ut.cpp @@ -930,44 +930,46 @@ TEST_F(sinsp_with_test_input, plugin_metrics) #endif -#if !defined(__EMSCRIPTEN__) +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) TEST_F(sinsp_with_test_input, plugin_routines) { auto p = register_plugin(&m_inspector, get_plugin_api_sample_routines); open_inspector(); - ASSERT_NE(m_inspector.m_thread_pool, nullptr); + auto tp = m_inspector.get_thread_pool(); + + ASSERT_NE(tp, nullptr); // step #0: the plugins subscribes a routine on capture open - auto routines_num = m_inspector.m_thread_pool->routines_num(); + auto routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 1); // step #1: the plugin subscribes another routine add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 2); // step #2: the plugin unsubscribes the previous routine add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 1); // step #3: the plugin subscribes another routine add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 2); // step #4: the plugin sets a flag that causes the previous routine to be unsubscibed add_event_advance_ts(increasing_ts(), 0, PPME_SYSCALL_OPEN_E, 3, "/tmp/the_file", PPM_O_RDWR, 0); std::this_thread::sleep_for(std::chrono::nanoseconds(1000)); //wait for a bit to let routine finish - routines_num = m_inspector.m_thread_pool->routines_num(); + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 1); // step: #5: the plugin doesn't unsubscribe the last routine, but the thread pool shuould unsubscribe it on capture close m_inspector.close(); - std::this_thread::sleep_for(std::chrono::nanoseconds(100));; //wait for a bit to let routine finish - routines_num = m_inspector.m_thread_pool->routines_num(); + std::this_thread::sleep_for(std::chrono::nanoseconds(100)); //wait for a bit to let routine finish + routines_num = tp->routines_num(); ASSERT_EQ(routines_num, 0); } diff --git a/userspace/libsinsp/test/plugins/routines.cpp b/userspace/libsinsp/test/plugins/routines.cpp index bb549c43b8..5a52480894 100644 --- a/userspace/libsinsp/test/plugins/routines.cpp +++ b/userspace/libsinsp/test/plugins/routines.cpp @@ -19,6 +19,9 @@ limitations under the License. #include "sample_table.h" #include "test_plugins.h" +#include +#include + struct plugin_state { std::string lasterr; @@ -26,7 +29,7 @@ struct plugin_state ss_plugin_routine_vtable routine_vtable; uint8_t step = 1; - bool flag = true; + std::atomic flag = true; ss_plugin_routine_t *routine; }; @@ -130,17 +133,20 @@ static ss_plugin_rc plugin_parse_event(ss_plugin_t *s, const ss_plugin_event_inp return SS_PLUGIN_SUCCESS; } -static void plugin_capture_open(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) +static ss_plugin_rc plugin_capture_open(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) { plugin_state *ps = (plugin_state *) s; - ps->routine_vtable = i->routine; + ps->routine_vtable.subscribe = i->routine->subscribe; + ps->routine_vtable.unsubscribe = i->routine->unsubscribe; ps->routine_vtable.subscribe(ps->owner, do_nothing, (ss_plugin_routine_state_t*)&ps->flag); + + return SS_PLUGIN_SUCCESS; } -static void plugin_capture_close(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) +static ss_plugin_rc plugin_capture_close(ss_plugin_t* s, const ss_plugin_capture_listen_input* i) { - + return SS_PLUGIN_SUCCESS; } void get_plugin_api_sample_routines(plugin_api& out) diff --git a/userspace/libsinsp/test/thread_pool.ut.cpp b/userspace/libsinsp/test/thread_pool.ut.cpp index 7373943c6b..e0c78ee076 100644 --- a/userspace/libsinsp/test/thread_pool.ut.cpp +++ b/userspace/libsinsp/test/thread_pool.ut.cpp @@ -19,12 +19,12 @@ limitations under the License. #include #include -#if !defined(__EMSCRIPTEN__) +#if defined(ENABLE_THREAD_POOL) && !defined(__EMSCRIPTEN__) TEST_F(sinsp_with_test_input, thread_pool) { open_inspector(); - auto& tp = m_inspector.m_thread_pool; + auto tp = m_inspector.get_thread_pool(); ASSERT_NE(tp, nullptr); ASSERT_EQ(tp->routines_num(), 0); @@ -36,19 +36,26 @@ TEST_F(sinsp_with_test_input, thread_pool) }); // check if the routine has been subscribed - ASSERT_NE(r, nullptr); + ASSERT_NE(r, 0); ASSERT_EQ(tp->routines_num(), 1); // check if the routine has been unsubscribed - tp->unsubscribe(r); + auto res = tp->unsubscribe(r); ASSERT_EQ(tp->routines_num(), 0); + ASSERT_EQ(res, true); + + // unsuccessful unsubscribe + res = tp->unsubscribe(0); + ASSERT_EQ(res, false); // subscribe a routine that keeps running until a condition is met (returns false) - int count = 0; - r = tp->subscribe([&count] + std::atomic count = 0; + std::atomic routine_exited = false; + r = tp->subscribe([&count, &routine_exited] { if(count >= 1024) { + routine_exited = true; return false; } count++; @@ -57,8 +64,11 @@ TEST_F(sinsp_with_test_input, thread_pool) ASSERT_EQ(tp->routines_num(), 1); // the routine above keeps increasing a counter, until the counter reaches 1024 - // we give the routine enough time to finish, then we check if it has been unsubscribed - std::this_thread::sleep_for(std::chrono::seconds(1)); + // we wait for the routine to exit, then we check if it has been unsubscribed + while(!routine_exited) + { + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + } ASSERT_EQ(count, 1024); ASSERT_EQ(tp->routines_num(), 0); diff --git a/userspace/libsinsp/thread_pool.h b/userspace/libsinsp/thread_pool.h index 7c1556ba4d..1921e892f1 100644 --- a/userspace/libsinsp/thread_pool.h +++ b/userspace/libsinsp/thread_pool.h @@ -25,7 +25,7 @@ limitations under the License. class thread_pool { public: - using routine_id_t = std::function*; + using routine_id_t = uintptr_t; thread_pool() = default; @@ -47,7 +47,7 @@ class thread_pool * * \param id A routine handle. */ - virtual void unsubscribe(routine_id_t id) = 0; + virtual bool unsubscribe(routine_id_t id) = 0; /*! * \brief Unsubscribes all the subscribed routines and waits for the running ones to finish. @@ -58,35 +58,4 @@ class thread_pool * \return The count of currently subscribed routines. */ virtual size_t routines_num() = 0; -}; - -namespace BS { - class thread_pool; -}; - -class bs_thread_pool : public thread_pool -{ -public: - bs_thread_pool(size_t num_workers = 0); - - virtual ~bs_thread_pool() - { - purge(); - } - - thread_pool::routine_id_t subscribe(const std::function& func); - - void unsubscribe(thread_pool::routine_id_t id); - - void purge(); - - size_t routines_num(); - -private: - struct default_bs_tp_deleter { void operator()(BS::thread_pool* __ptr) const; }; - - void run_routine(std::shared_ptr> id); - - std::unique_ptr m_pool; - std::list>> m_routines; }; \ No newline at end of file diff --git a/userspace/libsinsp/thread_pool.cpp b/userspace/libsinsp/thread_pool_bs.cpp similarity index 66% rename from userspace/libsinsp/thread_pool.cpp rename to userspace/libsinsp/thread_pool_bs.cpp index 729c03b837..429f221099 100644 --- a/userspace/libsinsp/thread_pool.cpp +++ b/userspace/libsinsp/thread_pool_bs.cpp @@ -16,16 +16,16 @@ limitations under the License. */ -#include +#include #include -void bs_thread_pool::default_bs_tp_deleter::operator()(BS::thread_pool* __ptr) const +void thread_pool_bs::default_bs_tp_deleter::operator()(BS::thread_pool* __ptr) const { std::default_delete{}(__ptr); } -bs_thread_pool::bs_thread_pool(size_t num_workers): m_pool(nullptr), m_routines() +thread_pool_bs::thread_pool_bs(size_t num_workers): m_pool(nullptr), m_routines() { if (num_workers == 0) { @@ -37,24 +37,33 @@ bs_thread_pool::bs_thread_pool(size_t num_workers): m_pool(nullptr), m_routines( } } -bs_thread_pool::routine_id_t bs_thread_pool::subscribe(const std::function& func) +thread_pool_bs::routine_id_t thread_pool_bs::subscribe(const std::function& func) { m_routines.push_back(std::make_shared>(func)); auto& new_routine = m_routines.back(); run_routine(new_routine); - return static_cast(new_routine.get()); + return reinterpret_cast(new_routine.get()); } -void bs_thread_pool::unsubscribe(bs_thread_pool::routine_id_t id) +bool thread_pool_bs::unsubscribe(thread_pool_bs::routine_id_t id) { - m_routines.remove_if([id](const std::shared_ptr>& v) + bool removed = false; + m_routines.remove_if([id, &removed](const std::shared_ptr>& v) { - return v.get() == static_cast*>(id); + if(v.get() == reinterpret_cast*>(id)) + { + removed = true; + return true; + } + + return false; }); + + return removed; } -void bs_thread_pool::purge() +void thread_pool_bs::purge() { m_routines.clear(); @@ -62,12 +71,12 @@ void bs_thread_pool::purge() m_pool->wait(); } -size_t bs_thread_pool::routines_num() +size_t thread_pool_bs::routines_num() { return m_routines.size(); } -void bs_thread_pool::run_routine(std::shared_ptr> routine) +void thread_pool_bs::run_routine(std::shared_ptr> routine) { m_pool->detach_task([this, routine] { diff --git a/userspace/libsinsp/thread_pool_bs.h b/userspace/libsinsp/thread_pool_bs.h new file mode 100644 index 0000000000..1b9d82afb9 --- /dev/null +++ b/userspace/libsinsp/thread_pool_bs.h @@ -0,0 +1,50 @@ +// SPDX-License-Identifier: Apache-2.0 +/* +Copyright (C) 2024 The Falco Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +*/ + +#include + +namespace BS { + class thread_pool; +}; + +class thread_pool_bs : public thread_pool +{ +public: + thread_pool_bs(size_t num_workers = 0); + + virtual ~thread_pool_bs() + { + purge(); + } + + thread_pool::routine_id_t subscribe(const std::function& func); + + bool unsubscribe(thread_pool::routine_id_t id); + + void purge(); + + size_t routines_num(); + +private: + struct default_bs_tp_deleter { void operator()(BS::thread_pool* __ptr) const; }; + + void run_routine(std::shared_ptr> id); + + std::unique_ptr m_pool; + std::list>> m_routines; +}; \ No newline at end of file diff --git a/userspace/plugin/plugin_api.h b/userspace/plugin/plugin_api.h index 14a373e78b..b173efcf1a 100644 --- a/userspace/plugin/plugin_api.h +++ b/userspace/plugin/plugin_api.h @@ -424,7 +424,9 @@ typedef struct // Arguments: // - o: the plugin's owner // - r: the routine's handle - void (*unsubscribe)(ss_plugin_owner_t* o, ss_plugin_routine_t* r); + // + // Return value: A ss_plugin_rc with values SS_PLUGIN_SUCCESS or SS_PLUGIN_FAILURE. + ss_plugin_rc (*unsubscribe)(ss_plugin_owner_t* o, ss_plugin_routine_t* r); } ss_plugin_routine_vtable; // Input passed to the plugin when the framework start and stops the capture. @@ -437,7 +439,7 @@ typedef struct ss_plugin_capture_listen_input // // Vtable containing callbacks that can be used by the plugin // for subscribing and unsubscribing routines to the framework's thread pool. - ss_plugin_routine_vtable routine; + ss_plugin_routine_vtable* routine; // // Vtable for controlling a state table for read operations. ss_plugin_table_reader_vtable_ext* table_reader_ext; @@ -1048,7 +1050,9 @@ typedef struct // Arguments: // - s: the plugin state, returned by init(). Can be NULL. // - i: input containing vtables for performing table operations and subscribe/unsubscribe async routines - void (*capture_open)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); + // + // Return value: A ss_plugin_rc with values SS_PLUGIN_SUCCESS or SS_PLUGIN_FAILURE. + ss_plugin_rc (*capture_open)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); // // Called by the framework when the event capture closes. @@ -1057,7 +1061,9 @@ typedef struct // Arguments: // - s: the plugin state, returned by init(). Can be NULL. // - i: input containing vtables for performing table operations and subscribe/unsubscribe async routines - void (*capture_close)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); + // + // Return value: A ss_plugin_rc with values SS_PLUGIN_SUCCESS or SS_PLUGIN_FAILURE. + ss_plugin_rc (*capture_close)(ss_plugin_t* s, const ss_plugin_capture_listen_input* i); }; } plugin_api;