Skip to content

Commit

Permalink
modified the plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
vorabrijesh committed Aug 8, 2022
1 parent f2df0f2 commit 6784ae6
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 137 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ find_package(Threads REQUIRED)
if (SIMPLEWEBSOCKETSERVER_FOUND)
include_directories(BEFORE ${SimpleWebsocketServer})
zeek_plugin_begin(Ennetix Websocket ${ZEEK_PLUGIN_BEGIN_OPTS})
zeek_plugin_cc(src/WebsocketWriter.cc)
zeek_plugin_cc(src/WebsocketWriter.cpp)
zeek_plugin_cc(src/Plugin.cc)
zeek_plugin_bif(src/websocket.bif)
zeek_plugin_dist_files(README CHANGES COPYING VERSION)
Expand Down
10 changes: 4 additions & 6 deletions src/Plugin.cc
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@

#include "Plugin.h"

namespace plugin { namespace Ennetix_Websocket { Plugin plugin; } }

using namespace plugin::Ennetix_Websocket;

zeek::plugin::Configuration Plugin::Configure()
{
zeek::plugin::Configuration Plugin::Configure() {
auto loggingComponent = new zeek::logging::Component(
"WebsocketWriter",
zeek::logging::writer::WebsocketWriter::Instantiate
"WebsocketWriter",
zeek::logging::writer::WebsocketWriter::Instantiate
);
AddComponent(loggingComponent);
zeek::plugin::Configuration config;
Expand All @@ -19,4 +17,4 @@ zeek::plugin::Configuration Plugin::Configure()
config.version.minor = 1;
config.version.patch = 0;
return config;
}
}
8 changes: 5 additions & 3 deletions src/Plugin.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@

#pragma once
#ifndef ZEEK_PLUGIN_WEBSOCKET
#define ZEEK_PLUGIN_WEBSOCKET

#include <zeek/plugin/Plugin.h>
#include "WebsocketWriter.h"
#include "WebsocketWriter.hpp"

namespace plugin {
namespace Ennetix_Websocket {
Expand All @@ -18,3 +18,5 @@ extern Plugin plugin;

}
}

#endif
126 changes: 0 additions & 126 deletions src/WebsocketWriter.cc

This file was deleted.

145 changes: 145 additions & 0 deletions src/WebsocketWriter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#include "WebsocketWriter.hpp"

// zeek::logging::writer defined in Websocket
using namespace zeek::logging::writer;

using WsServer = SimpleWeb::SocketServer<SimpleWeb::WS>;

// The Constructor is called once for each log filter that uses this log writer.
WebsocketWriter::WebsocketWriter(WriterFrontend *frontend)
: WriterBackend(frontend) {

}

WebsocketWriter::~WebsocketWriter() {
// Cleanup must happen in DoFinish, not in the destructor
}

bool WebsocketWriter::DoInit(
const WriterInfo &info,
int num_fields,
const threading::Field *const *fields
) {

threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
formatter = new threading::formatter::JSON(this, tf);
only_once = 0;
input_log= "";
return true;
}

bool WebsocketWriter::DoFinish(double network_time) {
return true;
}

bool WebsocketWriter::DoWrite(
int num_fields,
const threading::Field *const *fields,
threading::Value **vals
) {

ODesc buff;
buff.Clear();

formatter->Describe(&buff, num_fields, fields, vals);
// send the formatted log entry to kafka
const char *raw = (const char *)buff.Bytes();
std::cout<<const_cast<char *>(raw)<<std::endl;
std::string s(const_cast<char *>(raw));
input_log += s + "\n";

server.config.port = 8080;
auto &echo = server.endpoint["^/echo/?$"];
echo.on_message = [this](
std::shared_ptr<WsServer::Connection> connection,
std::shared_ptr<WsServer::InMessage> in_message
) {

auto out_message = std::make_shared<std::string>(input_log); //in_message->string();
// std::cout << "Server: Message received: \"" << out_message << "\" from " << connection.get() << std::endl;

// std::cout << "Server: Sending message \"" << out_message << "\" to " << connection.get() << std::endl;

// connection->send is an asynchronous function
connection->send(input_log, [](const SimpleWeb::error_code &ec) {
if(ec) {
std::cout << "Server: Error sending message. " <<
// See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings
"Error: " << ec << ", error message: " << ec.message() << std::endl;
}
});
input_log = "";
};

echo.on_open = [](std::shared_ptr<WsServer::Connection> connection) {
std::cout << "Server: Opened connection " << connection.get() << std::endl;
};

// See RFC 6455 7.4.1. for status codes
echo.on_close = [](
std::shared_ptr<WsServer::Connection> connection,
int status,
const std::string& /*reason*/
) {
std::cout << "Server: Closed connection " << connection.get() << " with status code " << status << std::endl;
};

// Can modify handshake response headers here if needed
echo.on_handshake = [](
std::shared_ptr<WsServer::Connection> /*connection*/,
SimpleWeb::CaseInsensitiveMultimap & /*response_header*/
) {
return SimpleWeb::StatusCode::information_switching_protocols; // Upgrade to websocket
};

// See http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference.html, Error Codes for error code meanings
echo.on_error = [](
std::shared_ptr<WsServer::Connection> connection,
const SimpleWeb::error_code &ec
) {
std::cout << "Server: Error in connection " << connection.get() << ". "
<< "Error: " << ec << ", error message: " << ec.message() << std::endl;
};

if (only_once == 0){ // start the server thread only once.
only_once = 1;

std::promise<unsigned short> server_port;
std::thread server_thread([this, &server_port]() {
// Start server
server.start([&server_port](unsigned short port) {
server_port.set_value(port);
});
});

std::cout << "Server listening on port " << server_port.get_future().get() << std::endl << std::endl;
server_thread.detach();
}
return true;
}

bool WebsocketWriter::DoSetBuf(bool enabled) {
// no change in behavior
return true;
}

bool WebsocketWriter::DoFlush(double network_time) {
return true;
}

bool WebsocketWriter::DoRotate(
const char *rotated_path,
double open, double close,
bool terminating
) {
// no need to perform log rotation
return FinishedRotation();
}

bool WebsocketWriter::DoHeartbeat(
double network_time,
double current_time
) {

return true;
}
3 changes: 2 additions & 1 deletion src/WebsocketWriter.h → src/WebsocketWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
#include "websocket.bif.h"

using WsServer = SimpleWeb::SocketServer<SimpleWeb::WS>;

namespace zeek::logging::writer {

/**
* A logging writer that sends data to a Kafka broker.
* A logging writer that sends data to a websockets
*/
class WebsocketWriter : public WriterBackend {

Expand Down

0 comments on commit 6784ae6

Please sign in to comment.