Skip to content

Commit

Permalink
ffi: Add support for serializing/deserializing auto-generated and use…
Browse files Browse the repository at this point in the history
…r-generated schema tree node IDs. (y-scope#557)

Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
  • Loading branch information
2 people authored and Jack Luo committed Dec 4, 2024
1 parent 2c84316 commit 3e8335b
Show file tree
Hide file tree
Showing 9 changed files with 337 additions and 98 deletions.
34 changes: 15 additions & 19 deletions components/core/src/clp/ffi/ir_stream/Serializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -403,15 +403,16 @@ auto Serializer<encoded_variable_t>::serialize_schema_tree_node(
return false;
}

auto const parent_id{locator.get_parent_id()};
if (parent_id <= UINT8_MAX) {
m_schema_tree_node_buf.push_back(cProtocol::Payload::SchemaTreeNodeParentIdUByte);
m_schema_tree_node_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(parent_id)));
} else if (parent_id <= UINT16_MAX) {
m_schema_tree_node_buf.push_back(cProtocol::Payload::SchemaTreeNodeParentIdUShort);
serialize_int(static_cast<uint16_t>(parent_id), m_schema_tree_node_buf);
} else {
// Out of range
if (false
== encode_and_serialize_schema_tree_node_id<
false,
cProtocol::Payload::EncodedSchemaTreeNodeParentIdByte,
cProtocol::Payload::EncodedSchemaTreeNodeParentIdShort,
cProtocol::Payload::EncodedSchemaTreeNodeParentIdInt>(
locator.get_parent_id(),
m_schema_tree_node_buf
))
{
return false;
}

Expand All @@ -420,16 +421,11 @@ auto Serializer<encoded_variable_t>::serialize_schema_tree_node(

template <typename encoded_variable_t>
auto Serializer<encoded_variable_t>::serialize_key(SchemaTree::Node::id_t id) -> bool {
if (id <= UINT8_MAX) {
m_key_group_buf.push_back(cProtocol::Payload::KeyIdUByte);
m_key_group_buf.push_back(bit_cast<int8_t>(static_cast<uint8_t>(id)));
} else if (id <= UINT16_MAX) {
m_key_group_buf.push_back(cProtocol::Payload::KeyIdUShort);
serialize_int(static_cast<uint16_t>(id), m_key_group_buf);
} else {
return false;
}
return true;
return encode_and_serialize_schema_tree_node_id<
false,
cProtocol::Payload::EncodedSchemaTreeNodeIdByte,
cProtocol::Payload::EncodedSchemaTreeNodeIdShort,
cProtocol::Payload::EncodedSchemaTreeNodeIdInt>(id, m_key_group_buf);
}

template <typename encoded_variable_t>
Expand Down
3 changes: 1 addition & 2 deletions components/core/src/clp/ffi/ir_stream/Serializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,7 @@ class Serializer {
/**
* Serializes the given key ID into `m_key_group_buf`.
* @param id
* @return true on success.
* @return false if the ID exceeds the representable range.
* @return Forwards `encode_and_serialize_schema_tree_node_id`'s return values.
*/
[[nodiscard]] auto serialize_key(SchemaTree::Node::id_t id) -> bool;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <memory>
#include <optional>
#include <string>
#include <system_error>
#include <type_traits>
#include <utility>
#include <vector>
Expand Down Expand Up @@ -43,16 +44,16 @@ using Schema = std::vector<SchemaTree::Node::id_t>;
/**
* Deserializes the parent ID of a schema tree node.
* @param reader
* @param parent_id Returns the deserialized result.
* @return IRErrorCode::IRErrorCode_Success on success.
* @return IRErrorCode::IRErrorCode_Incomplete_IR if the stream is truncated.
* @return IRErrorCode::IRErrorCode_Corrupted_IR if the next packet in the stream isn't a parent ID.
* @return Forwards `deserialize_tag`'s return values on any other failure.
* @return A result containing a pair or an error code indicating the failure:
* - The pair:
* - Whether the node ID is for an auto-generated node.
* - The decoded node ID.
* - The possible error codes:
* - Forwards `deserialize_tag`'s return values.
* @return Forwards `deserialize_and_decode_schema_tree_node_id`'s return values.
*/
[[nodiscard]] auto deserialize_schema_tree_node_parent_id(
ReaderInterface& reader,
SchemaTree::Node::id_t& parent_id
) -> IRErrorCode;
[[nodiscard]] auto deserialize_schema_tree_node_parent_id(ReaderInterface& reader
) -> OUTCOME_V2_NAMESPACE::std_result<std::pair<bool, SchemaTree::Node::id_t>>;

/**
* Deserializes the key name of a schema tree node.
Expand Down Expand Up @@ -100,13 +101,14 @@ deserialize_int_val(ReaderInterface& reader, encoded_tag_t tag, value_int_t& val
* Deserializes the IDs of all keys in a log event.
* @param reader
* @param tag Takes the current tag as input and returns the last tag read.
* @param schema Returns the deserialized schema.
* @return IRErrorCode::IRErrorCode_Success on success.
* @return IRErrorCode::IRErrorCode_Incomplete_IR if the stream is truncated.
* @return Forwards `deserialize_tag`'s return values on any other failure.
* @return A result containing the deserialized schema or an error code indicating the failure:
* - std::errc::protocol_not_supported if the IR stream contains auto-generated keys (TODO: Remove
* this once auto-generated keys are fully supported).
* - Forwards `deserialize_tag`'s return values.
* - Forwards `deserialize_and_decode_schema_tree_node_id`'s return values.
*/
[[nodiscard]] auto
deserialize_schema(ReaderInterface& reader, encoded_tag_t& tag, Schema& schema) -> IRErrorCode;
[[nodiscard]] auto deserialize_schema(ReaderInterface& reader, encoded_tag_t& tag)
-> OUTCOME_V2_NAMESPACE::std_result<Schema>;

/**
* Deserializes the next value and pushes the result into `node_id_value_pairs`.
Expand Down Expand Up @@ -170,10 +172,17 @@ requires(std::is_same_v<ir::four_byte_encoded_variable_t, encoded_variable_t>
) -> IRErrorCode;

/**
* @param tag
* @return Whether the given tag can be a valid leading tag of a log event IR unit.
*/
[[nodiscard]] auto is_log_event_ir_unit_tag(encoded_tag_t tag) -> bool;

/**
* @param tag
* @return Whether the given tag represents a valid encoded key ID.
*/
[[nodiscard]] auto is_encoded_key_id_tag(encoded_tag_t tag) -> bool;

auto schema_tree_node_tag_to_type(encoded_tag_t tag) -> std::optional<SchemaTree::Node::Type> {
switch (tag) {
case cProtocol::Payload::SchemaTreeNodeInt:
Expand All @@ -193,30 +202,16 @@ auto schema_tree_node_tag_to_type(encoded_tag_t tag) -> std::optional<SchemaTree
}
}

auto deserialize_schema_tree_node_parent_id(
ReaderInterface& reader,
SchemaTree::Node::id_t& parent_id
) -> IRErrorCode {
auto deserialize_schema_tree_node_parent_id(ReaderInterface& reader
) -> OUTCOME_V2_NAMESPACE::std_result<std::pair<bool, SchemaTree::Node::id_t>> {
encoded_tag_t tag{};
if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) {
return err;
}
if (cProtocol::Payload::SchemaTreeNodeParentIdUByte == tag) {
uint8_t deserialized_id{};
if (false == deserialize_int(reader, deserialized_id)) {
return IRErrorCode::IRErrorCode_Incomplete_IR;
}
parent_id = static_cast<SchemaTree::Node::id_t>(deserialized_id);
} else if (cProtocol::Payload::SchemaTreeNodeParentIdUShort == tag) {
uint16_t deserialized_id{};
if (false == deserialize_int(reader, deserialized_id)) {
return IRErrorCode::IRErrorCode_Incomplete_IR;
}
parent_id = static_cast<SchemaTree::Node::id_t>(deserialized_id);
} else {
return IRErrorCode::IRErrorCode_Corrupted_IR;
return ir_error_code_to_errc(err);
}
return IRErrorCode_Success;
return deserialize_and_decode_schema_tree_node_id<
cProtocol::Payload::EncodedSchemaTreeNodeParentIdByte,
cProtocol::Payload::EncodedSchemaTreeNodeParentIdShort,
cProtocol::Payload::EncodedSchemaTreeNodeParentIdInt>(tag, reader);
}

auto deserialize_schema_tree_node_key_name(ReaderInterface& reader, std::string& key_name)
Expand Down Expand Up @@ -297,32 +292,35 @@ auto deserialize_string(ReaderInterface& reader, encoded_tag_t tag, std::string&
return IRErrorCode::IRErrorCode_Success;
}

auto deserialize_schema(ReaderInterface& reader, encoded_tag_t& tag, Schema& schema)
-> IRErrorCode {
schema.clear();
auto deserialize_schema(ReaderInterface& reader, encoded_tag_t& tag)
-> OUTCOME_V2_NAMESPACE::std_result<Schema> {
Schema schema;
while (true) {
if (cProtocol::Payload::KeyIdUByte == tag) {
uint8_t id{};
if (false == deserialize_int(reader, id)) {
return IRErrorCode::IRErrorCode_Incomplete_IR;
}
schema.push_back(static_cast<SchemaTree::Node::id_t>(id));
} else if (cProtocol::Payload::KeyIdUShort == tag) {
uint16_t id{};
if (false == deserialize_int(reader, id)) {
return IRErrorCode::IRErrorCode_Incomplete_IR;
}
schema.push_back(static_cast<SchemaTree::Node::id_t>(id));
} else {
if (false == is_encoded_key_id_tag(tag)) {
// The log event must be an empty value.
break;
}

auto const schema_tree_node_id_result{deserialize_and_decode_schema_tree_node_id<
cProtocol::Payload::EncodedSchemaTreeNodeIdByte,
cProtocol::Payload::EncodedSchemaTreeNodeIdShort,
cProtocol::Payload::EncodedSchemaTreeNodeIdInt>(tag, reader)};
if (schema_tree_node_id_result.has_error()) {
return schema_tree_node_id_result.error();
}
auto const [is_auto_generated, node_id]{schema_tree_node_id_result.value()};
if (is_auto_generated) {
// Currently, we don't support auto-generated keys.
return std::errc::protocol_not_supported;
}
schema.push_back(node_id);

if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) {
return err;
return ir_error_code_to_errc(err);
}
}

return IRErrorCode::IRErrorCode_Success;
return schema;
}

auto deserialize_value_and_insert_to_node_id_value_pairs(
Expand Down Expand Up @@ -469,12 +467,24 @@ auto is_log_event_ir_unit_tag(encoded_tag_t tag) -> bool {
// The log event is an empty object
return true;
}
if (cProtocol::Payload::KeyIdUByte == tag || cProtocol::Payload::KeyIdUShort == tag) {
if (is_encoded_key_id_tag(tag)) {
// If not empty, the log event must start with a tag byte indicating the key ID
return true;
}
return false;
}

auto is_encoded_key_id_tag(encoded_tag_t tag) -> bool {
    // A range check over [EncodedSchemaTreeNodeIdByte, EncodedSchemaTreeNodeIdInt] would be more
    // compact, but each tag is compared explicitly instead for two reasons:
    // - Streams with few key IDs are the case being optimized for, so the first comparison below
    //   usually settles the answer.
    // - A range check relies on the length indicators being defined contiguously and in order,
    //   and there are no static checks for that assumption.
    if (cProtocol::Payload::EncodedSchemaTreeNodeIdByte == tag) {
        return true;
    }
    if (cProtocol::Payload::EncodedSchemaTreeNodeIdShort == tag) {
        return true;
    }
    return cProtocol::Payload::EncodedSchemaTreeNodeIdInt == tag;
}
} // namespace

auto get_ir_unit_type_from_tag(encoded_tag_t tag) -> std::optional<IrUnitType> {
Expand Down Expand Up @@ -508,11 +518,14 @@ auto deserialize_ir_unit_schema_tree_node_insertion(
return ir_error_code_to_errc(IRErrorCode::IRErrorCode_Corrupted_IR);
}

SchemaTree::Node::id_t parent_id{};
if (auto const err{deserialize_schema_tree_node_parent_id(reader, parent_id)};
IRErrorCode_Success != err)
{
return ir_error_code_to_errc(err);
auto const parent_node_id_result{deserialize_schema_tree_node_parent_id(reader)};
if (parent_node_id_result.has_error()) {
return parent_node_id_result.error();
}
auto const [is_auto_generated, parent_id]{parent_node_id_result.value()};
if (is_auto_generated) {
// Currently, we don't support auto-generated keys.
return std::errc::protocol_not_supported;
}

if (auto const err{deserialize_schema_tree_node_key_name(reader, key_name)};
Expand Down Expand Up @@ -541,12 +554,11 @@ auto deserialize_ir_unit_kv_pair_log_event(
std::shared_ptr<SchemaTree> schema_tree,
UtcOffset utc_offset
) -> OUTCOME_V2_NAMESPACE::std_result<KeyValuePairLogEvent> {
Schema schema;
if (auto const err{deserialize_schema(reader, tag, schema)};
IRErrorCode::IRErrorCode_Success != err)
{
return ir_error_code_to_errc(err);
auto const schema_result{deserialize_schema(reader, tag)};
if (schema_result.has_error()) {
return schema_result.error();
}
auto const& schema{schema_result.value()};

KeyValuePairLogEvent::NodeIdValuePairs node_id_value_pairs;
if (false == schema.empty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace clp::ffi::ir_stream {
* indicating the failure:
* - std::errc::result_out_of_range if the IR stream is truncated.
* - std::errc::protocol_error if the deserialized node type isn't supported.
* - std::errc::protocol_not_supported if the IR stream contains auto-generated keys (TODO: Remove
* this once auto-generated keys are fully supported).
* - Forwards `deserialize_schema_tree_node_key_name`'s return values.
* - Forwards `deserialize_schema_tree_node_parent_id`'s return values.
*/
Expand Down Expand Up @@ -63,6 +65,7 @@ namespace clp::ffi::ir_stream {
* - std::errc::protocol_error if the IR stream is corrupted.
* - std::errc::protocol_not_supported if the IR stream contains an unsupported metadata format
* or uses an unsupported version.
* - Forwards `deserialize_schema`'s return values.
* - Forwards `KeyValuePairLogEvent::create`'s return values if the intermediate deserialized result
* cannot construct a valid key-value pair log event.
*/
Expand Down
12 changes: 7 additions & 5 deletions components/core/src/clp/ffi/ir_stream/protocol_constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ constexpr int8_t LengthUShort = 0x12;

constexpr char VersionKey[] = "VERSION";
constexpr char VersionValue[] = "0.0.2";
constexpr char BetaVersionValue[] = "0.1.0-beta";
constexpr char BetaVersionValue[] = "0.1.0-beta.1";

// The following regex can be used to validate a Semantic Versioning string. The source of the
// regex can be found here: https://semver.org/
Expand Down Expand Up @@ -67,11 +67,13 @@ constexpr int8_t ValueEightByteEncodingClpStr = 0x5A;
constexpr int8_t ValueEmpty = 0x5E;
constexpr int8_t ValueNull = 0x5F;

constexpr int8_t SchemaTreeNodeParentIdUByte = 0x60;
constexpr int8_t SchemaTreeNodeParentIdUShort = 0x61;
constexpr int8_t EncodedSchemaTreeNodeParentIdByte = 0x60;
constexpr int8_t EncodedSchemaTreeNodeParentIdShort = 0x61;
constexpr int8_t EncodedSchemaTreeNodeParentIdInt = 0x62;

constexpr int8_t KeyIdUByte = 0x65;
constexpr int8_t KeyIdUShort = 0x66;
constexpr int8_t EncodedSchemaTreeNodeIdByte = 0x65;
constexpr int8_t EncodedSchemaTreeNodeIdShort = 0x66;
constexpr int8_t EncodedSchemaTreeNodeIdInt = 0x67;

constexpr int8_t SchemaTreeNodeMask = 0x70;

Expand Down
2 changes: 2 additions & 0 deletions components/core/src/clp/ffi/ir_stream/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ auto serialize_string(std::string_view str, std::vector<int8_t>& output_buf) ->

auto ir_error_code_to_errc(IRErrorCode ir_error_code) -> std::errc {
switch (ir_error_code) {
case IRErrorCode_Success:
return {};
case IRErrorCode_Incomplete_IR:
return std::errc::result_out_of_range;
case IRErrorCode_Corrupted_IR:
Expand Down
Loading

0 comments on commit 3e8335b

Please sign in to comment.