Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ffi: Add support for specifying UTC offset changes in an IR stream. #386

Merged
merged 19 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ set(SOURCE_FILES_unitTest
src/clp/TimestampPattern.cpp
src/clp/TimestampPattern.hpp
src/clp/TraceableException.hpp
src/clp/time_types.hpp
src/clp/type_utils.hpp
src/clp/Utils.cpp
src/clp/Utils.hpp
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp/clg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ set(
../streaming_compression/zstd/Decompressor.hpp
../StringReader.cpp
../StringReader.hpp
../time_types.hpp
../TimestampPattern.cpp
../TimestampPattern.hpp
../TraceableException.hpp
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp/clo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ set(
../StringReader.hpp
../Thread.cpp
../Thread.hpp
../time_types.hpp
../TimestampPattern.cpp
../TimestampPattern.hpp
../TraceableException.hpp
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp/clp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ set(
../streaming_compression/zstd/Decompressor.hpp
../StringReader.cpp
../StringReader.hpp
../time_types.hpp
../TimestampPattern.cpp
../TimestampPattern.hpp
../TraceableException.hpp
Expand Down
63 changes: 47 additions & 16 deletions components/core/src/clp/ffi/ir_stream/decoding_methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ deserialize_timestamp(ReaderInterface& reader, encoded_tag_t encoded_tag, epoch_
* Deserializes the next log event from the given reader
* @tparam encoded_variable_t Type of the encoded variable
* @param reader
* @param encoded_tag
* @param message Returns the deserialized message
* @param timestamp Returns the timestamp delta if
* encoded_variable_t == four_byte_encoded_variable_t or the actual timestamp if
Expand All @@ -86,8 +87,12 @@ deserialize_timestamp(ReaderInterface& reader, encoded_tag_t encoded_tag, epoch_
* @return Same as ffi::ir_stream::deserialize_log_event
*/
template <typename encoded_variable_t>
static IRErrorCode
generic_deserialize_log_event(ReaderInterface& reader, string& message, epoch_time_ms_t& timestamp);
static IRErrorCode generic_deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
string& message,
epoch_time_ms_t& timestamp
);

/**
* Deserializes metadata from the given reader
Expand Down Expand Up @@ -269,6 +274,7 @@ deserialize_timestamp(ReaderInterface& reader, encoded_tag_t encoded_tag, epoch_
template <typename encoded_variable_t>
static IRErrorCode generic_deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
string& message,
epoch_time_ms_t& timestamp
) {
Expand All @@ -278,7 +284,7 @@ static IRErrorCode generic_deserialize_log_event(
vector<string> dict_vars;
string logtype;
if (auto error_code
= deserialize_log_event(reader, logtype, encoded_vars, dict_vars, timestamp);
= deserialize_log_event(reader, encoded_tag, logtype, encoded_vars, dict_vars, timestamp);
IRErrorCode_Success != error_code)
{
return error_code;
Expand Down Expand Up @@ -351,19 +357,12 @@ static IRErrorCode deserialize_metadata(
template <typename encoded_variable_t>
auto deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
string& logtype,
vector<encoded_variable_t>& encoded_vars,
vector<string>& dict_vars,
epoch_time_ms_t& timestamp_or_timestamp_delta
) -> IRErrorCode {
encoded_tag_t encoded_tag{cProtocol::Eof};
if (ErrorCode_Success != reader.try_read_numeric_value(encoded_tag)) {
return IRErrorCode_Incomplete_IR;
}
if (cProtocol::Eof == encoded_tag) {
return IRErrorCode_Eof;
}

// Handle variables
string var_str;
bool is_encoded_var{false};
Expand Down Expand Up @@ -433,6 +432,13 @@ IRErrorCode get_encoding_type(ReaderInterface& reader, bool& is_four_bytes_encod
return IRErrorCode_Success;
}

IRErrorCode deserialize_tag(ReaderInterface& reader, encoded_tag_t& tag) {
if (ErrorCode_Success != reader.try_read_numeric_value(tag)) {
return IRErrorCode_Incomplete_IR;
}
return IRErrorCode_Success;
}

IRErrorCode deserialize_preamble(
ReaderInterface& reader,
encoded_tag_t& metadata_type,
Expand Down Expand Up @@ -503,27 +509,51 @@ IRProtocolErrorCode validate_protocol_version(std::string_view protocol_version)
return IRProtocolErrorCode_Supported;
}

IRErrorCode deserialize_utc_offset_change(ReaderInterface& reader, UtcOffset& utc_offset) {
int64_t serialized_utc_offset{};
if (false == deserialize_int(reader, serialized_utc_offset)) {
return IRErrorCode_Incomplete_IR;
}
utc_offset = UtcOffset{serialized_utc_offset};
return IRErrorCode_Success;
}

namespace four_byte_encoding {
IRErrorCode
deserialize_log_event(ReaderInterface& reader, string& message, epoch_time_ms_t& timestamp_delta) {
IRErrorCode deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
string& message,
epoch_time_ms_t& timestamp_delta
) {
return generic_deserialize_log_event<four_byte_encoded_variable_t>(
reader,
encoded_tag,
message,
timestamp_delta
);
}
} // namespace four_byte_encoding

namespace eight_byte_encoding {
IRErrorCode
deserialize_log_event(ReaderInterface& reader, string& message, epoch_time_ms_t& timestamp) {
return generic_deserialize_log_event<eight_byte_encoded_variable_t>(reader, message, timestamp);
IRErrorCode deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
string& message,
epoch_time_ms_t& timestamp
) {
return generic_deserialize_log_event<eight_byte_encoded_variable_t>(
reader,
encoded_tag,
message,
timestamp
);
}
} // namespace eight_byte_encoding

// Explicitly declare specializations
template auto deserialize_log_event<four_byte_encoded_variable_t>(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
string& logtype,
vector<four_byte_encoded_variable_t>& encoded_vars,
vector<string>& dict_vars,
Expand All @@ -532,6 +562,7 @@ template auto deserialize_log_event<four_byte_encoded_variable_t>(

template auto deserialize_log_event<eight_byte_encoded_variable_t>(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
string& logtype,
vector<eight_byte_encoded_variable_t>& encoded_vars,
vector<string>& dict_vars,
Expand Down
32 changes: 27 additions & 5 deletions components/core/src/clp/ffi/ir_stream/decoding_methods.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#include "../../ir/types.hpp"
#include "../../ReaderInterface.hpp"
#include "../../time_types.hpp"
#include "../encoding_methods.hpp"

namespace clp::ffi::ir_stream {
Expand Down Expand Up @@ -55,10 +56,20 @@ class DecodingException : public TraceableException {
*/
IRErrorCode get_encoding_type(ReaderInterface& reader, bool& is_four_bytes_encoding);

/**
* Deserializes the tag for the next packet.
* @param reader
* @param tag Returns the tag of the next packet.
* @return IRErrorCode_Success on success
* @return IRErrorCode_Incomplete_IR if reader doesn't contain enough data to deserialize
*/
[[nodiscard]] IRErrorCode deserialize_tag(ReaderInterface& reader, encoded_tag_t& tag);

/**
* Deserializes a log event from the given stream
* @tparam encoded_variable_t
* @param reader
* @param encoded_tag Tag of the next packet to read
* @param logtype Returns the logtype
* @param encoded_vars Returns the encoded variables
* @param dict_vars Returns the dictionary variables
Expand All @@ -67,11 +78,11 @@ IRErrorCode get_encoding_type(ReaderInterface& reader, bool& is_four_bytes_encod
* @return IRErrorCode_Success on success
* @return IRErrorCode_Corrupted_IR if reader contains invalid IR
* @return IRErrorCode_Incomplete_IR if reader doesn't contain enough data
* @return IRErrorCode_Eof on reaching the end of the stream
*/
template <typename encoded_variable_t>
auto deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
std::string& logtype,
std::vector<encoded_variable_t>& encoded_vars,
std::vector<std::string>& dict_vars,
Expand Down Expand Up @@ -149,6 +160,15 @@ IRErrorCode deserialize_preamble(
std::vector<int8_t>& metadata
);

/**
* Deserializes a UTC offset change packet.
* @param reader
* @param utc_offset The deserialized UTC offset.
* @return IRErrorCode_Success on success
* @return IRErrorCode_Incomplete_IR if reader doesn't contain enough data to deserialize
*/
IRErrorCode deserialize_utc_offset_change(ReaderInterface& reader, UtcOffset& utc_offset);

/**
* Validates whether the given protocol version can be supported by the current build.
* @param protocol_version
Expand All @@ -164,37 +184,39 @@ IRProtocolErrorCode validate_protocol_version(std::string_view protocol_version)

namespace eight_byte_encoding {
/**
* Deserializes the next log event from an eight-byte encoding IR stream.
* Deserializes the next log event from an eight-byte encoding IR stream and decodes the message.
* @param reader
* @param encoded_tag
* @param message Returns the deserialized message
* @param timestamp Returns the deserialized timestamp
* @return ErrorCode_Success on success
* @return ErrorCode_Corrupted_IR if reader contains invalid IR
* @return ErrorCode_Decode_Error if the log event cannot be properly deserialized
* @return ErrorCode_Incomplete_IR if reader doesn't contain enough data to deserialize
* @return ErrorCode_End_of_IR if the IR ends
LinZhihao-723 marked this conversation as resolved.
Show resolved Hide resolved
*/
IRErrorCode deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
std::string& message,
ir::epoch_time_ms_t& timestamp
);
} // namespace eight_byte_encoding

namespace four_byte_encoding {
/**
* Deserializes the next log event from a four-byte encoding IR stream.
* Deserializes the next log event from a four-byte encoding IR stream and decodes the message.
* @param reader
* @param encoded_tag
* @param message Returns the deserialized message
* @param timestamp_delta Returns the deserialized timestamp delta
* @return ErrorCode_Success on success
* @return ErrorCode_Corrupted_IR if reader contains invalid IR
* @return ErrorCode_Decode_Error if the log event cannot be properly deserialized
* @return ErrorCode_Incomplete_IR if reader doesn't contain enough data to deserialize
* @return ErrorCode_End_of_IR if the IR ends
*/
IRErrorCode deserialize_log_event(
ReaderInterface& reader,
encoded_tag_t encoded_tag,
std::string& message,
ir::epoch_time_ms_t& timestamp_delta
);
Expand Down
6 changes: 6 additions & 0 deletions components/core/src/clp/ffi/ir_stream/encoding_methods.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "../../ir/parsing.hpp"
#include "../../ir/types.hpp"
#include "../../time_types.hpp"
#include "byteswap.hpp"
#include "protocol_constants.hpp"

Expand Down Expand Up @@ -306,4 +307,9 @@ bool serialize_timestamp(epoch_time_ms_t timestamp_delta, std::vector<int8_t>& i
return true;
}
} // namespace four_byte_encoding

void serialize_utc_offset_change(UtcOffset utc_offset, std::vector<int8_t>& ir_buf) {
ir_buf.emplace_back(cProtocol::Payload::UtcOffsetChange);
serialize_int(static_cast<int64_t>(utc_offset.count()), ir_buf);
}
} // namespace clp::ffi::ir_stream
8 changes: 8 additions & 0 deletions components/core/src/clp/ffi/ir_stream/encoding_methods.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <vector>

#include "../../ir/types.hpp"
#include "../../time_types.hpp"
#include "../encoding_methods.hpp"

namespace clp::ffi::ir_stream {
Expand Down Expand Up @@ -91,6 +92,13 @@ bool serialize_message(std::string_view message, std::string& logtype, std::vect
*/
bool serialize_timestamp(ir::epoch_time_ms_t timestamp_delta, std::vector<int8_t>& ir_buf);
} // namespace four_byte_encoding

/**
* Serializes the given UTC offset into the IR stream
* @param utc_offset
* @param ir_buf
*/
void serialize_utc_offset_change(UtcOffset utc_offset, std::vector<int8_t>& ir_buf);
} // namespace clp::ffi::ir_stream

#endif // CLP_FFI_IR_STREAM_ENCODING_METHODS_HPP
4 changes: 3 additions & 1 deletion components/core/src/clp/ffi/ir_stream/protocol_constants.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ constexpr int8_t LengthUByte = 0x11;
constexpr int8_t LengthUShort = 0x12;

constexpr char VersionKey[] = "VERSION";
constexpr char VersionValue[] = "0.0.1";
constexpr char VersionValue[] = "0.0.2";

// The following regex can be used to validate a Semantic Versioning string. The source of the
// regex can be found here: https://semver.org/
Expand Down Expand Up @@ -47,6 +47,8 @@ constexpr int8_t TimestampDeltaByte = 0x31;
constexpr int8_t TimestampDeltaShort = 0x32;
constexpr int8_t TimestampDeltaInt = 0x33;
constexpr int8_t TimestampDeltaLong = 0x34;

constexpr int8_t UtcOffsetChange = 0x3F;
} // namespace Payload

constexpr int8_t FourByteEncodingMagicNumber[]
Expand Down
8 changes: 7 additions & 1 deletion components/core/src/clp/ir/LogEvent.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <vector>

#include "../Defs.h"
#include "time_types.hpp"
#include "types.hpp"

namespace clp::ir {
Expand All @@ -18,18 +19,22 @@ class LogEvent {
// Constructors
LogEvent(
epoch_time_ms_t timestamp,
UtcOffset utc_offset,
std::string logtype,
std::vector<std::string> dict_vars,
std::vector<encoded_variable_t> encoded_vars
)
: m_timestamp{timestamp},
m_utc_offset{utc_offset},
m_logtype{std::move(logtype)},
m_dict_vars{std::move(dict_vars)},
m_encoded_vars{std::move(encoded_vars)} {}

// Methods
[[nodiscard]] auto get_timestamp() const -> epoch_time_ms_t { return m_timestamp; }

[[nodiscard]] auto get_utc_offset() const -> UtcOffset { return m_utc_offset; }

[[nodiscard]] auto get_logtype() const -> std::string const& { return m_logtype; }

[[nodiscard]] auto get_dict_vars() const -> std::vector<std::string> const& {
Expand All @@ -42,7 +47,8 @@ class LogEvent {

private:
// Variables
epoch_time_ms_t m_timestamp;
epoch_time_ms_t m_timestamp{0};
UtcOffset m_utc_offset{0};
std::string m_logtype;
std::vector<std::string> m_dict_vars;
std::vector<encoded_variable_t> m_encoded_vars;
Expand Down
Loading
Loading