From 1c679083a54f9ea54b7107768cb1daebd3b655f1 Mon Sep 17 00:00:00 2001 From: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com> Date: Mon, 7 Oct 2024 19:03:39 -0400 Subject: [PATCH] ffi: Redesign `Deserializer` API to deserialize key-value pair IR streams one IR unit at a time (fixes #539). (#549) --- components/core/CMakeLists.txt | 2 +- .../src/clp/ffi/ir_stream/Deserializer.cpp | 135 ----------- .../src/clp/ffi/ir_stream/Deserializer.hpp | 222 ++++++++++++++++-- .../core/src/clp/ffi/ir_stream/IrUnitType.hpp | 18 ++ .../ir_unit_deserialization_methods.cpp | 39 +++ .../ir_unit_deserialization_methods.hpp | 9 + .../core/tests/test-ir_encoding_methods.cpp | 87 +++++-- 7 files changed, 346 insertions(+), 166 deletions(-) delete mode 100644 components/core/src/clp/ffi/ir_stream/Deserializer.cpp create mode 100644 components/core/src/clp/ffi/ir_stream/IrUnitType.hpp diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 178f3e529..09577fa31 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -326,7 +326,6 @@ set(SOURCE_FILES_unitTest src/clp/ffi/encoding_methods.hpp src/clp/ffi/encoding_methods.inc src/clp/ffi/ir_stream/byteswap.hpp - src/clp/ffi/ir_stream/Deserializer.cpp src/clp/ffi/ir_stream/Deserializer.hpp src/clp/ffi/ir_stream/decoding_methods.cpp src/clp/ffi/ir_stream/decoding_methods.hpp @@ -334,6 +333,7 @@ set(SOURCE_FILES_unitTest src/clp/ffi/ir_stream/encoding_methods.cpp src/clp/ffi/ir_stream/encoding_methods.hpp src/clp/ffi/ir_stream/IrUnitHandlerInterface.hpp + src/clp/ffi/ir_stream/IrUnitType.hpp src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp src/clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp src/clp/ffi/ir_stream/protocol_constants.hpp diff --git a/components/core/src/clp/ffi/ir_stream/Deserializer.cpp b/components/core/src/clp/ffi/ir_stream/Deserializer.cpp deleted file mode 100644 index 5402f9b98..000000000 --- 
a/components/core/src/clp/ffi/ir_stream/Deserializer.cpp +++ /dev/null @@ -1,135 +0,0 @@ -#include "Deserializer.hpp" - -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "../../ReaderInterface.hpp" -#include "../../time_types.hpp" -#include "../../TransactionManager.hpp" -#include "../KeyValuePairLogEvent.hpp" -#include "decoding_methods.hpp" -#include "ir_unit_deserialization_methods.hpp" -#include "protocol_constants.hpp" -#include "utils.hpp" - -namespace clp::ffi::ir_stream { -namespace { -/** - * @param tag - * @return Whether the tag represents a schema tree node. - */ -[[nodiscard]] auto is_schema_tree_node_tag(encoded_tag_t tag) -> bool; - -auto is_schema_tree_node_tag(encoded_tag_t tag) -> bool { - return (tag & cProtocol::Payload::SchemaTreeNodeMask) == cProtocol::Payload::SchemaTreeNodeMask; -} -} // namespace - -auto Deserializer::create(ReaderInterface& reader -) -> OUTCOME_V2_NAMESPACE::std_result { - bool is_four_byte_encoded{}; - if (auto const err{get_encoding_type(reader, is_four_byte_encoded)}; - IRErrorCode::IRErrorCode_Success != err) - { - return ir_error_code_to_errc(err); - } - - std::vector metadata; - encoded_tag_t metadata_type{}; - if (auto const err{deserialize_preamble(reader, metadata_type, metadata)}; - IRErrorCode::IRErrorCode_Success != err) - { - return ir_error_code_to_errc(err); - } - - if (cProtocol::Metadata::EncodingJson != metadata_type) { - return std::errc::protocol_not_supported; - } - - auto metadata_json = nlohmann::json::parse(metadata, nullptr, false); - if (metadata_json.is_discarded()) { - return std::errc::protocol_error; - } - auto const version_iter{metadata_json.find(cProtocol::Metadata::VersionKey)}; - if (metadata_json.end() == version_iter || false == version_iter->is_string()) { - return std::errc::protocol_error; - } - auto const version = version_iter->get_ref(); - // TODO: Just before the KV-pair IR format is formally released, we should replace this - 
// hard-coded version check with `ffi::ir_stream::validate_protocol_version`. - if (std::string_view{static_cast(cProtocol::Metadata::BetaVersionValue)} - != version) - { - return std::errc::protocol_not_supported; - } - - return Deserializer{}; -} - -auto Deserializer::deserialize_to_next_log_event(clp::ReaderInterface& reader -) -> OUTCOME_V2_NAMESPACE::std_result { - auto const utc_offset_snapshot{m_utc_offset}; - m_schema_tree->take_snapshot(); - TransactionManager revert_manager{ - []() noexcept -> void {}, - [&]() noexcept -> void { - m_utc_offset = utc_offset_snapshot; - m_schema_tree->revert(); - } - }; - - encoded_tag_t tag{}; - std::string schema_tree_node_key_name; - while (true) { - if (auto const err{deserialize_tag(reader, tag)}; IRErrorCode::IRErrorCode_Success != err) { - return ir_error_code_to_errc(err); - } - - if (cProtocol::Eof == tag) { - return std::errc::no_message_available; - } - - if (is_schema_tree_node_tag(tag)) { - auto const result{deserialize_ir_unit_schema_tree_node_insertion( - reader, - tag, - schema_tree_node_key_name - )}; - if (result.has_error()) { - return result.error(); - } - auto const& locator{result.value()}; - if (m_schema_tree->has_node(locator)) { - return std::errc::protocol_error; - } - std::ignore = m_schema_tree->insert_node(locator); - continue; - } - - if (cProtocol::Payload::UtcOffsetChange == tag) { - auto const result{deserialize_ir_unit_utc_offset_change(reader)}; - if (result.has_error()) { - return result.error(); - } - m_utc_offset = result.value(); - continue; - } - - break; - } - - auto result{deserialize_ir_unit_kv_pair_log_event(reader, tag, m_schema_tree, m_utc_offset)}; - if (false == result.has_error()) { - revert_manager.mark_success(); - } - return std::move(result); -} -} // namespace clp::ffi::ir_stream diff --git a/components/core/src/clp/ffi/ir_stream/Deserializer.hpp b/components/core/src/clp/ffi/ir_stream/Deserializer.hpp index 0fb7e8c83..c1fc13c85 100644 --- 
a/components/core/src/clp/ffi/ir_stream/Deserializer.hpp +++ b/components/core/src/clp/ffi/ir_stream/Deserializer.hpp @@ -1,37 +1,55 @@ #ifndef CLP_FFI_IR_STREAM_DESERIALIZER_HPP #define CLP_FFI_IR_STREAM_DESERIALIZER_HPP +#include +#include #include +#include +#include +#include +#include +#include #include #include "../../ReaderInterface.hpp" #include "../../time_types.hpp" #include "../KeyValuePairLogEvent.hpp" #include "../SchemaTree.hpp" +#include "decoding_methods.hpp" +#include "ir_unit_deserialization_methods.hpp" +#include "IrUnitHandlerInterface.hpp" +#include "IrUnitType.hpp" +#include "protocol_constants.hpp" +#include "utils.hpp" namespace clp::ffi::ir_stream { /** - * A deserializer for log events from a CLP kv-pair IR stream. The class ensures any internal state - * remains consistent even when a deserialization failure occurs (i.e., it's transactional). + * A deserializer for reading IR units from a CLP kv-pair IR stream. An IR unit handler should be + * provided to perform user-defined operations on each deserialized IR unit. * * NOTE: This class is designed only to provide deserialization functionalities. Callers are * responsible for maintaining a `ReaderInterface` to input IR bytes from an I/O stream. + * + * @tparam IrUnitHandler */ +template +requires(std::move_constructible) class Deserializer { public: // Factory function /** * Creates a deserializer by reading the stream's preamble from the given reader. 
* @param reader + * @param ir_unit_handler * @return A result containing the deserializer or an error code indicating the failure: * - std::errc::result_out_of_range if the IR stream is truncated * - std::errc::protocol_error if the IR stream is corrupted * - std::errc::protocol_not_supported if the IR stream contains an unsupported metadata format * or uses an unsupported version */ - [[nodiscard]] static auto create(ReaderInterface& reader - ) -> OUTCOME_V2_NAMESPACE::std_result; + [[nodiscard]] static auto create(ReaderInterface& reader, IrUnitHandler ir_unit_handler) + -> OUTCOME_V2_NAMESPACE::std_result; // Delete copy constructor and assignment Deserializer(Deserializer const&) = delete; @@ -48,27 +66,199 @@ class Deserializer { /** * Deserializes the stream from the given reader up to and including the next log event IR unit. * @param reader - * @return A result containing the deserialized log event or an error code indicating the - * failure: - * - std::errc::no_message_available if the IR stream has been fully consumed. - * - std::errc::result_out_of_range if the IR stream is truncated. - * - std::errc::protocol_error if the IR stream is corrupted. - * - std::errc::protocol_not_supported if the IR stream contains an unsupported metadata format - * or uses an unsupported version. - * - Forwards `KeyValuePairLogEvent::create`'s return values if the intermediate deserialized - * result cannot construct a valid key-value pair log event. + * @return std::errc::no_message_available if no tag bytes can be read to determine the next IR + * unit type. + * @return std::errc::protocol_not_supported if the IR unit type is not supported. + * @return std::errc::operation_not_permitted if the deserializer already reached the end of + * stream by deserializing an end-of-stream IR unit in the previous calls. 
+ * @return IrUnitType::LogEvent if a log event IR unit is deserialized, or an error code + * indicating the failure: + * - Forwards `deserialize_ir_unit_kv_pair_log_event`'s return values if it failed to + * deserialize and construct the log event. + * - Forwards `handle_log_event`'s return values from the user-defined IR unit handler on + * unit handling failure. + * @return IrUnitType::SchemaTreeNodeInsertion if a schema tree node insertion IR unit is + * deserialized, or an error code indicating the failure: + * - Forwards `deserialize_ir_unit_schema_tree_node_insertion`'s return values if it failed to + * deserialize and construct the schema tree node locator. + * - Forwards `handle_schema_tree_node_insertion`'s return values from the user-defined IR unit + * handler on unit handling failure. + * - std::errc::protocol_error if the deserialized schema tree node already exists in the schema + * tree. + * @return IrUnitType::UtcOffsetChange if a UTC offset change IR unit is deserialized, or an + * error code indicating the failure: + * - Forwards `deserialize_ir_unit_utc_offset_change`'s return values if it failed to + * deserialize the UTC offset. + * - Forwards `handle_utc_offset_change`'s return values from the user-defined IR unit handler + * on unit handling failure. + * @return IrUnitType::EndOfStream if an end-of-stream IR unit is deserialized, or an error code + * indicating the failure: + * - Forwards `handle_end_of_stream`'s return values from the user-defined IR unit handler on + * unit handling failure. */ - [[nodiscard]] auto deserialize_to_next_log_event(ReaderInterface& reader - ) -> OUTCOME_V2_NAMESPACE::std_result; + [[nodiscard]] auto deserialize_next_ir_unit(ReaderInterface& reader + ) -> OUTCOME_V2_NAMESPACE::std_result; + + /** + * @return Whether the stream has completed. A stream is considered completed if an + * end-of-stream IR unit has already been deserialized.
+ */ + [[nodiscard]] auto is_stream_completed() const -> bool { return m_is_complete; } + + [[nodiscard]] auto get_ir_unit_handler() const -> IrUnitHandler const& { + return m_ir_unit_handler; + } + + [[nodiscard]] auto get_ir_unit_handler() -> IrUnitHandler& { return m_ir_unit_handler; } private: // Constructor - Deserializer() = default; + Deserializer(IrUnitHandler ir_unit_handler) : m_ir_unit_handler{std::move(ir_unit_handler)} {} // Variables std::shared_ptr m_schema_tree{std::make_shared()}; UtcOffset m_utc_offset{0}; + IrUnitHandler m_ir_unit_handler; + bool m_is_complete{false}; }; + +template +requires(std::move_constructible) +auto Deserializer::create(ReaderInterface& reader, IrUnitHandler ir_unit_handler) + -> OUTCOME_V2_NAMESPACE::std_result { + bool is_four_byte_encoded{}; + if (auto const err{get_encoding_type(reader, is_four_byte_encoded)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + std::vector metadata; + encoded_tag_t metadata_type{}; + if (auto const err{deserialize_preamble(reader, metadata_type, metadata)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + if (cProtocol::Metadata::EncodingJson != metadata_type) { + return std::errc::protocol_not_supported; + } + + auto metadata_json = nlohmann::json::parse(metadata, nullptr, false); + if (metadata_json.is_discarded()) { + return std::errc::protocol_error; + } + auto const version_iter{metadata_json.find(cProtocol::Metadata::VersionKey)}; + if (metadata_json.end() == version_iter || false == version_iter->is_string()) { + return std::errc::protocol_error; + } + auto const version = version_iter->get_ref(); + // TODO: Just before the KV-pair IR format is formally released, we should replace this + // hard-coded version check with `ffi::ir_stream::validate_protocol_version`. 
+ if (std::string_view{static_cast(cProtocol::Metadata::BetaVersionValue)} + != version) + { + return std::errc::protocol_not_supported; + } + + return Deserializer{std::move(ir_unit_handler)}; +} + +template +requires(std::move_constructible) +auto Deserializer::deserialize_next_ir_unit(ReaderInterface& reader +) -> OUTCOME_V2_NAMESPACE::std_result { + if (is_stream_completed()) { + return std::errc::operation_not_permitted; + } + + encoded_tag_t tag{}; + if (IRErrorCode::IRErrorCode_Success != deserialize_tag(reader, tag)) { + return std::errc::no_message_available; + } + + auto const optional_ir_unit_type{get_ir_unit_type_from_tag(tag)}; + if (false == optional_ir_unit_type.has_value()) { + return std::errc::protocol_not_supported; + } + + auto const ir_unit_type{optional_ir_unit_type.value()}; + switch (ir_unit_type) { + case IrUnitType::LogEvent: { + auto result{ + deserialize_ir_unit_kv_pair_log_event(reader, tag, m_schema_tree, m_utc_offset) + }; + if (result.has_error()) { + return result.error(); + } + + if (auto const err{m_ir_unit_handler.handle_log_event(std::move(result.value()))}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + break; + } + + case IrUnitType::SchemaTreeNodeInsertion: { + std::string key_name; + auto const result{deserialize_ir_unit_schema_tree_node_insertion(reader, tag, key_name) + }; + if (result.has_error()) { + return result.error(); + } + + auto const node_locator{result.value()}; + if (m_schema_tree->has_node(node_locator)) { + return std::errc::protocol_error; + } + + if (auto const err{m_ir_unit_handler.handle_schema_tree_node_insertion(node_locator)}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + std::ignore = m_schema_tree->insert_node(node_locator); + break; + } + + case IrUnitType::UtcOffsetChange: { + auto const result{deserialize_ir_unit_utc_offset_change(reader)}; + if (result.has_error()) { + return result.error(); + } + + auto 
const new_utc_offset{result.value()}; + if (auto const err{ + m_ir_unit_handler.handle_utc_offset_change(m_utc_offset, new_utc_offset) + }; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + + m_utc_offset = new_utc_offset; + break; + } + + case IrUnitType::EndOfStream: { + if (auto const err{m_ir_unit_handler.handle_end_of_stream()}; + IRErrorCode::IRErrorCode_Success != err) + { + return ir_error_code_to_errc(err); + } + m_is_complete = true; + break; + } + + default: + return std::errc::protocol_not_supported; + } + + return ir_unit_type; +} } // namespace clp::ffi::ir_stream #endif // CLP_FFI_IR_STREAM_DESERIALIZER_HPP diff --git a/components/core/src/clp/ffi/ir_stream/IrUnitType.hpp b/components/core/src/clp/ffi/ir_stream/IrUnitType.hpp new file mode 100644 index 000000000..1ac38750f --- /dev/null +++ b/components/core/src/clp/ffi/ir_stream/IrUnitType.hpp @@ -0,0 +1,18 @@ +#ifndef CLP_FFI_IR_STREAM_IRUNITTYPE_HPP +#define CLP_FFI_IR_STREAM_IRUNITTYPE_HPP + +#include + +namespace clp::ffi::ir_stream { +/** + * Enum defining the possible IR unit types in CLP kv-pair IR stream. 
+ */ +enum class IrUnitType : uint8_t { + LogEvent = 0, + SchemaTreeNodeInsertion, + UtcOffsetChange, + EndOfStream, +}; +} // namespace clp::ffi::ir_stream + +#endif // CLP_FFI_IR_STREAM_IRUNITTYPE_HPP diff --git a/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp b/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp index cfbe235d5..3e9986d46 100644 --- a/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp +++ b/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.cpp @@ -22,6 +22,7 @@ #include "../SchemaTreeNode.hpp" #include "../Value.hpp" #include "decoding_methods.hpp" +#include "IrUnitType.hpp" #include "protocol_constants.hpp" #include "utils.hpp" @@ -169,6 +170,11 @@ requires(std::is_same_v KeyValuePairLogEvent::NodeIdValuePairs& node_id_value_pairs ) -> IRErrorCode; +/** + * @return Whether the given tag can be a valid leading tag of a log event IR unit. + */ +[[nodiscard]] auto is_log_event_ir_unit_tag(encoded_tag_t tag) -> bool; + auto schema_tree_node_tag_to_type(encoded_tag_t tag) -> std::optional { switch (tag) { case cProtocol::Payload::SchemaTreeNodeInt: @@ -458,8 +464,41 @@ auto deserialize_value_and_construct_node_id_value_pairs( } return IRErrorCode::IRErrorCode_Success; } + +auto is_log_event_ir_unit_tag(encoded_tag_t tag) -> bool { + if (cProtocol::Payload::ValueEmpty == tag) { + // The log event is an empty object + return true; + } + if (cProtocol::Payload::KeyIdUByte == tag || cProtocol::Payload::KeyIdUShort == tag) { + // If not empty, the log event must start with a tag byte indicating the key ID + return true; + } + return false; +} } // namespace +auto get_ir_unit_type_from_tag(encoded_tag_t tag) -> std::optional { + // First, we check the tags that have one-to-one IR unit mapping + if (cProtocol::Eof == tag) { + return IrUnitType::EndOfStream; + } + if (cProtocol::Payload::UtcOffsetChange == tag) { + return IrUnitType::UtcOffsetChange; + } + + // Then, 
check tags that may match any byte within a continuous range + if ((tag & cProtocol::Payload::SchemaTreeNodeMask) == cProtocol::Payload::SchemaTreeNodeMask) { + return IrUnitType::SchemaTreeNodeInsertion; + } + + if (is_log_event_ir_unit_tag(tag)) { + return IrUnitType::LogEvent; + } + + return std::nullopt; +} + auto deserialize_ir_unit_schema_tree_node_insertion( ReaderInterface& reader, encoded_tag_t tag, diff --git a/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp b/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp index 0ba21866c..454676cf5 100644 --- a/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp +++ b/components/core/src/clp/ffi/ir_stream/ir_unit_deserialization_methods.hpp @@ -2,6 +2,7 @@ #define CLP_FFI_IR_STREAM_IR_UNIT_DESERIALIZATION_METHODS_HPP #include +#include #include #include @@ -11,8 +12,16 @@ #include "../KeyValuePairLogEvent.hpp" #include "../SchemaTree.hpp" #include "decoding_methods.hpp" +#include "IrUnitType.hpp" namespace clp::ffi::ir_stream { +/** + * @param tag + * @return The IR unit type indicated by the given tag on success. + * @return std::nullopt if the tag doesn't represent a valid IR unit. + */ +[[nodiscard]] auto get_ir_unit_type_from_tag(encoded_tag_t tag) -> std::optional; + /** * Deserializes a schema tree node insertion IR unit.
* @param reader diff --git a/components/core/tests/test-ir_encoding_methods.cpp b/components/core/tests/test-ir_encoding_methods.cpp index ba8c59280..5c27a6fab 100644 --- a/components/core/tests/test-ir_encoding_methods.cpp +++ b/components/core/tests/test-ir_encoding_methods.cpp @@ -16,8 +16,11 @@ #include "../src/clp/ffi/ir_stream/decoding_methods.hpp" #include "../src/clp/ffi/ir_stream/Deserializer.hpp" #include "../src/clp/ffi/ir_stream/encoding_methods.hpp" +#include "../src/clp/ffi/ir_stream/IrUnitType.hpp" #include "../src/clp/ffi/ir_stream/protocol_constants.hpp" #include "../src/clp/ffi/ir_stream/Serializer.hpp" +#include "../src/clp/ffi/KeyValuePairLogEvent.hpp" +#include "../src/clp/ffi/SchemaTree.hpp" #include "../src/clp/ir/LogEventDeserializer.hpp" #include "../src/clp/ir/types.hpp" #include "../src/clp/time_types.hpp" @@ -42,6 +45,7 @@ using clp::ffi::ir_stream::IRErrorCode; using clp::ffi::ir_stream::serialize_utc_offset_change; using clp::ffi::ir_stream::Serializer; using clp::ffi::ir_stream::validate_protocol_version; +using clp::ffi::KeyValuePairLogEvent; using clp::ffi::wildcard_query_matches_any_encoded_var; using clp::ir::eight_byte_encoded_variable_t; using clp::ir::epoch_time_ms_t; @@ -82,6 +86,47 @@ class UnstructuredLogEvent { UtcOffset m_utc_offset{0}; }; +/** + * Class that implements `clp::ffi::ir_stream::IrUnitHandlerInterface` for testing purposes. 
+ */ +class IrUnitHandler { +public: + // Implements `clp::ffi::ir_stream::IrUnitHandlerInterface` interface + [[nodiscard]] auto handle_log_event(KeyValuePairLogEvent&& log_event) -> IRErrorCode { + m_deserialized_log_events.emplace_back(std::move(log_event)); + return IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] static auto handle_utc_offset_change( + [[maybe_unused]] UtcOffset utc_offset_old, + [[maybe_unused]] UtcOffset utc_offset_new + ) -> IRErrorCode { + return IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] static auto handle_schema_tree_node_insertion( + [[maybe_unused]] clp::ffi::SchemaTree::NodeLocator schema_tree_node_locator + ) -> IRErrorCode { + return IRErrorCode::IRErrorCode_Success; + } + + [[nodiscard]] auto handle_end_of_stream() -> IRErrorCode { + m_is_complete = true; + return IRErrorCode::IRErrorCode_Success; + } + + // Methods + [[nodiscard]] auto is_complete() const -> bool { return m_is_complete; } + + [[nodiscard]] auto get_deserialized_log_events() const -> vector const& { + return m_deserialized_log_events; + } + +private: + vector m_deserialized_log_events; + bool m_is_complete{false}; +}; + /** * Serializes the given log events into an IR buffer. * @tparam encoded_variable_t Type of the encoded variables. 
@@ -1127,25 +1172,39 @@ TEMPLATE_TEST_CASE( // Deserialize the results BufferReader reader{size_checked_pointer_cast(ir_buf.data()), ir_buf.size()}; - auto deserializer_result = Deserializer::create(reader); + auto deserializer_result{Deserializer::create(reader, IrUnitHandler{})}; REQUIRE_FALSE(deserializer_result.has_error()); auto& deserializer = deserializer_result.value(); - - for (auto const& json_obj : serialized_json_objects) { - auto const kv_log_event_result = deserializer.deserialize_to_next_log_event(reader); - REQUIRE_FALSE(kv_log_event_result.has_error()); - - auto const& kv_log_event = kv_log_event_result.value(); - auto const num_leaves_in_json_obj = count_num_leaves(json_obj); - auto const num_kv_pairs = kv_log_event.get_node_id_value_pairs().size(); + while (true) { + auto const result{deserializer.deserialize_next_ir_unit(reader)}; + REQUIRE_FALSE(result.has_error()); + if (result.value() == clp::ffi::ir_stream::IrUnitType::EndOfStream) { + break; + } + } + auto const& ir_unit_handler{deserializer.get_ir_unit_handler()}; + + // Check the stream is complete + REQUIRE(ir_unit_handler.is_complete()); + REQUIRE(deserializer.is_stream_completed()); + // Check the number of log events deserialized matches the number of log events serialized + auto const& deserialized_log_events{ir_unit_handler.get_deserialized_log_events()}; + REQUIRE((serialized_json_objects.size() == deserialized_log_events.size())); + + auto const num_log_events{serialized_json_objects.size()}; + for (size_t idx{0}; idx < num_log_events; ++idx) { + auto const& expect{serialized_json_objects.at(idx)}; + auto const& deserialized_log_event{deserialized_log_events.at(idx)}; + + auto const num_leaves_in_json_obj{count_num_leaves(expect)}; + auto const num_kv_pairs{deserialized_log_event.get_node_id_value_pairs().size()}; REQUIRE((num_leaves_in_json_obj == num_kv_pairs)); - auto const serialized_json_result = kv_log_event.serialize_to_json(); + auto const 
serialized_json_result{deserialized_log_event.serialize_to_json()}; REQUIRE_FALSE(serialized_json_result.has_error()); - REQUIRE((json_obj == serialized_json_result.value())); + REQUIRE((expect == serialized_json_result.value())); } - auto const eof_result{deserializer.deserialize_to_next_log_event(reader)}; - REQUIRE(eof_result.has_error()); - REQUIRE((std::errc::no_message_available == eof_result.error())); + auto const eof_result{deserializer.deserialize_next_ir_unit(reader)}; + REQUIRE((eof_result.has_error() && std::errc::operation_not_permitted == eof_result.error())); }