Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Implement kv log event serialization method #13

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
460 changes: 460 additions & 0 deletions components/core/src/clp/ffi/ir_stream/Serializer.cpp

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions components/core/src/clp/ffi/ir_stream/Serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@

#include <cstdint>
#include <span>
#include <string>
#include <vector>

#include <boost-outcome/include/boost/outcome/std_result.hpp>
#include <msgpack.hpp>

#include "../../time_types.hpp"
#include "../SchemaTree.hpp"
#include "../SchemaTreeNode.hpp"

namespace clp::ffi::ir_stream {
/**
Expand Down Expand Up @@ -79,14 +82,47 @@ class Serializer {
*/
auto change_utc_offset(UtcOffset utc_offset) -> void;

/**
* Serializes the given msgpack map as a key-value pair log event.
* @param msgpack_map
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto serialize_msgpack_map(msgpack::object_map const& msgpack_map) -> bool;

private:
// Constructors
Serializer() = default;

// Methods
/**
* Serializes a schema tree node identified by the given locator into `m_schema_tree_node_buf`.
* @param locator
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto serialize_schema_tree_node(SchemaTree::NodeLocator const& locator) -> bool;

/**
* Serializes the given key ID into `m_key_group_buf`.
* @param id
* @return true on success.
* @return false if the ID exceeds the representable range.
*/
[[nodiscard]] auto serialize_key(SchemaTreeNode::id_t id) -> bool;

/**
* Serializes the given MessagePack value into `m_value_group_buf`.
* @param val
* @param schema_tree_node_type The type of the schema tree node that corresponds to `val`.
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto
serialize_val(msgpack::object const& val, SchemaTreeNode::Type schema_tree_node_type) -> bool;

UtcOffset m_curr_utc_offset{0};
Buffer m_ir_buf;
SchemaTree m_schema_tree;

std::string m_logtype_buf;
Buffer m_schema_tree_node_buf;
Buffer m_key_group_buf;
Buffer m_value_group_buf;
Expand Down
26 changes: 17 additions & 9 deletions components/core/src/clp/ffi/ir_stream/encoding_methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,22 @@ bool serialize_log_event(
string_view message,
string& logtype,
vector<int8_t>& ir_buf
) {
if (false == serialize_message(message, logtype, ir_buf)) {
return false;
}

// Encode timestamp
ir_buf.push_back(cProtocol::Payload::TimestampVal);
serialize_int(timestamp, ir_buf);

return true;
}

bool serialize_message(
std::string_view message,
std::string& logtype,
std::vector<int8_t>& ir_buf
) {
auto encoded_var_handler = [&ir_buf](eight_byte_encoded_variable_t encoded_var) {
ir_buf.push_back(cProtocol::Payload::VarEightByteEncoding);
Expand All @@ -153,15 +169,7 @@ bool serialize_log_event(
return false;
}

if (false == serialize_logtype(logtype, ir_buf)) {
return false;
}

// Encode timestamp
ir_buf.push_back(cProtocol::Payload::TimestampVal);
serialize_int(timestamp, ir_buf);

return true;
return serialize_logtype(logtype, ir_buf);
}
} // namespace eight_byte_encoding

Expand Down
9 changes: 9 additions & 0 deletions components/core/src/clp/ffi/ir_stream/encoding_methods.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ bool serialize_log_event(
std::string& logtype,
std::vector<int8_t>& ir_buf
);

/**
* Serializes the given message into the eight-byte encoding IR stream.
* @param message
* @param logtype Returns the message's logtype.
* @param ir_buf
* @return Whether the message was serialized successfully.
*/
bool serialize_message(std::string_view message, std::string& logtype, std::vector<int8_t>& ir_buf);
} // namespace eight_byte_encoding

namespace four_byte_encoding {
Expand Down
29 changes: 29 additions & 0 deletions components/core/src/clp/ffi/ir_stream/protocol_constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,35 @@ constexpr int8_t TimestampDeltaInt = 0x33;
constexpr int8_t TimestampDeltaLong = 0x34;

constexpr int8_t UtcOffsetChange = 0x3F;

constexpr int8_t StrLenUByte = 0x41;
constexpr int8_t StrLenUShort = 0x42;
constexpr int8_t StrLenUInt = 0x43;

constexpr int8_t ValueInt8 = 0x51;
constexpr int8_t ValueInt16 = 0x52;
constexpr int8_t ValueInt32 = 0x53;
constexpr int8_t ValueInt64 = 0x54;
constexpr int8_t ValueFloat = 0x56;
constexpr int8_t ValueTrue = 0x57;
constexpr int8_t ValueFalse = 0x58;
constexpr int8_t ValueFourByteEncodingClpStr = 0x59;
constexpr int8_t ValueEightByteEncodingClpStr = 0x5A;
constexpr int8_t ValueEmpty = 0x5E;
constexpr int8_t ValueNull = 0x5F;

constexpr int8_t SchemaTreeNodeParentIdUByte = 0x60;
constexpr int8_t SchemaTreeNodeParentIdUShort = 0x61;

constexpr int8_t KeyIdUByte = 0x65;
constexpr int8_t KeyIdUShort = 0x66;

constexpr int8_t SchemaTreeNodeInt = 0x71;
constexpr int8_t SchemaTreeNodeFloat = 0x72;
constexpr int8_t SchemaTreeNodeBool = 0x73;
constexpr int8_t SchemaTreeNodeStr = 0x74;
constexpr int8_t SchemaTreeNodeUnstructuredArray = 0x75;
constexpr int8_t SchemaTreeNodeObj = 0x76;
} // namespace Payload

constexpr int8_t FourByteEncodingMagicNumber[]
Expand Down
34 changes: 27 additions & 7 deletions components/core/src/clp/ffi/ir_stream/utils.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "utils.hpp"

#include <cstdint>
#include <string_view>
#include <vector>

#include <json/single_include/nlohmann/json.hpp>
Expand All @@ -9,24 +10,43 @@
#include "protocol_constants.hpp"

namespace clp::ffi::ir_stream {
auto serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& ir_buf) -> bool {
ir_buf.push_back(cProtocol::Metadata::EncodingJson);
auto serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& output_buf) -> bool {
output_buf.push_back(cProtocol::Metadata::EncodingJson);

auto const metadata_serialized
= metadata.dump(-1, ' ', false, nlohmann::json::error_handler_t::ignore);
auto const metadata_serialized_length = metadata_serialized.length();
if (metadata_serialized_length <= UINT8_MAX) {
ir_buf.push_back(cProtocol::Metadata::LengthUByte);
ir_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(metadata_serialized_length)));
output_buf.push_back(cProtocol::Metadata::LengthUByte);
output_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(metadata_serialized_length)));
} else if (metadata_serialized_length <= UINT16_MAX) {
ir_buf.push_back(cProtocol::Metadata::LengthUShort);
serialize_int(static_cast<uint16_t>(metadata_serialized_length), ir_buf);
output_buf.push_back(cProtocol::Metadata::LengthUShort);
serialize_int(static_cast<uint16_t>(metadata_serialized_length), output_buf);
} else {
// Can't encode metadata longer than 64 KiB
return false;
}
ir_buf.insert(ir_buf.cend(), metadata_serialized.cbegin(), metadata_serialized.cend());
output_buf.insert(output_buf.cend(), metadata_serialized.cbegin(), metadata_serialized.cend());

return true;
}

auto serialize_string(std::string_view str, std::vector<int8_t>& output_buf) -> bool {
auto const length{str.length()};
if (length <= UINT8_MAX) {
output_buf.push_back(cProtocol::Payload::StrLenUByte);
output_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(length)));
} else if (length <= UINT16_MAX) {
output_buf.push_back(cProtocol::Payload::StrLenUShort);
serialize_int(static_cast<uint16_t>(length), output_buf);
} else if (length <= UINT32_MAX) {
output_buf.push_back(cProtocol::Payload::StrLenUInt);
serialize_int(static_cast<uint32_t>(length), output_buf);
} else {
// Out of range
return false;
}
output_buf.insert(output_buf.cend(), str.cbegin(), str.cend());
return true;
}
} // namespace clp::ffi::ir_stream
61 changes: 55 additions & 6 deletions components/core/src/clp/ffi/ir_stream/utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,61 @@

#include <cstdint>
#include <span>
#include <string>
#include <string_view>
#include <vector>

#include <json/single_include/nlohmann/json.hpp>

#include "../../ir/types.hpp"
#include "byteswap.hpp"
#include "encoding_methods.hpp"
#include "protocol_constants.hpp"

namespace clp::ffi::ir_stream {
/**
* Serializes the given metadata into the IR stream.
* @param metadata
* @param ir_buf
* @param output_buf
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto
serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& ir_buf) -> bool;
serialize_metadata(nlohmann::json& metadata, std::vector<int8_t>& output_buf) -> bool;

/**
* Serializes the given integer into the IR stream.
* @tparam integer_t
* @param value
* @param ir_buf
* @param output_buf
*/
template <typename integer_t>
auto serialize_int(integer_t value, std::vector<int8_t>& ir_buf) -> void;
auto serialize_int(integer_t value, std::vector<int8_t>& output_buf) -> void;

/**
* Serializes a string using CLP's encoding for unstructured text.
* @tparam encoded_variable_t
* @param str
* @param logtype Returns the corresponding logtype.
* @param output_buf
* @return Whether serialization succeeded.
*/
template <typename encoded_variable_t>
[[nodiscard]] auto serialize_clp_string(
std::string_view str,
std::string& logtype,
std::vector<int8_t>& output_buf
) -> bool;

/**
* Serializes a string.
* @param str
* @param output_buf
* @return Whether serialization succeeded.
*/
[[nodiscard]] auto serialize_string(std::string_view str, std::vector<int8_t>& output_buf) -> bool;

template <typename integer_t>
auto serialize_int(integer_t value, std::vector<int8_t>& ir_buf) -> void {
auto serialize_int(integer_t value, std::vector<int8_t>& output_buf) -> void {
integer_t value_big_endian{};
static_assert(sizeof(integer_t) == 2 || sizeof(integer_t) == 4 || sizeof(integer_t) == 8);
if constexpr (sizeof(value) == 2) {
Expand All @@ -41,7 +69,28 @@ auto serialize_int(integer_t value, std::vector<int8_t>& ir_buf) -> void {
}
// NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast)
std::span<int8_t> const data_view{reinterpret_cast<int8_t*>(&value_big_endian), sizeof(value)};
ir_buf.insert(ir_buf.end(), data_view.begin(), data_view.end());
output_buf.insert(output_buf.end(), data_view.begin(), data_view.end());
}

template <typename encoded_variable_t>
[[nodiscard]] auto serialize_clp_string(
std::string_view str,
std::string& logtype,
std::vector<int8_t>& output_buf
) -> bool {
static_assert(
(std::is_same_v<encoded_variable_t, clp::ir::eight_byte_encoded_variable_t>
|| std::is_same_v<encoded_variable_t, clp::ir::four_byte_encoded_variable_t>)
);
bool succeeded{};
if constexpr (std::is_same_v<encoded_variable_t, clp::ir::four_byte_encoded_variable_t>) {
output_buf.push_back(cProtocol::Payload::ValueFourByteEncodingClpStr);
succeeded = four_byte_encoding::serialize_message(str, logtype, output_buf);
} else {
output_buf.push_back(cProtocol::Payload::ValueEightByteEncodingClpStr);
succeeded = eight_byte_encoding::serialize_message(str, logtype, output_buf);
}
return succeeded;
}
} // namespace clp::ffi::ir_stream
#endif
Loading
Loading