From 6784ae685f31c3d958846e4fc3aa85ddcc87c964 Mon Sep 17 00:00:00 2001 From: vorabrijesh Date: Mon, 8 Aug 2022 09:45:16 -0700 Subject: [PATCH] modified the plugin --- CMakeLists.txt | 2 +- src/Plugin.cc | 10 +- src/Plugin.h | 8 +- src/WebsocketWriter.cc | 126 --------------- src/WebsocketWriter.cpp | 145 ++++++++++++++++++ ...{WebsocketWriter.h => WebsocketWriter.hpp} | 3 +- 6 files changed, 157 insertions(+), 137 deletions(-) delete mode 100644 src/WebsocketWriter.cc create mode 100644 src/WebsocketWriter.cpp rename src/{WebsocketWriter.h => WebsocketWriter.hpp} (96%) diff --git a/CMakeLists.txt b/CMakeLists.txt index a891ab8..d67f36c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,7 +18,7 @@ find_package(Threads REQUIRED) if (SIMPLEWEBSOCKETSERVER_FOUND) include_directories(BEFORE ${SimpleWebsocketServer}) zeek_plugin_begin(Ennetix Websocket ${ZEEK_PLUGIN_BEGIN_OPTS}) - zeek_plugin_cc(src/WebsocketWriter.cc) + zeek_plugin_cc(src/WebsocketWriter.cpp) zeek_plugin_cc(src/Plugin.cc) zeek_plugin_bif(src/websocket.bif) zeek_plugin_dist_files(README CHANGES COPYING VERSION) diff --git a/src/Plugin.cc b/src/Plugin.cc index 6f65095..36a19eb 100644 --- a/src/Plugin.cc +++ b/src/Plugin.cc @@ -1,15 +1,13 @@ - #include "Plugin.h" namespace plugin { namespace Ennetix_Websocket { Plugin plugin; } } using namespace plugin::Ennetix_Websocket; -zeek::plugin::Configuration Plugin::Configure() - { +zeek::plugin::Configuration Plugin::Configure() { auto loggingComponent = new zeek::logging::Component( - "WebsocketWriter", - zeek::logging::writer::WebsocketWriter::Instantiate + "WebsocketWriter", + zeek::logging::writer::WebsocketWriter::Instantiate ); AddComponent(loggingComponent); zeek::plugin::Configuration config; @@ -19,4 +17,4 @@ zeek::plugin::Configuration Plugin::Configure() config.version.minor = 1; config.version.patch = 0; return config; - } +} diff --git a/src/Plugin.h b/src/Plugin.h index 58620e7..01285f3 100644 --- a/src/Plugin.h +++ b/src/Plugin.h @@ -1,8 +1,8 @@ - -#pragma once +#ifndef ZEEK_PLUGIN_WEBSOCKET +#define ZEEK_PLUGIN_WEBSOCKET #include -#include "WebsocketWriter.h" +#include "WebsocketWriter.hpp" namespace plugin { namespace Ennetix_Websocket { @@ -18,3 +18,5 @@ extern Plugin plugin; } } + +#endif \ No newline at end of file diff --git a/src/WebsocketWriter.cc b/src/WebsocketWriter.cc deleted file mode 100644 index 00ded14..0000000 --- a/src/WebsocketWriter.cc +++ /dev/null @@ -1,126 +0,0 @@ -#include "WebsocketWriter.h" - -using namespace zeek::logging; -using namespace writer; -using namespace std; - -using WsServer = SimpleWeb::SocketServer; - - -// The Constructor is called once for each log filter that uses this log writer. -WebsocketWriter::WebsocketWriter(WriterFrontend *frontend) - : WriterBackend(frontend), formatter(NULL) { - -} - -WebsocketWriter::~WebsocketWriter() { - // Cleanup must happen in DoFinish, not in the destructor -} - -bool WebsocketWriter::DoInit(const WriterInfo &info, int num_fields, const threading::Field *const *fields) { - threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH; - formatter = new threading::formatter::JSON(this, tf); - only_once = 0; - input_log= ""; - return true; -} - -bool WebsocketWriter::DoFinish(double network_time) { - return true; -} - - -bool WebsocketWriter::DoWrite(int num_fields, const threading::Field *const *fields, - threading::Value **vals) { - ODesc buff; - buff.Clear(); - - formatter->Describe(&buff, num_fields, fields, vals); - // send the formatted log entry to kafka - const char *raw = (const char *)buff.Bytes(); - std::cout<(raw)<(raw)); - input_log += s + "\n"; - - - server.config.port = 8080; - auto &echo = server.endpoint["^/echo/?$"]; - echo.on_message = [this](shared_ptr connection, shared_ptr in_message) { - - auto out_message = make_shared(input_log); //in_message->string(); - // cout << "Server: Message received: \"" << out_message << "\" from " << connection.get() << endl; - - // cout << "Server: Sending message \"" << out_message << "\" to " << connection.get() << endl; - - // connection->send is an asynchronous function - connection->send(input_log, [](const SimpleWeb::error_code &ec) { - if(ec) { - cout << "Server: Error sending message. " << - // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings - "Error: " << ec << ", error message: " << ec.message() << endl; - } - }); - input_log = ""; - }; - - echo.on_open = [](shared_ptr connection) { - cout << "Server: Opened connection " << connection.get() << endl; - }; - - // See RFC 6455 7.4.1. for status codes - echo.on_close = [](shared_ptr connection, int status, const string & /*reason*/) { - cout << "Server: Closed connection " << connection.get() << " with status code " << status << endl; - }; - - // Can modify handshake response headers here if needed - echo.on_handshake = [](shared_ptr /*connection*/, SimpleWeb::CaseInsensitiveMultimap & /*response_header*/) { - return SimpleWeb::StatusCode::information_switching_protocols; // Upgrade to websocket - }; - - // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings - echo.on_error = [](shared_ptr connection, const SimpleWeb::error_code &ec) { - cout << "Server: Error in connection " << connection.get() << ". " - << "Error: " << ec << ", error message: " << ec.message() << endl; - }; - - if (only_once==0) // start the server thread only once. - { - only_once = 1; - - promise server_port; - std::thread server_thread( [this, &server_port]() { - // Start server - server.start([&server_port](unsigned short port) { - server_port.set_value(port); - }); - }); - cout << "Server listening on port " << server_port.get_future().get() << endl << endl; - - // server_thread.join(); - server_thread.detach(); - } - return true; -} - - -bool WebsocketWriter::DoSetBuf(bool enabled) { - // no change in behavior - return true; -} - - -bool WebsocketWriter::DoFlush(double network_time) { - return true; -} - - -bool WebsocketWriter::DoRotate(const char *rotated_path, double open, double close, - bool terminating) { - // no need to perform log rotation - return FinishedRotation(); -} - -bool WebsocketWriter::DoHeartbeat(double network_time, double current_time) { - - return true; -} \ No newline at end of file diff --git a/src/WebsocketWriter.cpp b/src/WebsocketWriter.cpp new file mode 100644 index 0000000..f74e452 --- /dev/null +++ b/src/WebsocketWriter.cpp @@ -0,0 +1,145 @@ +#include "WebsocketWriter.hpp" + +// zeek::logging::writer defined in Websocket +using namespace zeek::logging::writer; + +using WsServer = SimpleWeb::SocketServer; + +// The Constructor is called once for each log filter that uses this log writer. +WebsocketWriter::WebsocketWriter(WriterFrontend *frontend) + : WriterBackend(frontend) { + +} + +WebsocketWriter::~WebsocketWriter() { + // Cleanup must happen in DoFinish, not in the destructor +} + +bool WebsocketWriter::DoInit( + const WriterInfo &info, + int num_fields, + const threading::Field *const *fields +) { + + threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH; + formatter = new threading::formatter::JSON(this, tf); + only_once = 0; + input_log= ""; + return true; +} + +bool WebsocketWriter::DoFinish(double network_time) { + return true; +} + +bool WebsocketWriter::DoWrite( + int num_fields, + const threading::Field *const *fields, + threading::Value **vals +) { + + ODesc buff; + buff.Clear(); + + formatter->Describe(&buff, num_fields, fields, vals); + // send the formatted log entry to kafka + const char *raw = (const char *)buff.Bytes(); + std::cout<(raw)<(raw)); + input_log += s + "\n"; + + server.config.port = 8080; + auto &echo = server.endpoint["^/echo/?$"]; + echo.on_message = [this]( + std::shared_ptr connection, + std::shared_ptr in_message + ) { + + auto out_message = std::make_shared(input_log); //in_message->string(); + // std::cout << "Server: Message received: \"" << out_message << "\" from " << connection.get() << std::endl; + + // std::cout << "Server: Sending message \"" << out_message << "\" to " << connection.get() << std::endl; + + // connection->send is an asynchronous function + connection->send(input_log, [](const SimpleWeb::error_code &ec) { + if(ec) { + std::cout << "Server: Error sending message. " << + // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings + "Error: " << ec << ", error message: " << ec.message() << std::endl; + } + }); + input_log = ""; + }; + + echo.on_open = [](std::shared_ptr connection) { + std::cout << "Server: Opened connection " << connection.get() << std::endl; + }; + + // See RFC 6455 7.4.1. for status codes + echo.on_close = []( + std::shared_ptr connection, + int status, + const std::string& /*reason*/ + ) { + std::cout << "Server: Closed connection " << connection.get() << " with status code " << status << std::endl; + }; + + // Can modify handshake response headers here if needed + echo.on_handshake = []( + std::shared_ptr /*connection*/, + SimpleWeb::CaseInsensitiveMultimap & /*response_header*/ + ) { + return SimpleWeb::StatusCode::information_switching_protocols; // Upgrade to websocket + }; + + // See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings + echo.on_error = []( + std::shared_ptr connection, + const SimpleWeb::error_code &ec + ) { + std::cout << "Server: Error in connection " << connection.get() << ". " + << "Error: " << ec << ", error message: " << ec.message() << std::endl; + }; + + if (only_once == 0){ // start the server thread only once. + only_once = 1; + + std::promise server_port; + std::thread server_thread([this, &server_port]() { + // Start server + server.start([&server_port](unsigned short port) { + server_port.set_value(port); + }); + }); + + std::cout << "Server listening on port " << server_port.get_future().get() << std::endl << std::endl; + server_thread.detach(); + } + return true; +} + +bool WebsocketWriter::DoSetBuf(bool enabled) { + // no change in behavior + return true; +} + +bool WebsocketWriter::DoFlush(double network_time) { + return true; +} + +bool WebsocketWriter::DoRotate( + const char *rotated_path, + double open, double close, + bool terminating +) { + // no need to perform log rotation + return FinishedRotation(); +} + +bool WebsocketWriter::DoHeartbeat( + double network_time, + double current_time +) { + + return true; +} \ No newline at end of file diff --git a/src/WebsocketWriter.h b/src/WebsocketWriter.hpp similarity index 96% rename from src/WebsocketWriter.h rename to src/WebsocketWriter.hpp index 59b2663..126ad20 100644 --- a/src/WebsocketWriter.h +++ b/src/WebsocketWriter.hpp @@ -14,10 +14,11 @@ #include "websocket.bif.h" using WsServer = SimpleWeb::SocketServer; + namespace zeek::logging::writer { /** - * A logging writer that sends data to a Kafka broker. + * A logging writer that sends data to a websockets */ class WebsocketWriter : public WriterBackend {