From 329edf66e8d368de3070ba266fcf653060fd1541 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Mon, 23 Dec 2024 00:06:46 -0500 Subject: [PATCH 1/5] feat(core-clp): Add LZMA `Compressor` implementation and LZMA dependency. (#614) --- components/core/.clang-format | 2 +- components/core/CMakeLists.txt | 26 +- .../clp/streaming_compression/Constants.hpp | 1 + .../streaming_compression/lzma/Compressor.cpp | 203 ++++++++++++++++ .../streaming_compression/lzma/Compressor.hpp | 230 ++++++++++++++++++ .../streaming_compression/lzma/Constants.hpp | 15 ++ .../core/tests/test-StreamingCompression.cpp | 110 +++++---- .../install-prebuilt-packages.sh | 3 +- .../core/tools/scripts/lib_install/liblzma.sh | 66 +++++ .../scripts/lib_install/macos/install-all.sh | 1 + .../install-packages-from-source.sh | 1 + .../ubuntu-focal/install-prebuilt-packages.sh | 1 + .../install-packages-from-source.sh | 1 + .../ubuntu-jammy/install-prebuilt-packages.sh | 1 + 14 files changed, 615 insertions(+), 46 deletions(-) create mode 100644 components/core/src/clp/streaming_compression/lzma/Compressor.cpp create mode 100644 components/core/src/clp/streaming_compression/lzma/Compressor.hpp create mode 100644 components/core/src/clp/streaming_compression/lzma/Constants.hpp create mode 100755 components/core/tools/scripts/lib_install/liblzma.sh diff --git a/components/core/.clang-format b/components/core/.clang-format index ff65adbae..4d0d3a87c 100644 --- a/components/core/.clang-format +++ b/components/core/.clang-format @@ -4,7 +4,7 @@ IncludeCategories: # NOTE: A header is grouped by first matching regex # Library headers. Update when adding new libraries. # NOTE: clang-format retains leading white-space on a line in violation of the YAML spec. 
- - Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|mongocxx\ + - Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|lzma|mongocxx\ |msgpack|mysql|openssl|outcome|regex_utils|simdjson|spdlog|sqlite3|string_utils|yaml-cpp|zstd)" Priority: 3 # C system headers diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index ce74f04cc..0995a0afb 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -11,8 +11,11 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # Set general compressor set(GENERAL_COMPRESSOR "zstd" CACHE STRING "The general-purpose compressor used as the 2nd-stage compressor") -set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd) -if ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough") +set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS lzma passthrough zstd) +if ("${GENERAL_COMPRESSOR}" STREQUAL "lzma") + add_definitions(-DUSE_LZMA_COMPRESSION=1) + message(STATUS "Using Lempel–Ziv–Markov chain Algorithm compression") +elseif ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough") add_definitions(-DUSE_PASSTHROUGH_COMPRESSION=1) message(STATUS "Using passthrough compression") elseif ("${GENERAL_COMPRESSOR}" STREQUAL "zstd") @@ -224,6 +227,21 @@ else() message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for ZStd") endif() +# Find and setup LZMA Library +# TODO: Add a script in ./cmake/Modules to properly import LZMA in find_package()'s module mode +if(CLP_USE_STATIC_LIBS) + set(LIBLZMA_USE_STATIC_LIBS ON) +endif() +find_package(LibLZMA REQUIRED) +if(LIBLZMA_FOUND) + message(STATUS "Found Lzma ${LIBLZMA_VERSION_STRING}") + message(STATUS "Lzma library location: ${LIBLZMA_LIBRARIES}") + message(STATUS "Lzma Include Dir: ${LIBLZMA_INCLUDE_DIRS}") +else() + message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for Lzma") +endif() +include_directories(${LIBLZMA_INCLUDE_DIRS}) + # sqlite dependencies 
set(sqlite_DYNAMIC_LIBS "dl;m;pthread") include(cmake/Modules/FindLibraryDependencies.cmake) @@ -516,6 +534,9 @@ set(SOURCE_FILES_unitTest src/clp/streaming_compression/Compressor.hpp src/clp/streaming_compression/Constants.hpp src/clp/streaming_compression/Decompressor.hpp + src/clp/streaming_compression/lzma/Compressor.cpp + src/clp/streaming_compression/lzma/Compressor.hpp + src/clp/streaming_compression/lzma/Constants.hpp src/clp/streaming_compression/passthrough/Compressor.cpp src/clp/streaming_compression/passthrough/Compressor.hpp src/clp/streaming_compression/passthrough/Decompressor.cpp @@ -608,6 +629,7 @@ target_link_libraries(unitTest clp::regex_utils clp::string_utils yaml-cpp::yaml-cpp + ${LIBLZMA_LIBRARIES} ZStd::ZStd ) target_compile_features(unitTest diff --git a/components/core/src/clp/streaming_compression/Constants.hpp b/components/core/src/clp/streaming_compression/Constants.hpp index 4649c2e98..080f3a20b 100644 --- a/components/core/src/clp/streaming_compression/Constants.hpp +++ b/components/core/src/clp/streaming_compression/Constants.hpp @@ -7,6 +7,7 @@ namespace clp::streaming_compression { enum class CompressorType : uint8_t { ZSTD = 0x10, + LZMA = 0x20, Passthrough = 0xFF, }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/lzma/Compressor.cpp b/components/core/src/clp/streaming_compression/lzma/Compressor.cpp new file mode 100644 index 000000000..34c1a0e2b --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Compressor.cpp @@ -0,0 +1,203 @@ +#include "Compressor.hpp" + +#include +#include +#include +#include + +#include +#include + +#include "../../ErrorCode.hpp" +#include "../../FileWriter.hpp" +#include "../../TraceableException.hpp" +#include "../../type_utils.hpp" + +namespace clp::streaming_compression::lzma { +auto Compressor::open(FileWriter& file_writer) -> void { + if (nullptr != m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotReady, 
__FILENAME__, __LINE__); + } + + m_lzma_stream.detach_input(); + if (false + == m_lzma_stream.attach_output( + m_compressed_stream_block_buffer.data(), + m_compressed_stream_block_buffer.size() + )) + { + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + m_compressed_stream_file_writer = &file_writer; + m_uncompressed_stream_pos = 0; +} + +auto Compressor::close() -> void { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + + if (m_lzma_stream.avail_in() > 0) { + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + + flush_lzma(LZMA_FINISH); + m_lzma_stream.end_and_detach_output(); + m_compressed_stream_file_writer = nullptr; +} + +auto Compressor::write(char const* data, size_t data_length) -> void { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + if (false + == m_lzma_stream + .attach_input(clp::size_checked_pointer_cast(data), data_length)) + { + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + encode_lzma(); + m_lzma_stream.detach_input(); + m_uncompressed_stream_pos += data_length; +} + +auto Compressor::flush() -> void { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + flush_lzma(LZMA_SYNC_FLUSH); +} + +auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode { + if (nullptr == m_compressed_stream_file_writer) { + return ErrorCode_NotInit; + } + pos = m_uncompressed_stream_pos; + return ErrorCode_Success; +} + +auto Compressor::encode_lzma() -> void { + while (m_lzma_stream.avail_in() > 0) { + if (0 == m_lzma_stream.avail_out()) { + flush_stream_output_block_buffer(); + } + auto const rc = m_lzma_stream.lzma_code(LZMA_RUN); + switch (rc) { + case LZMA_OK: + break; + case LZMA_BUF_ERROR: + SPDLOG_ERROR("LZMA compressor input stream is corrupt. 
No encoding " + "progress can be made."); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + default: + SPDLOG_ERROR( + "lzma_code() returned an unexpected value - {}.", + static_cast(rc) + ); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + } +} + +auto Compressor::flush_lzma(lzma_action flush_action) -> void { + if (false == LzmaStream::is_flush_action(flush_action)) { + SPDLOG_ERROR( + "lzma_code() supplied with invalid flush action - {}.", + static_cast(flush_action) + ); + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + + bool flushed{false}; + while (false == flushed) { + if (0 == m_lzma_stream.avail_out()) { + flush_stream_output_block_buffer(); + } + auto const rc = m_lzma_stream.lzma_code(flush_action); + switch (rc) { + case LZMA_OK: + break; + case LZMA_STREAM_END: + // NOTE: flush may not have completed if a multithreaded encoder is using action + // LZMA_FULL_BARRIER. For now, we skip this check. + flushed = true; + break; + case LZMA_BUF_ERROR: + // NOTE: this can happen if we are using LZMA_FULL_FLUSH or LZMA_FULL_BARRIER. These + // two actions keeps encoding input data alongside flushing buffered encoded data. + SPDLOG_ERROR("LZMA compressor input stream is corrupt. 
No encoding " + "progress can be made."); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + default: + SPDLOG_ERROR( + "lzma_code() returned an unexpected value - {}.", + static_cast(rc) + ); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + } + flush_stream_output_block_buffer(); +} + +auto Compressor::flush_stream_output_block_buffer() -> void { + if (cCompressedStreamBlockBufferSize == m_lzma_stream.avail_out()) { + return; + } + m_compressed_stream_file_writer->write( + clp::size_checked_pointer_cast(m_compressed_stream_block_buffer.data()), + cCompressedStreamBlockBufferSize - m_lzma_stream.avail_out() + ); + if (false + == m_lzma_stream.attach_output( + m_compressed_stream_block_buffer.data(), + m_compressed_stream_block_buffer.size() + )) + { + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } +} + +Compressor::LzmaStream::LzmaStream(int compression_level, size_t dict_size, lzma_check check) { + lzma_options_lzma options; + if (0 != lzma_lzma_preset(&options, compression_level)) { + SPDLOG_ERROR("Failed to initialize LZMA options' compression level."); + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + options.dict_size = dict_size; + std::array filters{{ + {.id = LZMA_FILTER_LZMA2, .options = &options}, + {.id = LZMA_VLI_UNKNOWN, .options = nullptr}, + }}; + + auto const rc = lzma_stream_encoder(&m_stream, filters.data(), check); + if (LZMA_OK == rc) { + return; + } + + char const* msg{nullptr}; + switch (rc) { + case LZMA_MEM_ERROR: + msg = "Memory allocation failed"; + break; + + case LZMA_OPTIONS_ERROR: + msg = "Specified preset is not supported"; + break; + + case LZMA_UNSUPPORTED_CHECK: + msg = "Specified integrity check is not supported"; + break; + + case LZMA_PROG_ERROR: + msg = "Input arguments are not sane"; + break; + + default: + msg = "Unknown error"; + break; + } + + SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, static_cast(rc)); + 
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); +} +} // namespace clp::streaming_compression::lzma diff --git a/components/core/src/clp/streaming_compression/lzma/Compressor.hpp b/components/core/src/clp/streaming_compression/lzma/Compressor.hpp new file mode 100644 index 000000000..de665eaf6 --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Compressor.hpp @@ -0,0 +1,230 @@ +#ifndef CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP +#define CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP + +#include +#include + +#include + +#include "../../Array.hpp" +#include "../../ErrorCode.hpp" +#include "../../FileWriter.hpp" +#include "../../TraceableException.hpp" +#include "../Compressor.hpp" +#include "Constants.hpp" + +namespace clp::streaming_compression::lzma { +/** + * Implements a LZMA compressor that compresses byte input data to a file. + */ +class Compressor : public ::clp::streaming_compression::Compressor { +public: + // Types + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + + // Methods + [[nodiscard]] auto what() const noexcept -> char const* override { + return "streaming_compression::lzma::Compressor operation failed"; + } + }; + + // Constructors + Compressor() : Compressor{cDefaultCompressionLevel, cDefaultDictionarySize, LZMA_CHECK_CRC64} {} + + Compressor(int compression_level, size_t dict_size, lzma_check check) + : m_lzma_stream{compression_level, dict_size, check} {} + + // Destructor + ~Compressor() override = default; + + // Delete copy constructor and assignment operator + Compressor(Compressor const&) = delete; + auto operator=(Compressor const&) -> Compressor& = delete; + + // Default move constructor and assignment operator + Compressor(Compressor&&) noexcept = default; + auto operator=(Compressor&&) noexcept -> Compressor& = default; 
+ + // Methods implementing the WriterInterface + /** + * Writes the given data to the compressor + * @param data + * @param data_length + */ + auto write(char const* data, size_t data_length) -> void override; + + /** + * Writes any internally buffered data to file and ends the current frame + * + * Forces all the encoded data buffered by LZMA to be available at output + */ + auto flush() -> void override; + + /** + * Tries to get the current position of the write head + * @param pos Position of the write head + * @return ErrorCode_NotInit if the compressor is not open + * @return ErrorCode_Success on success + */ + auto try_get_pos(size_t& pos) const -> ErrorCode override; + + // Methods implementing the Compressor interface + /** + * Closes the compressor + */ + auto close() -> void override; + + /** + * Open the compression stream for encoding to the file_writer. + * + * @param file_writer + */ + auto open(FileWriter& file_writer) -> void override; + +private: + /** + * Wrapper class around lzma_stream providing easier usage. + */ + class LzmaStream { + public: + /** + * Initializes an LZMA compression encoder and its streams. + * + * @param compression_level Compression preset level in the range [0-9] where the higher + * numbers use increasingly more memory for greater compression ratios. + * @param dict_size Max amount of recently processed uncompressed bytes to keep in the + * memory. + * @param check Type of check to verify the integrity of the uncompressed data. + * LZMA_CHECK_CRC64 is the default in the xz command line tool. If the .xz file needs to be + * decompressed with XZ-Embedded, use LZMA_CHECK_CRC32 instead. + * + * @throw `OperationFailed` `ErrorCode_BadParam` if the LZMA options are invalid or the + * encoder fails to initialize. 
+ */ + LzmaStream(int compression_level, size_t dict_size, lzma_check check); + + // Destructor + ~LzmaStream() = default; + + // Delete copy constructor and assignment operator + LzmaStream(LzmaStream const&) = delete; + auto operator=(LzmaStream const&) -> LzmaStream& = delete; + + // Default move constructor and assignment operator + LzmaStream(LzmaStream&&) noexcept = default; + auto operator=(LzmaStream&&) noexcept -> LzmaStream& = default; + + /** + * Attaches a pre-allocated block buffer to the encoder's input stream. + * + * @return false if the data buffer is null. + * @return true on success. + */ + [[nodiscard]] auto attach_input(uint8_t const* data_ptr, size_t data_length) -> bool { + if (nullptr == data_ptr) { + return false; + } + m_stream.next_in = data_ptr; + m_stream.avail_in = data_length; + return true; + } + + /** + * Attaches a pre-allocated block buffer to the encoder's output stream. + * + * @return false if the data buffer is null or empty. + * @return true on success. + */ + [[nodiscard]] auto attach_output(uint8_t* data_ptr, size_t data_length) -> bool { + if (nullptr == data_ptr || 0 == data_length) { + return false; + } + m_stream.next_out = data_ptr; + m_stream.avail_out = data_length; + return true; + } + + [[nodiscard]] auto avail_in() const -> size_t { return m_stream.avail_in; } + + [[nodiscard]] auto avail_out() const -> size_t { return m_stream.avail_out; } + + /** + * Unset the internal fields of the encoder's input stream. + */ + auto detach_input() -> void { + m_stream.next_in = nullptr; + m_stream.avail_in = 0; + } + + /** + * End the LZMA stream and unset the internal fields of the encoder's output stream. 
+ */ + auto end_and_detach_output() -> void { + lzma_end(&m_stream); + m_stream.next_out = nullptr; + m_stream.avail_out = 0; + } + + [[nodiscard]] static auto is_flush_action(lzma_action action) -> bool { + return LZMA_SYNC_FLUSH == action || LZMA_FULL_FLUSH == action + || LZMA_FULL_BARRIER == action || LZMA_FINISH == action; + } + + [[nodiscard]] auto lzma_code(lzma_action action) -> lzma_ret { + return ::lzma_code(&m_stream, action); + } + + private: + lzma_stream m_stream = LZMA_STREAM_INIT; + }; + + static constexpr size_t cCompressedStreamBlockBufferSize{4096}; // 4KiB + + /** + * Invokes lzma_code() repeatedly with LZMA_RUN until the input is exhausted + * + * At the end of the workflow, the last bytes of encoded data may still be buffered in the LZMA + * stream and thus not immediately available at the output block buffer. + * + * Assumes input stream and output block buffer are both in valid states. + * @throw `OperationFailed` if LZMA returns an unexpected error value + */ + auto encode_lzma() -> void; + + /** + * Invokes lzma_code() repeatedly with the given flushing action until all encoded data is made + * available at the output block buffer + * + * Once flushing starts, the workflow action needs to stay the same until flushing is signaled + * complete by LZMA (aka LZMA_STREAM_END is reached). + * See also: https://github.com/tukaani-project/xz/blob/master/src/liblzma/api/lzma/base.h#L274 + * + * Assumes input stream and output block buffer are both in valid states. + * @param flush_action + * @throw `OperationFailed` if the provided action is not an LZMA flush + * action, or if LZMA returns an unexpected error value + */ + auto flush_lzma(lzma_action flush_action) -> void; + + /** + * Flushes the current compressed data in the output block buffer to the output file handler. + * + * Also resets the output block buffer to receive new data. 
+ */ + auto flush_stream_output_block_buffer() -> void; + + // Variables + FileWriter* m_compressed_stream_file_writer{nullptr}; + + // Compressed stream variables + Array m_compressed_stream_block_buffer{cCompressedStreamBlockBufferSize}; + LzmaStream m_lzma_stream; + size_t m_uncompressed_stream_pos{0}; +}; +} // namespace clp::streaming_compression::lzma + +#endif // CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP diff --git a/components/core/src/clp/streaming_compression/lzma/Constants.hpp b/components/core/src/clp/streaming_compression/lzma/Constants.hpp new file mode 100644 index 000000000..4e261187a --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Constants.hpp @@ -0,0 +1,15 @@ +#ifndef STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP +#define STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP + +#include + +#include + +namespace clp::streaming_compression::lzma { +constexpr int cDefaultCompressionLevel{3}; +constexpr int cMinCompressionLevel{0}; +constexpr int cMaxCompressionLevel{9}; +constexpr uint32_t cDefaultDictionarySize{LZMA_DICT_SIZE_DEFAULT}; +} // namespace clp::streaming_compression::lzma + +#endif // STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index 0fbae9e3a..9f0df9306 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -4,6 +4,8 @@ #include #include #include +#include +#include #include #include @@ -15,6 +17,7 @@ #include "../src/clp/ReadOnlyMemoryMappedFile.hpp" #include "../src/clp/streaming_compression/Compressor.hpp" #include "../src/clp/streaming_compression/Decompressor.hpp" +#include "../src/clp/streaming_compression/lzma/Compressor.hpp" #include "../src/clp/streaming_compression/passthrough/Compressor.hpp" #include "../src/clp/streaming_compression/passthrough/Decompressor.hpp" #include "../src/clp/streaming_compression/zstd/Compressor.hpp" @@ 
-25,56 +28,48 @@ using clp::ErrorCode_Success; using clp::FileWriter; using clp::streaming_compression::Compressor; using clp::streaming_compression::Decompressor; +using std::string; +using std::string_view; -TEST_CASE("StreamingCompression", "[StreamingCompression]") { - // Initialize constants - constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB - constexpr auto cCompressionChunkSizes = std::to_array( - {cBufferSize / 100, - cBufferSize / 50, - cBufferSize / 25, - cBufferSize / 10, - cBufferSize / 5, - cBufferSize / 2, - cBufferSize} - ); - constexpr size_t cAlphabetLength{26}; - std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; - - // Initialize compression devices - std::unique_ptr compressor; - std::unique_ptr decompressor; - - SECTION("ZStd single phase compression") { - compressor = std::make_unique(); - decompressor = std::make_unique(); - } - - SECTION("Passthrough compression") { - compressor = std::make_unique(); - decompressor = std::make_unique(); - } +namespace { +constexpr string_view cCompressedFilePath{"test_streaming_compressed_file.bin"}; +constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB +constexpr auto cCompressionChunkSizes = std::to_array( + {0, + cBufferSize / 100, + cBufferSize / 50, + cBufferSize / 25, + cBufferSize / 10, + cBufferSize / 5, + cBufferSize / 2, + cBufferSize} +); - // Initialize buffers - Array uncompressed_buffer{cBufferSize}; - for (size_t i{0}; i < cBufferSize; ++i) { - uncompressed_buffer.at(i) = static_cast(('a' + (i % cAlphabetLength))); - } +auto compress(std::unique_ptr compressor, char const* src) -> void; - Array decompressed_buffer{cBufferSize}; +auto decompress_and_compare( + std::unique_ptr decompressor, + Array const& uncompressed_buffer, + Array& decompressed_buffer +) -> void; - // Compress +auto compress(std::unique_ptr compressor, char const* src) -> void { FileWriter file_writer; - file_writer.open(compressed_file_path, 
FileWriter::OpenMode::CREATE_FOR_WRITING); + file_writer.open(string(cCompressedFilePath), FileWriter::OpenMode::CREATE_FOR_WRITING); compressor->open(file_writer); for (auto const chunk_size : cCompressionChunkSizes) { - compressor->write(uncompressed_buffer.data(), chunk_size); + compressor->write(src, chunk_size); } compressor->close(); file_writer.close(); +} - // Decompress and compare - clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{compressed_file_path}; +auto decompress_and_compare( + std::unique_ptr decompressor, + Array const& uncompressed_buffer, + Array& decompressed_buffer +) -> void { + clp::ReadOnlyMemoryMappedFile const memory_mapped_compressed_file{string(cCompressedFilePath)}; auto const compressed_file_view{memory_mapped_compressed_file.get_view()}; decompressor->open(compressed_file_view.data(), compressed_file_view.size()); @@ -98,7 +93,6 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { num_uncompressed_bytes += chunk_size; } - // Sanity check REQUIRE( (std::accumulate( cCompressionChunkSizes.cbegin(), @@ -107,7 +101,39 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { ) == num_uncompressed_bytes) ); +} +} // namespace + +TEST_CASE("StreamingCompression", "[StreamingCompression]") { + constexpr size_t cAlphabetLength{26}; + + std::unique_ptr compressor; + std::unique_ptr decompressor; + + Array decompressed_buffer{cBufferSize}; + Array uncompressed_buffer{cBufferSize}; + for (size_t i{0}; i < cBufferSize; ++i) { + uncompressed_buffer.at(i) = static_cast(('a' + (i % cAlphabetLength))); + } + + SECTION("ZStd single phase compression") { + compressor = std::make_unique(); + compress(std::move(compressor), uncompressed_buffer.data()); + decompressor = std::make_unique(); + decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer); + } + + SECTION("Passthrough compression") { + compressor = std::make_unique(); + compress(std::move(compressor), 
uncompressed_buffer.data()); + decompressor = std::make_unique(); + decompress_and_compare(std::move(decompressor), uncompressed_buffer, decompressed_buffer); + } + + SECTION("LZMA compression") { + compressor = std::make_unique(); + compress(std::move(compressor), uncompressed_buffer.data()); + } - // Cleanup - boost::filesystem::remove(compressed_file_path); + boost::filesystem::remove(string(cCompressedFilePath)); } diff --git a/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh b/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh index 66ea4ac4f..c51a521c1 100755 --- a/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh +++ b/components/core/tools/scripts/lib_install/centos-stream-9/install-prebuilt-packages.sh @@ -18,4 +18,5 @@ dnf install -y \ libzstd-devel \ make \ mariadb-connector-c-devel \ - openssl-devel + openssl-devel \ + xz-devel diff --git a/components/core/tools/scripts/lib_install/liblzma.sh b/components/core/tools/scripts/lib_install/liblzma.sh new file mode 100755 index 000000000..a73ff79b9 --- /dev/null +++ b/components/core/tools/scripts/lib_install/liblzma.sh @@ -0,0 +1,66 @@ +#!/usr/bin/env bash + +# Exit on any error +set -e + +# Error on undefined variable +set -u + +# Dependencies: +# - curl +# - make +# - gcc +# NOTE: Dependencies should be installed outside the script to allow the script to be largely distro-agnostic + +for cmd in curl make gcc; do + if ! $cmd --version >/dev/null 2>&1; then + echo "Error: Required dependency '$cmd' not found" + exit 1 + fi +done + +cUsage="Usage: ${BASH_SOURCE[0]} [ <.deb output directory>]" +if [ "$#" -lt 1 ] ; then + echo $cUsage + exit +fi +version=$1 + +package_name=liblzma +temp_dir=/tmp/${package_name}-installation +deb_output_dir=${temp_dir} +if [[ "$#" -gt 1 ]] ; then + deb_output_dir="$(readlink -f "$2")" + if [ ! 
-d ${deb_output_dir} ] ; then + echo "${deb_output_dir} does not exist or is not a directory" + exit + fi +fi + +# Note: we won't check if the package already exists + +# Get number of cpu cores +num_cpus=$(grep -c ^processor /proc/cpuinfo) + +# Download +mkdir -p $temp_dir +cd $temp_dir +extracted_dir=${temp_dir}/xz-${version} +if [ ! -e ${extracted_dir} ] ; then + tar_filename=xz-${version}.tar.gz + if [ ! -e ${tar_filename} ] ; then + curl -fsSL https://github.com/tukaani-project/xz/releases/download/v${version}/${tar_filename} -o ${tar_filename} + fi + tar -xf ${tar_filename} +fi + +# Build +cd ${extracted_dir} +mkdir build +cd build +cmake -DCMAKE_POSITION_INDEPENDENT_CODE=TRUE ../ +make -j${num_cpus} +make install liblzma + +# Clean up +rm -rf $temp_dir diff --git a/components/core/tools/scripts/lib_install/macos/install-all.sh b/components/core/tools/scripts/lib_install/macos/install-all.sh index 97e41903d..cb24dd054 100755 --- a/components/core/tools/scripts/lib_install/macos/install-all.sh +++ b/components/core/tools/scripts/lib_install/macos/install-all.sh @@ -21,6 +21,7 @@ brew install \ mongo-cxx-driver \ msgpack-cxx \ spdlog \ + xz \ zstd # Install pkg-config if it isn't already installed diff --git a/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh b/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh index 035c5f4da..839f6d3c3 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-focal/install-packages-from-source.sh @@ -14,6 +14,7 @@ lib_install_scripts_dir=$script_dir/.. 
"$lib_install_scripts_dir"/fmtlib.sh 8.0.1 "$lib_install_scripts_dir"/libarchive.sh 3.5.1 +"$lib_install_scripts_dir"/liblzma.sh 5.4.6 "$lib_install_scripts_dir"/lz4.sh 1.8.2 "$lib_install_scripts_dir"/mongocxx.sh 3.10.2 "$lib_install_scripts_dir"/msgpack.sh 7.0.0 diff --git a/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh b/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh index 8997ffe01..3ea3b3ed5 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-focal/install-prebuilt-packages.sh @@ -20,6 +20,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \ jq \ libcurl4 \ libcurl4-openssl-dev \ + liblzma-dev \ libmariadb-dev \ libssl-dev \ make \ diff --git a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh index 035c5f4da..839f6d3c3 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-packages-from-source.sh @@ -14,6 +14,7 @@ lib_install_scripts_dir=$script_dir/.. 
"$lib_install_scripts_dir"/fmtlib.sh 8.0.1 "$lib_install_scripts_dir"/libarchive.sh 3.5.1 +"$lib_install_scripts_dir"/liblzma.sh 5.4.6 "$lib_install_scripts_dir"/lz4.sh 1.8.2 "$lib_install_scripts_dir"/mongocxx.sh 3.10.2 "$lib_install_scripts_dir"/msgpack.sh 7.0.0 diff --git a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh index 4a71a122c..ea055ffdf 100755 --- a/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh +++ b/components/core/tools/scripts/lib_install/ubuntu-jammy/install-prebuilt-packages.sh @@ -17,6 +17,7 @@ DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends -y \ jq \ libcurl4 \ libcurl4-openssl-dev \ + liblzma-dev \ libmariadb-dev \ libssl-dev \ openjdk-11-jdk \ From 61f9902b2926510eb048bc69838f7fe8ee656251 Mon Sep 17 00:00:00 2001 From: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> Date: Fri, 3 Jan 2025 13:39:13 -0500 Subject: [PATCH 2/5] build(boost): Update Boost's source URL (fixes #649). 
(#650) --- components/core/tools/scripts/lib_install/install-boost.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/core/tools/scripts/lib_install/install-boost.sh b/components/core/tools/scripts/lib_install/install-boost.sh index 40232caf8..2733e9886 100755 --- a/components/core/tools/scripts/lib_install/install-boost.sh +++ b/components/core/tools/scripts/lib_install/install-boost.sh @@ -29,7 +29,7 @@ cd $temp_dir # Download source tar_filename=boost_${version_with_underscores}.tar.gz -curl -fsSL https://boostorg.jfrog.io/artifactory/main/release/${version}/source/${tar_filename} -o ${tar_filename} +curl -fsSL https://archives.boost.io/release/${version}/source/${tar_filename} -o ${tar_filename} tar xzf ${tar_filename} cd boost_${version_with_underscores} From e1f70e7a64be8adffbe0921faf2770561fea6f56 Mon Sep 17 00:00:00 2001 From: Lin Zhihao <59785146+LinZhihao-723@users.noreply.github.com> Date: Mon, 6 Jan 2025 10:44:40 -0500 Subject: [PATCH 3/5] fix(ffi): Disallow input MessagePack maps that contain non-string keys or array values that contain unsupported types. 
(#570) Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com> --- .../core/src/clp/ffi/ir_stream/Serializer.cpp | 226 ++++++++++++------ .../core/src/clp/ffi/ir_stream/Serializer.hpp | 8 + .../core/tests/test-ir_encoding_methods.cpp | 97 ++++++++ 3 files changed, 255 insertions(+), 76 deletions(-) diff --git a/components/core/src/clp/ffi/ir_stream/Serializer.cpp b/components/core/src/clp/ffi/ir_stream/Serializer.cpp index b29cf0492..295ab356c 100644 --- a/components/core/src/clp/ffi/ir_stream/Serializer.cpp +++ b/components/core/src/clp/ffi/ir_stream/Serializer.cpp @@ -16,6 +16,7 @@ #include "../../ir/types.hpp" #include "../../time_types.hpp" +#include "../../TransactionManager.hpp" #include "../../type_utils.hpp" #include "../encoding_methods.hpp" #include "../SchemaTree.hpp" @@ -145,6 +146,16 @@ template vector& output_buf ) -> bool; +/** + * Checks whether the given msgpack array can be serialized into the key-value pair IR format. + * @param array + * @return true if the array is serializable. + * @return false if: + * - Any value inside the array has an unsupported type (i.e., `BIN` or `EXT`). + * - Any value inside the array has type `MAP` and the map has non-string keys. 
+ */ +[[nodiscard]] auto is_msgpack_array_serializable(msgpack::object const& array) -> bool; + auto get_schema_tree_node_type_from_msgpack_val(msgpack::object const& val ) -> optional { optional ret_val; @@ -225,11 +236,56 @@ auto serialize_value_array( string& logtype_buf, vector& output_buf ) -> bool { + if (false == is_msgpack_array_serializable(val)) { + return false; + } std::ostringstream oss; oss << val; logtype_buf.clear(); return serialize_clp_string(oss.str(), logtype_buf, output_buf); } + +auto is_msgpack_array_serializable(msgpack::object const& array) -> bool { + vector validation_stack{&array}; + while (false == validation_stack.empty()) { + auto const* curr{validation_stack.back()}; + validation_stack.pop_back(); + if (msgpack::type::MAP == curr->type) { + // Validate map + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) + auto const& as_map{curr->via.map}; + for (auto const& [key, value] : span{as_map.ptr, as_map.size}) { + if (msgpack::type::STR != key.type) { + return false; + } + if (msgpack::type::MAP == value.type || msgpack::type::ARRAY == value.type) { + validation_stack.push_back(&value); + } + } + continue; + } + + // Validate array + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) + auto const& as_array{curr->via.array}; + for (auto const& obj : span{as_array.ptr, as_array.size}) { + switch (obj.type) { + case msgpack::type::BIN: + case msgpack::type::EXT: + // Unsupported types + return false; + case msgpack::type::ARRAY: + case msgpack::type::MAP: + validation_stack.push_back(&obj); + break; + default: + break; + } + } + } + + return true; +} } // namespace template @@ -282,86 +338,11 @@ auto Serializer::serialize_msgpack_map(msgpack::object_map c return true; } - m_schema_tree.take_snapshot(); m_schema_tree_node_buf.clear(); m_key_group_buf.clear(); m_value_group_buf.clear(); - // Traverse the map using DFS iteratively - bool failure{false}; - vector dfs_stack; - dfs_stack.emplace_back( - SchemaTree::cRootId, - 
span{msgpack_map.ptr, msgpack_map.size} - ); - while (false == dfs_stack.empty()) { - auto& curr{dfs_stack.back()}; - if (false == curr.has_next_child()) { - // Visited all children, so pop node - dfs_stack.pop_back(); - continue; - } - - // Convert the current value's type to its corresponding schema-tree node type - auto const& [key, val]{curr.get_next_child()}; - auto const opt_schema_tree_node_type{get_schema_tree_node_type_from_msgpack_val(val)}; - if (false == opt_schema_tree_node_type.has_value()) { - failure = true; - break; - } - auto const schema_tree_node_type{opt_schema_tree_node_type.value()}; - - SchemaTree::NodeLocator const locator{ - curr.get_schema_tree_node_id(), - key.as(), - schema_tree_node_type - }; - - // Get the schema-tree node that corresponds with the current kv-pair, or add it if it - // doesn't exist. - auto opt_schema_tree_node_id{m_schema_tree.try_get_node_id(locator)}; - if (false == opt_schema_tree_node_id.has_value()) { - opt_schema_tree_node_id.emplace(m_schema_tree.insert_node(locator)); - if (false == serialize_schema_tree_node(locator)) { - failure = true; - break; - } - } - auto const schema_tree_node_id{opt_schema_tree_node_id.value()}; - - if (msgpack::type::MAP == val.type) { - // Serialize map - // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) - auto const& inner_map{val.via.map}; - auto const inner_map_size(inner_map.size); - if (0 == inner_map_size) { - // Value is an empty map, so we can serialize it immediately - if (false == serialize_key(schema_tree_node_id)) { - failure = true; - break; - } - serialize_value_empty_object(m_value_group_buf); - } else { - // Add map for DFS iteration - dfs_stack.emplace_back( - schema_tree_node_id, - span{inner_map.ptr, inner_map_size} - ); - } - } else { - // Serialize primitive - if (false - == (serialize_key(schema_tree_node_id) && serialize_val(val, schema_tree_node_type) - )) - { - failure = true; - break; - } - } - } - - if (failure) { - m_schema_tree.revert(); + if 
(false == serialize_msgpack_map_using_dfs(msgpack_map)) { return false; } @@ -485,6 +466,92 @@ auto Serializer::serialize_val( return true; } +template +auto Serializer::serialize_msgpack_map_using_dfs( + msgpack::object_map const& msgpack_map +) -> bool { + m_schema_tree.take_snapshot(); + TransactionManager revert_manager{ + []() noexcept -> void {}, + [&]() noexcept -> void { m_schema_tree.revert(); } + }; + + vector dfs_stack; + dfs_stack.emplace_back( + SchemaTree::cRootId, + span{msgpack_map.ptr, msgpack_map.size} + ); + while (false == dfs_stack.empty()) { + auto& curr{dfs_stack.back()}; + if (false == curr.has_next_child()) { + // Visited all children, so pop node + dfs_stack.pop_back(); + continue; + } + + // Convert the current value's type to its corresponding schema-tree node type + auto const& [key, val]{curr.get_next_child()}; + if (msgpack::type::STR != key.type) { + // A map containing non-string keys is not serializable + return false; + } + + auto const opt_schema_tree_node_type{get_schema_tree_node_type_from_msgpack_val(val)}; + if (false == opt_schema_tree_node_type.has_value()) { + return false; + } + auto const schema_tree_node_type{opt_schema_tree_node_type.value()}; + + SchemaTree::NodeLocator const locator{ + curr.get_schema_tree_node_id(), + key.as(), + schema_tree_node_type + }; + + // Get the schema-tree node that corresponds with the current kv-pair, or add it if it + // doesn't exist. 
+ auto opt_schema_tree_node_id{m_schema_tree.try_get_node_id(locator)}; + if (false == opt_schema_tree_node_id.has_value()) { + opt_schema_tree_node_id.emplace(m_schema_tree.insert_node(locator)); + if (false == serialize_schema_tree_node(locator)) { + return false; + } + } + auto const schema_tree_node_id{opt_schema_tree_node_id.value()}; + + if (msgpack::type::MAP == val.type) { + // Serialize map + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-union-access) + auto const& inner_map{val.via.map}; + auto const inner_map_size(inner_map.size); + if (0 != inner_map_size) { + // Add map for DFS iteration + dfs_stack.emplace_back( + schema_tree_node_id, + span{inner_map.ptr, inner_map_size} + ); + } else { + // Value is an empty map, so we can serialize it immediately + if (false == serialize_key(schema_tree_node_id)) { + return false; + } + serialize_value_empty_object(m_value_group_buf); + } + continue; + } + + // Serialize primitive + if (false + == (serialize_key(schema_tree_node_id) && serialize_val(val, schema_tree_node_type))) + { + return false; + } + } + + revert_manager.mark_success(); + return true; +} + // Explicitly declare template specializations so that we can define the template methods in this // file template auto Serializer::create( @@ -524,4 +591,11 @@ template auto Serializer::serialize_val( msgpack::object const& val, SchemaTree::Node::Type schema_tree_node_type ) -> bool; + +template auto Serializer::serialize_msgpack_map_using_dfs( + msgpack::object_map const& msgpack_map +) -> bool; +template auto Serializer::serialize_msgpack_map_using_dfs( + msgpack::object_map const& msgpack_map +) -> bool; } // namespace clp::ffi::ir_stream diff --git a/components/core/src/clp/ffi/ir_stream/Serializer.hpp b/components/core/src/clp/ffi/ir_stream/Serializer.hpp index 14077ffba..edc6bf0cb 100644 --- a/components/core/src/clp/ffi/ir_stream/Serializer.hpp +++ b/components/core/src/clp/ffi/ir_stream/Serializer.hpp @@ -116,6 +116,14 @@ class Serializer { 
[[nodiscard]] auto serialize_val(msgpack::object const& val, SchemaTree::Node::Type schema_tree_node_type) -> bool; + /** + * Serializes the given msgpack map using a depth-first search (DFS). + * @param msgpack_map + * @return Whether serialization succeeded. + */ + [[nodiscard]] auto serialize_msgpack_map_using_dfs(msgpack::object_map const& msgpack_map + ) -> bool; + UtcOffset m_curr_utc_offset{0}; Buffer m_ir_buf; SchemaTree m_schema_tree; diff --git a/components/core/tests/test-ir_encoding_methods.cpp b/components/core/tests/test-ir_encoding_methods.cpp index 347dadb7a..578ae49f7 100644 --- a/components/core/tests/test-ir_encoding_methods.cpp +++ b/components/core/tests/test-ir_encoding_methods.cpp @@ -1,8 +1,11 @@ #include #include +#include #include +#include #include #include +#include #include #include #include @@ -208,6 +211,21 @@ template */ [[nodiscard]] auto count_num_leaves(nlohmann::json const& root) -> size_t; +/** + * Unpacks the given bytes into a msgpack object and asserts that serializing it into the KV-pair IR + * format fails. + * @tparam encoded_variable_t + * @param buffer A buffer containing a msgpack byte sequence that cannot be serialized into the + * KV-pair IR format. + * @param serializer + * @return Whether serialization failed and the underlying IR buffer remains empty. 
+ */ +template +[[nodiscard]] auto unpack_and_assert_serialization_failure( + std::stringstream& buffer, + Serializer& serializer +) -> bool; + template [[nodiscard]] auto serialize_log_events( vector const& log_events, @@ -357,6 +375,29 @@ auto count_num_leaves(nlohmann::json const& root) -> size_t { return num_leaves; } + +template +auto unpack_and_assert_serialization_failure( + std::stringstream& buffer, + Serializer& serializer +) -> bool { + REQUIRE(serializer.get_ir_buf_view().empty()); + string msgpack_bytes{buffer.str()}; + buffer.str({}); + buffer.clear(); + auto const msgpack_obj_handle{msgpack::unpack(msgpack_bytes.data(), msgpack_bytes.size())}; + auto const msgpack_obj{msgpack_obj_handle.get()}; + REQUIRE((msgpack::type::MAP == msgpack_obj.type)); + if (serializer.serialize_msgpack_map(msgpack_obj.via.map)) { + // Serialization should fail + return false; + } + if (false == serializer.get_ir_buf_view().empty()) { + // Serialization buffer should be empty + return false; + } + return true; +} } // namespace /** @@ -1332,3 +1373,59 @@ TEMPLATE_TEST_CASE( output_buf )); } + +// NOLINTNEXTLINE(readability-function-cognitive-complexity) +TEMPLATE_TEST_CASE( + "ffi_ir_stream_Serializer_serialize_invalid_msgpack", + "[clp][ffi][ir_stream][Serializer]", + four_byte_encoded_variable_t, + eight_byte_encoded_variable_t +) { + auto result{Serializer::create()}; + REQUIRE((false == result.has_error())); + + std::stringstream msgpack_serialization_buffer; + auto& serializer{result.value()}; + serializer.clear_ir_buf(); + + auto assert_invalid_serialization = [&](T invalid_value) -> bool { + std::map const invalid_map{{"valid_key", invalid_value}}; + msgpack::pack(msgpack_serialization_buffer, invalid_map); + return unpack_and_assert_serialization_failure(msgpack_serialization_buffer, serializer); + }; + + std::map const map_with_integer_keys{{0, 0}, {1, 1}, {2, 2}}; + REQUIRE(assert_invalid_serialization(map_with_integer_keys)); + + std::map const 
map_with_invalid_submap{ + {"valid_key", map_with_integer_keys} + }; + REQUIRE(assert_invalid_serialization(map_with_invalid_submap)); + + std::tuple> const array_with_invalid_type{0, {0x00, 0x00, 0x00}}; + REQUIRE(assert_invalid_serialization(array_with_invalid_type)); + + std::tuple const subarray_with_invalid_type{ + 0, + array_with_invalid_type + }; + REQUIRE(assert_invalid_serialization(subarray_with_invalid_type)); + + std::tuple const array_with_invalid_map{ + 0, + map_with_integer_keys + }; + REQUIRE(assert_invalid_serialization(array_with_invalid_map)); + + std::tuple const subarray_with_invalid_map{ + 0, + array_with_invalid_map + }; + REQUIRE(assert_invalid_serialization(subarray_with_invalid_map)); + + std::tuple const array_with_invalid_submap{ + 0, + map_with_invalid_submap + }; + REQUIRE(assert_invalid_serialization(array_with_invalid_submap)); +} From 818be9e2eef61a5b33d81798ebef45a4b5b882e9 Mon Sep 17 00:00:00 2001 From: Eden Zhang <49173122+Eden-D-Zhang@users.noreply.github.com> Date: Wed, 8 Jan 2025 10:03:42 -0500 Subject: [PATCH 4/5] refactor(core-clp): Remove unused functions from `src/clp/Utils.*`. 
(#654) --- components/core/src/clp/Utils.cpp | 51 ---------------------------- components/core/src/clp/Utils.hpp | 22 ------------ components/core/tests/test-Utils.cpp | 40 ---------------------- 3 files changed, 113 deletions(-) diff --git a/components/core/src/clp/Utils.cpp b/components/core/src/clp/Utils.cpp index f487a3880..1c0fc05ca 100644 --- a/components/core/src/clp/Utils.cpp +++ b/components/core/src/clp/Utils.cpp @@ -88,57 +88,6 @@ ErrorCode create_directory_structure(string const& path, mode_t mode) { return ErrorCode_Success; } -string get_parent_directory_path(string const& path) { - string dirname = get_unambiguous_path(path); - - size_t last_slash_pos = dirname.find_last_of('/'); - if (0 == last_slash_pos) { - dirname = "/"; - } else if (string::npos == last_slash_pos) { - dirname = "."; - } else { - dirname.resize(last_slash_pos); - } - - return dirname; -} - -string get_unambiguous_path(string const& path) { - string unambiguous_path; - if (path.empty()) { - return unambiguous_path; - } - - // Break path into components - vector path_components; - boost::split(path_components, path, boost::is_any_of("/"), boost::token_compress_on); - - // Remove ambiguous components - list unambiguous_components; - size_t num_components_to_ignore = 0; - for (size_t i = path_components.size(); i-- > 0;) { - if (".." == path_components[i]) { - ++num_components_to_ignore; - } else if ("." 
== path_components[i] || path_components[i].empty()) { - // Do nothing - } else if (num_components_to_ignore > 0) { - --num_components_to_ignore; - } else { - unambiguous_components.emplace_front(path_components[i]); - } - } - - // Assemble unambiguous path from leading slash (if any) and the unambiguous components - if ('/' == path[0]) { - unambiguous_path += '/'; - } - if (!unambiguous_components.empty()) { - unambiguous_path += boost::join(unambiguous_components, "/"); - } - - return unambiguous_path; -} - ErrorCode read_list_of_paths(string const& list_path, vector& paths) { unique_ptr file_reader; try { diff --git a/components/core/src/clp/Utils.hpp b/components/core/src/clp/Utils.hpp index de7f81aae..3238e551b 100644 --- a/components/core/src/clp/Utils.hpp +++ b/components/core/src/clp/Utils.hpp @@ -35,28 +35,6 @@ ErrorCode create_directory(std::string const& path, mode_t mode, bool exist_ok); */ ErrorCode create_directory_structure(std::string const& path, mode_t mode); -/** - * Gets the parent directory path for a given path - * Corner cases: - * - get_dirname("abc") = "." - * - get_dirname(".") = "." - * - get_dirname("..") = "." 
- * - get_dirname("/") = "/" - * - get_dirname("/.") = "/" - * - get_dirname("/..") = "/" - * - get_dirname("/abc") = "/" - * @param path - * @return Parent directory path - */ -std::string get_parent_directory_path(std::string const& path); - -/** - * Removes ".", "..", and consecutive "/" from a given path and returns the result - * @param path The given path - * @return The unambiguous path - */ -std::string get_unambiguous_path(std::string const& path); - /** * Read a list of paths from a file * @param list_path diff --git a/components/core/tests/test-Utils.cpp b/components/core/tests/test-Utils.cpp index 603fb4be0..21d070f92 100644 --- a/components/core/tests/test-Utils.cpp +++ b/components/core/tests/test-Utils.cpp @@ -14,8 +14,6 @@ using clp::create_directory_structure; using clp::ErrorCode_Success; -using clp::get_parent_directory_path; -using clp::get_unambiguous_path; using std::string; TEST_CASE("create_directory_structure", "[create_directory_structure]") { @@ -44,41 +42,3 @@ TEST_CASE("create_directory_structure", "[create_directory_structure]") { REQUIRE(0 == rmdir("/tmp/5807")); } - -TEST_CASE("get_parent_directory_path", "[get_parent_directory_path]") { - // Corner cases - // Anything without a slash should return "." 
- REQUIRE(get_parent_directory_path(".") == "."); - REQUIRE(get_parent_directory_path("..") == "."); - REQUIRE(get_parent_directory_path("abc") == "."); - // A single slash, at the beginning, should always return "/" - REQUIRE(get_parent_directory_path("/") == "/"); - REQUIRE(get_parent_directory_path("/.") == "/"); - REQUIRE(get_parent_directory_path("/..") == "/"); - REQUIRE(get_parent_directory_path("/abc") == "/"); - - // Normal cases - REQUIRE(get_parent_directory_path("//abc/./def//../def/.///") == "/abc"); -} - -TEST_CASE("get_unambiguous_path", "[get_unambiguous_path]") { - // Base cases (should not modify anything) - REQUIRE(get_unambiguous_path("/") == "/"); - REQUIRE(get_unambiguous_path("abc") == "abc"); - REQUIRE(get_unambiguous_path("/abc") == "/abc"); - REQUIRE(get_unambiguous_path("/abc/def") == "/abc/def"); - - // Corner cases - REQUIRE(get_unambiguous_path(".").empty()); - REQUIRE(get_unambiguous_path("..").empty()); - REQUIRE(get_unambiguous_path("////") == "/"); - REQUIRE(get_unambiguous_path("/./.././//../") == "/"); - REQUIRE(get_unambiguous_path("./.././//../").empty()); - REQUIRE(get_unambiguous_path("/abc/def/.././../") == "/"); - REQUIRE(get_unambiguous_path("abc/def/.././../").empty()); - - // Normal cases - REQUIRE(get_unambiguous_path("/abc///def/../ghi/./") == "/abc/ghi"); - REQUIRE(get_unambiguous_path("abc///def/../ghi/./") == "abc/ghi"); - REQUIRE(get_unambiguous_path("../abc///def/../ghi/./") == "abc/ghi"); -} From 5d3b67145876d90defa45aeab0a37e9ad48aaddf Mon Sep 17 00:00:00 2001 From: Devin Gibson Date: Wed, 8 Jan 2025 15:50:36 -0500 Subject: [PATCH 5/5] feat(clp-s): Add support for ingesting logs from S3. 
(#639) Co-authored-by: wraymo <37269683+wraymo@users.noreply.github.com> --- components/core/CMakeLists.txt | 4 +- components/core/src/clp_s/ArchiveReader.cpp | 23 ++- components/core/src/clp_s/ArchiveReader.hpp | 9 +- components/core/src/clp_s/ArchiveWriter.cpp | 5 +- components/core/src/clp_s/ArchiveWriter.hpp | 1 - components/core/src/clp_s/CMakeLists.txt | 27 +++- .../core/src/clp_s/CommandLineArguments.cpp | 148 ++++++++++++++---- .../core/src/clp_s/CommandLineArguments.hpp | 13 +- components/core/src/clp_s/InputConfig.cpp | 88 +++++++++++ components/core/src/clp_s/InputConfig.hpp | 105 +++++++++++++ components/core/src/clp_s/JsonConstructor.cpp | 17 +- components/core/src/clp_s/JsonConstructor.hpp | 7 +- .../core/src/clp_s/JsonFileIterator.cpp | 19 +-- .../core/src/clp_s/JsonFileIterator.hpp | 16 +- components/core/src/clp_s/JsonParser.cpp | 61 +++++--- components/core/src/clp_s/JsonParser.hpp | 8 +- components/core/src/clp_s/ReaderUtils.cpp | 83 ++++++++-- components/core/src/clp_s/ReaderUtils.hpp | 14 +- components/core/src/clp_s/Utils.cpp | 134 ++++++++++++++-- components/core/src/clp_s/Utils.hpp | 50 +++++- .../core/src/clp_s/ZstdDecompressor.cpp | 12 +- components/core/src/clp_s/clp-s.cpp | 75 +++------ .../core/src/clp_s/search/kql/CMakeLists.txt | 1 + .../core/tests/test-clp_s-end_to_end.cpp | 14 +- 24 files changed, 717 insertions(+), 217 deletions(-) create mode 100644 components/core/src/clp_s/InputConfig.cpp create mode 100644 components/core/src/clp_s/InputConfig.hpp diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 0995a0afb..f07a7db19 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -275,6 +275,8 @@ set(SOURCE_FILES_clp_s_unitTest src/clp_s/FileReader.hpp src/clp_s/FileWriter.cpp src/clp_s/FileWriter.hpp + src/clp_s/InputConfig.cpp + src/clp_s/InputConfig.hpp src/clp_s/JsonConstructor.cpp src/clp_s/JsonConstructor.hpp src/clp_s/JsonFileIterator.cpp @@ -613,7 +615,7 @@ 
target_include_directories(unitTest target_link_libraries(unitTest PRIVATE absl::flat_hash_map - Boost::filesystem Boost::iostreams Boost::program_options Boost::regex + Boost::filesystem Boost::iostreams Boost::program_options Boost::regex Boost::url ${CURL_LIBRARIES} fmt::fmt kql diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index 7c68b301d..738e8e645 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -4,20 +4,30 @@ #include #include "archive_constants.hpp" +#include "InputConfig.hpp" #include "ReaderUtils.hpp" using std::string_view; namespace clp_s { -void ArchiveReader::open(string_view archives_dir, string_view archive_id) { +void ArchiveReader::open(Path const& archive_path, NetworkAuthOption const& network_auth) { if (m_is_open) { throw OperationFailed(ErrorCodeNotReady, __FILENAME__, __LINE__); } m_is_open = true; - m_archive_id = archive_id; - std::filesystem::path archive_path{archives_dir}; - archive_path /= m_archive_id; - auto const archive_path_str = archive_path.string(); + + if (false == get_archive_id_from_path(archive_path, m_archive_id)) { + throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); + } + + if (InputSource::Filesystem != archive_path.source) { + throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); + } + + if (false == std::filesystem::is_directory(archive_path.path)) { + throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); + } + auto const archive_path_str = archive_path.path; m_var_dict = ReaderUtils::get_variable_dictionary_reader(archive_path_str); m_log_dict = ReaderUtils::get_log_type_dictionary_reader(archive_path_str); @@ -198,8 +208,9 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3 column_reader = new DateStringColumnReader(column_id, m_timestamp_dict); break; // No need to push columns without associated object readers into the 
SchemaReader. - case NodeType::Object: + case NodeType::Metadata: case NodeType::NullValue: + case NodeType::Object: case NodeType::StructuredArray: case NodeType::Unknown: break; diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 6b437dfd2..9e492720b 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -7,9 +7,8 @@ #include #include -#include - #include "DictionaryReader.hpp" +#include "InputConfig.hpp" #include "PackedStreamReader.hpp" #include "ReaderUtils.hpp" #include "SchemaReader.hpp" @@ -32,10 +31,10 @@ class ArchiveReader { /** * Opens an archive for reading. - * @param archives_dir - * @param archive_id + * @param archive_path + * @param network_auth */ - void open(std::string_view archives_dir, std::string_view archive_id); + void open(Path const& archive_path, NetworkAuthOption const& network_auth); /** * Reads the dictionaries and metadata. diff --git a/components/core/src/clp_s/ArchiveWriter.cpp b/components/core/src/clp_s/ArchiveWriter.cpp index d627479de..2a60013a9 100644 --- a/components/core/src/clp_s/ArchiveWriter.cpp +++ b/components/core/src/clp_s/ArchiveWriter.cpp @@ -270,9 +270,10 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const& case NodeType::DateString: writer->append_column(new DateStringColumnWriter(id)); break; - case NodeType::StructuredArray: - case NodeType::Object: + case NodeType::Metadata: case NodeType::NullValue: + case NodeType::Object: + case NodeType::StructuredArray: case NodeType::Unknown: break; } diff --git a/components/core/src/clp_s/ArchiveWriter.hpp b/components/core/src/clp_s/ArchiveWriter.hpp index 82a0122bc..a76d15daf 100644 --- a/components/core/src/clp_s/ArchiveWriter.hpp +++ b/components/core/src/clp_s/ArchiveWriter.hpp @@ -4,7 +4,6 @@ #include #include -#include #include #include diff --git a/components/core/src/clp_s/CMakeLists.txt 
b/components/core/src/clp_s/CMakeLists.txt index 9ca0c947e..f1fa3857c 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -2,6 +2,17 @@ add_subdirectory(search/kql) set( CLP_SOURCES + ../clp/aws/AwsAuthenticationSigner.cpp + ../clp/aws/AwsAuthenticationSigner.hpp + ../clp/BoundedReader.cpp + ../clp/BoundedReader.hpp + ../clp/CurlDownloadHandler.cpp + ../clp/CurlDownloadHandler.hpp + ../clp/CurlEasyHandle.hpp + ../clp/CurlGlobalInstance.cpp + ../clp/CurlGlobalInstance.hpp + ../clp/CurlOperationFailed.hpp + ../clp/CurlStringList.hpp ../clp/cli_utils.cpp ../clp/cli_utils.hpp ../clp/database_utils.cpp @@ -28,11 +39,15 @@ set( ../clp/ffi/Value.hpp ../clp/FileDescriptor.cpp ../clp/FileDescriptor.hpp + ../clp/FileReader.cpp + ../clp/FileReader.hpp ../clp/GlobalMetadataDB.hpp ../clp/GlobalMetadataDBConfig.cpp ../clp/GlobalMetadataDBConfig.hpp ../clp/GlobalMySQLMetadataDB.cpp ../clp/GlobalMySQLMetadataDB.hpp + ../clp/hash_utils.cpp + ../clp/hash_utils.hpp ../clp/ir/EncodedTextAst.cpp ../clp/ir/EncodedTextAst.hpp ../clp/ir/parsing.cpp @@ -43,18 +58,24 @@ set( ../clp/MySQLParamBindings.hpp ../clp/MySQLPreparedStatement.cpp ../clp/MySQLPreparedStatement.hpp + ../clp/NetworkReader.cpp + ../clp/NetworkReader.hpp ../clp/networking/socket_utils.cpp ../clp/networking/socket_utils.hpp ../clp/ReaderInterface.cpp ../clp/ReaderInterface.hpp ../clp/ReadOnlyMemoryMappedFile.cpp ../clp/ReadOnlyMemoryMappedFile.hpp + ../clp/spdlog_with_specializations.hpp ../clp/streaming_archive/ArchiveMetadata.cpp ../clp/streaming_archive/ArchiveMetadata.hpp ../clp/streaming_compression/zstd/Decompressor.cpp ../clp/streaming_compression/zstd/Decompressor.hpp + ../clp/Thread.cpp + ../clp/Thread.hpp ../clp/TraceableException.hpp ../clp/time_types.hpp + ../clp/type_utils.hpp ../clp/utf8_utils.cpp ../clp/utf8_utils.hpp ../clp/WriterInterface.cpp @@ -89,6 +110,8 @@ set( FileReader.hpp FileWriter.cpp FileWriter.hpp + InputConfig.cpp + InputConfig.hpp 
JsonConstructor.cpp JsonConstructor.hpp JsonFileIterator.cpp @@ -226,12 +249,14 @@ target_link_libraries( clp-s PRIVATE absl::flat_hash_map - Boost::filesystem Boost::iostreams Boost::program_options + Boost::iostreams Boost::program_options Boost::regex Boost::url + ${CURL_LIBRARIES} clp::string_utils kql MariaDBClient::MariaDBClient ${MONGOCXX_TARGET} msgpack-cxx + OpenSSL::Crypto simdjson spdlog::spdlog yaml-cpp::yaml-cpp diff --git a/components/core/src/clp_s/CommandLineArguments.cpp b/components/core/src/clp_s/CommandLineArguments.cpp index 4218d9d60..e6cd07163 100644 --- a/components/core/src/clp_s/CommandLineArguments.cpp +++ b/components/core/src/clp_s/CommandLineArguments.cpp @@ -1,8 +1,11 @@ #include "CommandLineArguments.hpp" +#include #include +#include #include +#include #include #include "../clp/cli_utils.hpp" @@ -14,6 +17,10 @@ namespace po = boost::program_options; namespace clp_s { namespace { +// Authorization method constants +constexpr std::string_view cNoAuth{"none"}; +constexpr std::string_view cS3Auth{"s3"}; + /** * Read a list of newline-delimited paths from a file and put them into a vector passed by reference * TODO: deduplicate this code with the version in clp @@ -55,6 +62,55 @@ bool read_paths_from_file( } return true; } + +/** + * Validates and populates network authorization options. + * @param auth_method + * @param auth + * @throws std::invalid_argument if the authorization option is invalid + */ +void validate_network_auth(std::string_view auth_method, NetworkAuthOption& auth) { + if (cS3Auth == auth_method) { + auth.method = AuthMethod::S3PresignedUrlV4; + } else if (cNoAuth != auth_method) { + throw std::invalid_argument(fmt::format("Invalid authentication type \"{}\"", auth_method)); + } +} + +/** + * Validates and populates archive paths. 
+ * @param archive_path + * @param archive_id + * @param archive_paths + * @throws std::invalid_argument on any error + */ +void validate_archive_paths( + std::string_view archive_path, + std::string_view archive_id, + std::vector& archive_paths +) { + if (archive_path.empty()) { + throw std::invalid_argument("No archive path specified"); + } + + if (false == archive_id.empty()) { + auto archive_fs_path = std::filesystem::path(archive_path) / archive_id; + std::error_code ec; + if (false == std::filesystem::exists(archive_fs_path, ec) || ec) { + throw std::invalid_argument("Requested archive does not exist"); + } + archive_paths.emplace_back(clp_s::Path{ + .source = clp_s::InputSource::Filesystem, + .path = archive_fs_path.string() + }); + } else if (false == get_input_archives_for_raw_path(archive_path, archive_paths)) { + throw std::invalid_argument("Invalid archive path"); + } + + if (archive_paths.empty()) { + throw std::invalid_argument("No archive paths specified"); + } +} } // namespace CommandLineArguments::ParsingResult @@ -133,6 +189,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { if (Command::Compress == m_command) { po::options_description compression_positional_options; + std::vector input_paths; // clang-format off compression_positional_options.add_options()( "archives-dir", @@ -140,7 +197,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "output directory" )( "input-paths", - po::value>(&m_file_paths)->value_name("PATHS"), + po::value>(&input_paths)->value_name("PATHS"), "input paths" ); // clang-format on @@ -151,6 +208,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { constexpr std::string_view cJsonFileType{"json"}; constexpr std::string_view cKeyValueIrFileType{"kv-ir"}; std::string file_type{cJsonFileType}; + std::string auth{cNoAuth}; // clang-format off compression_options.add_options()( "compression-level", @@ -209,6 +267,14 @@ CommandLineArguments::parse_arguments(int 
argc, char const** argv) { "file-type", po::value(&file_type)->value_name("FILE_TYPE")->default_value(file_type), "The type of file being compressed (json or kv-ir)" + )( + "auth", + po::value(&auth) + ->value_name("AUTH_METHOD") + ->default_value(auth), + "Type of authentication required for network requests (s3 | none). Authentication" + " with s3 requires the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment" + " variables." ); // clang-format on @@ -252,13 +318,19 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { } if (false == input_path_list_file_path.empty()) { - if (false == read_paths_from_file(input_path_list_file_path, m_file_paths)) { + if (false == read_paths_from_file(input_path_list_file_path, input_paths)) { SPDLOG_ERROR("Failed to read paths from {}", input_path_list_file_path); return ParsingResult::Failure; } } - if (m_file_paths.empty()) { + for (auto const& path : input_paths) { + if (false == get_input_files_for_raw_path(path, m_input_paths)) { + throw std::invalid_argument(fmt::format("Invalid input path \"{}\".", path)); + } + } + + if (m_input_paths.empty()) { throw std::invalid_argument("No input paths specified."); } @@ -286,6 +358,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { throw std::invalid_argument("Unknown FILE_TYPE: " + file_type); } + validate_network_auth(auth, m_network_auth); + // Parse and validate global metadata DB config if (false == metadata_db_config_file_path.empty()) { clp::GlobalMetadataDBConfig metadata_db_config; @@ -310,11 +384,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { } } else if ((char)Command::Extract == command_input) { po::options_description extraction_options; + std::string archive_path; // clang-format off extraction_options.add_options()( - "archives-dir", - po::value(&m_archives_dir), - "The directory containing the archives" + "archive-path", + po::value(&archive_path), + "Path to a directory containing archives, or 
the path to a single archive" )( "output-dir", po::value(&m_output_dir), @@ -322,15 +397,9 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); // clang-format on - po::options_description input_options("Input Options"); - input_options.add_options()( - "archive-id", - po::value(&m_archive_id)->value_name("ID"), - "ID of the archive to decompress" - ); - extraction_options.add(input_options); - po::options_description decompression_options("Decompression Options"); + std::string auth{cNoAuth}; + std::string archive_id; // clang-format off decompression_options.add_options()( "ordered", @@ -343,6 +412,19 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ->value_name("SIZE"), "Chunk size (B) for each output file when decompressing records in log order." " When set to 0, no chunking is performed." + )( + "archive-id", + po::value(&archive_id)->value_name("ID"), + "Limit decompression to the archive with the given ID in a subdirectory of" + " archive-path" + )( + "auth", + po::value(&auth) + ->value_name("AUTH_METHOD") + ->default_value(auth), + "Type of authentication required for network requests (s3 | none). Authentication" + " with s3 requires the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment" + " variables." 
); // clang-format on extraction_options.add(decompression_options); @@ -362,7 +444,7 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { extraction_options.add(output_metadata_options); po::positional_options_description positional_options; - positional_options.add("archives-dir", 1); + positional_options.add("archive-path", 1); positional_options.add("output-dir", 1); std::vector unrecognized_options @@ -390,16 +472,15 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { po::options_description visible_options; visible_options.add(general_options); - visible_options.add(input_options); visible_options.add(decompression_options); visible_options.add(output_metadata_options); std::cerr << visible_options << std::endl; return ParsingResult::InfoCommand; } - if (m_archives_dir.empty()) { - throw std::invalid_argument("No archives directory specified"); - } + validate_archive_paths(archive_path, archive_id, m_input_paths); + + validate_network_auth(auth, m_network_auth); if (m_output_dir.empty()) { throw std::invalid_argument("No output directory specified"); @@ -430,11 +511,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { po::options_description search_options; std::string output_handler_name; + std::string archive_path; // clang-format off search_options.add_options()( - "archives-dir", - po::value(&m_archives_dir), - "The directory containing the archives" + "archive-path", + po::value(&archive_path), + "Path to a directory containing archives, or the path to a single archive" )( "query,q", po::value(&m_query), @@ -448,12 +530,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { ); // clang-format on po::positional_options_description positional_options; - positional_options.add("archives-dir", 1); + positional_options.add("archive-path", 1); positional_options.add("query", 1); positional_options.add("output-handler", 1); positional_options.add("output-handler-args", -1); 
po::options_description match_options("Match Controls"); + std::string auth{cNoAuth}; + std::string archive_id; // clang-format off match_options.add_options()( "tge", @@ -469,8 +553,8 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "Ignore case distinctions between values in the query and the compressed data" )( "archive-id", - po::value(&m_archive_id)->value_name("ID"), - "Limit search to the archive with the given ID" + po::value(&archive_id)->value_name("ID"), + "Limit search to the archive with the given ID in a subdirectory of archive-path" )( "projection", po::value>(&m_projection_columns) @@ -479,6 +563,14 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { "Project only the given set of columns for matching results. This option must be" " specified after all positional options. Values that are objects or structured" " arrays are currently unsupported." + )( + "auth", + po::value(&auth) + ->value_name("AUTH_METHOD") + ->default_value(auth), + "Type of authentication required for network requests (s3 | none). Authentication" + " with s3 requires the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment" + " variables." 
); // clang-format on search_options.add(match_options); @@ -630,9 +722,9 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) { return ParsingResult::InfoCommand; } - if (m_archives_dir.empty()) { - throw std::invalid_argument("No archives directory specified"); - } + validate_archive_paths(archive_path, archive_id, m_input_paths); + + validate_network_auth(auth, m_network_auth); if (m_query.empty()) { throw std::invalid_argument("No query specified"); diff --git a/components/core/src/clp_s/CommandLineArguments.hpp b/components/core/src/clp_s/CommandLineArguments.hpp index 47c244646..17ee77369 100644 --- a/components/core/src/clp_s/CommandLineArguments.hpp +++ b/components/core/src/clp_s/CommandLineArguments.hpp @@ -12,6 +12,7 @@ #include "../clp/GlobalMetadataDBConfig.hpp" #include "../reducer/types.hpp" #include "Defs.hpp" +#include "InputConfig.hpp" namespace clp_s { class CommandLineArguments { @@ -51,7 +52,9 @@ class CommandLineArguments { Command get_command() const { return m_command; } - std::vector const& get_file_paths() const { return m_file_paths; } + std::vector const& get_input_paths() const { return m_input_paths; } + + NetworkAuthOption const& get_network_auth() const { return m_network_auth; } std::string const& get_archives_dir() const { return m_archives_dir; } @@ -87,8 +90,6 @@ class CommandLineArguments { bool get_ignore_case() const { return m_ignore_case; } - std::string const& get_archive_id() const { return m_archive_id; } - std::optional const& get_metadata_db_config() const { return m_metadata_db_config; } @@ -177,7 +178,8 @@ class CommandLineArguments { Command m_command; // Compression and decompression variables - std::vector m_file_paths; + std::vector m_input_paths; + NetworkAuthOption m_network_auth{}; std::string m_archives_dir; std::string m_output_dir; std::string m_timestamp_key; @@ -213,9 +215,6 @@ class CommandLineArguments { bool m_ignore_case{false}; std::vector m_projection_columns; - // Decompression and 
search variables - std::string m_archive_id; - // Search aggregation variables std::string m_reducer_host; int m_reducer_port{-1}; diff --git a/components/core/src/clp_s/InputConfig.cpp b/components/core/src/clp_s/InputConfig.cpp new file mode 100644 index 000000000..83c19a5c3 --- /dev/null +++ b/components/core/src/clp_s/InputConfig.cpp @@ -0,0 +1,88 @@ +#include "InputConfig.hpp" + +#include +#include +#include +#include + +#include "Utils.hpp" + +namespace clp_s { +auto get_source_for_path(std::string_view const path) -> InputSource { + try { + return std::filesystem::exists(path) ? InputSource::Filesystem : InputSource::Network; + } catch (std::exception const& e) { + return InputSource::Network; + } +} + +auto get_path_object_for_raw_path(std::string_view const path) -> Path { + return Path{.source = get_source_for_path(path), .path = std::string{path}}; +} + +auto get_input_files_for_raw_path(std::string_view const path, std::vector& files) -> bool { + return get_input_files_for_path(get_path_object_for_raw_path(path), files); +} + +auto get_input_files_for_path(Path const& path, std::vector& files) -> bool { + if (InputSource::Network == path.source) { + files.emplace_back(path); + return true; + } + + if (false == std::filesystem::is_directory(path.path)) { + files.emplace_back(path); + return true; + } + + std::vector file_paths; + if (false == FileUtils::find_all_files_in_directory(path.path, file_paths)) { + return false; + } + + for (auto& file : file_paths) { + files.emplace_back(Path{.source = InputSource::Filesystem, .path = std::move(file)}); + } + return true; +} + +auto get_input_archives_for_raw_path(std::string_view const path, std::vector& archives) + -> bool { + return get_input_archives_for_path(get_path_object_for_raw_path(path), archives); +} + +auto get_input_archives_for_path(Path const& path, std::vector& archives) -> bool { + if (InputSource::Network == path.source) { + archives.emplace_back(path); + return true; + } + + if (false == 
std::filesystem::is_directory(path.path)) { + archives.emplace_back(path); + return true; + } + + std::vector archive_paths; + if (false == FileUtils::find_all_archives_in_directory(path.path, archive_paths)) { + return false; + } + + for (auto& archive : archive_paths) { + archives.emplace_back(Path{.source = InputSource::Filesystem, .path = std::move(archive)}); + } + return true; +} + +auto get_archive_id_from_path(Path const& archive_path, std::string& archive_id) -> bool { + switch (archive_path.source) { + case InputSource::Network: + return UriUtils::get_last_uri_component(archive_path.path, archive_id); + case InputSource::Filesystem: + return FileUtils::get_last_non_empty_path_component(archive_path.path, archive_id); + default: + return false; + } + return true; +} + +} // namespace clp_s diff --git a/components/core/src/clp_s/InputConfig.hpp b/components/core/src/clp_s/InputConfig.hpp new file mode 100644 index 000000000..3672bb0ae --- /dev/null +++ b/components/core/src/clp_s/InputConfig.hpp @@ -0,0 +1,105 @@ +#ifndef CLP_S_INPUTCONFIG_HPP +#define CLP_S_INPUTCONFIG_HPP + +#include +#include +#include +#include +#include + +namespace clp_s { +// Constants used for input configuration +constexpr char cAwsAccessKeyIdEnvVar[] = "AWS_ACCESS_KEY_ID"; +constexpr char cAwsSecretAccessKeyEnvVar[] = "AWS_SECRET_ACCESS_KEY"; + +/** + * Enum class defining the source of a resource. + */ +enum class InputSource : uint8_t { + Filesystem, + Network +}; + +/** + * Enum class defining the authentication method required for accessing a resource. + */ +enum class AuthMethod : uint8_t { + None, + S3PresignedUrlV4 +}; + +/** + * Struct encapsulating information needed to authenticate network requests. + */ +struct NetworkAuthOption { + AuthMethod method{AuthMethod::None}; +}; + +/** + * Struct representing a resource path with its source type. 
+ */ +struct Path { + InputSource source{InputSource::Filesystem}; + std::string path; +}; + +/** + * Determines the input source for a given raw path or url. + * @param path + * @return the InputSource for the given path + */ +[[nodiscard]] auto get_source_for_path(std::string_view const path) -> InputSource; + +/** + * Determines the input source for a given raw path or url and converts the path into a Path object. + * @param path + * @return a Path object representing the raw path or url + */ +[[nodiscard]] auto get_path_object_for_raw_path(std::string_view const path) -> Path; + +/** + * Recursively collects all file paths from the given raw path, including the path itself. + * @param path + * @param files Returned paths + * @return true on success, false otherwise + */ +auto get_input_files_for_raw_path(std::string_view const path, std::vector& files) -> bool; + +/** + * Recursively collects all file paths that are children of the the given Path, including the Path + * itself. + * @param path + * @param files Returned paths + * @return true on success, false otherwise + */ +[[nodiscard]] auto get_input_files_for_path(Path const& path, std::vector& files) -> bool; + +/** + * Collects all archives that are children of the given raw path, including the path itself. + * @param path + * @param archives Returned archives + * @return true on success, false otherwise + */ +[[nodiscard]] auto +get_input_archives_for_raw_path(std::string_view const path, std::vector& archives) -> bool; + +/** + * Collects all archives from the given Path, including the Path itself. + * @param path + * @param archives Returned archives + * @return true on success, false otherwise + */ +[[nodiscard]] auto +get_input_archives_for_path(Path const& path, std::vector& archives) -> bool; + +/** + * Determines the archive id of an archive based on the archive path. 
+ * @param path + * @param archive_id Returned archive id + * @return true on success, false otherwise + */ +[[nodiscard]] auto +get_archive_id_from_path(Path const& archive_path, std::string& archive_id) -> bool; +} // namespace clp_s + +#endif // CLP_S_INPUTCONFIG_HPP diff --git a/components/core/src/clp_s/JsonConstructor.cpp b/components/core/src/clp_s/JsonConstructor.cpp index 8886f2074..f1363549d 100644 --- a/components/core/src/clp_s/JsonConstructor.cpp +++ b/components/core/src/clp_s/JsonConstructor.cpp @@ -31,22 +31,11 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option) : m_option ) ); } - - std::filesystem::path archive_path{m_option.archives_dir}; - archive_path /= m_option.archive_id; - if (false == std::filesystem::is_directory(archive_path)) { - throw OperationFailed( - ErrorCodeFailure, - __FILENAME__, - __LINE__, - fmt::format("'{}' is not a directory", archive_path.c_str()) - ); - } } void JsonConstructor::store() { m_archive_reader = std::make_unique(); - m_archive_reader->open(m_option.archives_dir, m_option.archive_id); + m_archive_reader->open(m_option.archive_path, m_option.network_auth); m_archive_reader->read_dictionaries_and_metadata(); if (m_option.ordered && false == m_archive_reader->has_log_order()) { @@ -84,7 +73,7 @@ void JsonConstructor::construct_in_order() { int64_t first_idx{}; int64_t last_idx{}; size_t chunk_size{}; - auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id; + auto src_path = std::filesystem::path(m_option.output_dir) / m_archive_reader->get_archive_id(); FileWriter writer; writer.open(src_path, FileWriter::OpenMode::CreateForWriting); @@ -123,7 +112,7 @@ void JsonConstructor::construct_in_order() { ), bsoncxx::builder::basic::kvp( constants::results_cache::decompression::cStreamId, - m_option.archive_id + std::string{m_archive_reader->get_archive_id()} ), bsoncxx::builder::basic::kvp( constants::results_cache::decompression::cBeginMsgIx, diff --git 
a/components/core/src/clp_s/JsonConstructor.hpp b/components/core/src/clp_s/JsonConstructor.hpp index 3d9228a02..533d335b4 100644 --- a/components/core/src/clp_s/JsonConstructor.hpp +++ b/components/core/src/clp_s/JsonConstructor.hpp @@ -11,6 +11,7 @@ #include "DictionaryReader.hpp" #include "ErrorCode.hpp" #include "FileWriter.hpp" +#include "InputConfig.hpp" #include "SchemaReader.hpp" #include "SchemaTree.hpp" #include "TraceableException.hpp" @@ -26,12 +27,12 @@ struct MetadataDbOption { }; struct JsonConstructorOption { - std::string archives_dir; - std::string archive_id; + Path archive_path{}; + NetworkAuthOption network_auth{}; std::string output_dir; bool ordered{false}; size_t target_ordered_chunk_size{}; - std::optional metadata_db; + std::optional metadata_db{std::nullopt}; }; class JsonConstructor { diff --git a/components/core/src/clp_s/JsonFileIterator.cpp b/components/core/src/clp_s/JsonFileIterator.cpp index ad6d16cd0..a0a003d9f 100644 --- a/components/core/src/clp_s/JsonFileIterator.cpp +++ b/components/core/src/clp_s/JsonFileIterator.cpp @@ -7,28 +7,19 @@ namespace clp_s { JsonFileIterator::JsonFileIterator( - std::string const& file_name, + clp::ReaderInterface& reader, size_t max_document_size, size_t buf_size ) : m_buf_size(buf_size), m_max_document_size(max_document_size), - m_buf(new char[buf_size + simdjson::SIMDJSON_PADDING]) { - try { - m_reader.open(file_name); - } catch (FileReader::OperationFailed& e) { - SPDLOG_ERROR("Failed to open {} for reading - {}", file_name, e.what()); - return; - } - + m_buf(new char[buf_size + simdjson::SIMDJSON_PADDING]), + m_reader(reader) { read_new_json(); } JsonFileIterator::~JsonFileIterator() { delete[] m_buf; - if (m_reader.is_open()) { - m_reader.close(); - } } bool JsonFileIterator::read_new_json() { @@ -59,9 +50,9 @@ bool JsonFileIterator::read_new_json() { m_buf_occupied += size_read; m_bytes_read += size_read; - if (ErrorCodeEndOfFile == file_error) { + if (clp::ErrorCode::ErrorCode_EndOfFile == 
file_error) { m_eof = true; - } else if (ErrorCodeSuccess != file_error) { + } else if (clp::ErrorCode::ErrorCode_Success != file_error) { m_error_code = simdjson::error_code::IO_ERROR; return false; } diff --git a/components/core/src/clp_s/JsonFileIterator.hpp b/components/core/src/clp_s/JsonFileIterator.hpp index b8db3f4f2..5464d56df 100644 --- a/components/core/src/clp_s/JsonFileIterator.hpp +++ b/components/core/src/clp_s/JsonFileIterator.hpp @@ -3,13 +3,13 @@ #include -#include "FileReader.hpp" +#include "../clp/ReaderInterface.hpp" namespace clp_s { class JsonFileIterator { public: /** - * An iterator over a file containing json objects. JSON is parsed + * An iterator over an input stream containing json objects. JSON is parsed * using simdjson::parse_many. This allows simdjson to efficiently find * delimeters between JSON objects, and if enabled parse JSON ahead of time * in another thread while the JSON is being iterated over. @@ -17,12 +17,12 @@ class JsonFileIterator { * The buffer grows automatically if there are JSON objects larger than the buffer size. * The buffer is padded to be SIMDJSON_PADDING bytes larger than the specified size. 
- * @param file_name the file containing JSON + * @param reader the input stream containing JSON * @param max_document_size the maximum allowed size of a single document * @param buf_size the initial buffer size */ explicit JsonFileIterator( - std::string const& file_name, + clp::ReaderInterface& reader, size_t max_document_size, size_t buf_size = 1024 * 1024 /*1MB default*/ ); @@ -35,12 +35,6 @@ class JsonFileIterator { */ [[nodiscard]] bool get_json(simdjson::ondemand::document_stream::iterator& it); - /** - * Checks if the file is open - * @return true if the file opened successfully - */ - [[nodiscard]] bool is_open() const { return m_reader.is_open(); } - /** * @return number of truncated bytes after json documents */ @@ -86,7 +80,7 @@ class JsonFileIterator { size_t m_buf_occupied{0}; size_t m_max_document_size{0}; char* m_buf{nullptr}; - FileReader m_reader; + clp::ReaderInterface& m_reader; simdjson::ondemand::parser m_parser; simdjson::ondemand::document_stream m_stream; bool m_eof{false}; diff --git a/components/core/src/clp_s/JsonParser.cpp b/components/core/src/clp_s/JsonParser.cpp index c917b1f09..21e3b0cfd 100644 --- a/components/core/src/clp_s/JsonParser.cpp +++ b/components/core/src/clp_s/JsonParser.cpp @@ -2,12 +2,14 @@ #include #include +#include #include #include #include #include #include +#include #include #include @@ -19,11 +21,14 @@ #include "../clp/ffi/utils.hpp" #include "../clp/ffi/Value.hpp" #include "../clp/ir/EncodedTextAst.hpp" +#include "../clp/NetworkReader.hpp" +#include "../clp/ReaderInterface.hpp" #include "../clp/streaming_compression/zstd/Decompressor.hpp" #include "../clp/time_types.hpp" #include "archive_constants.hpp" #include "ErrorCode.hpp" #include "JsonFileIterator.hpp" +#include "JsonParser.hpp" using clp::ffi::ir_stream::Deserializer; using clp::ffi::ir_stream::IRErrorCode; @@ -79,11 +84,9 @@ JsonParser::JsonParser(JsonParserOption const& option) m_max_document_size(option.max_document_size), 
m_timestamp_key(option.timestamp_key), m_structurize_arrays(option.structurize_arrays), - m_record_log_order(option.record_log_order) { - if (false == FileUtils::validate_path(option.file_paths)) { - exit(1); - } - + m_record_log_order(option.record_log_order), + m_input_paths(option.input_paths), + m_network_auth(option.network_auth) { if (false == m_timestamp_key.empty()) { if (false == clp_s::StringUtils::tokenize_column_descriptor(m_timestamp_key, m_timestamp_column)) @@ -93,10 +96,6 @@ JsonParser::JsonParser(JsonParserOption const& option) } } - for (auto& file_path : option.file_paths) { - FileUtils::find_all_files(file_path, m_file_paths); - } - m_archive_options.archives_dir = option.archives_dir; m_archive_options.compression_level = option.compression_level; m_archive_options.print_archive_stats = option.print_archive_stats; @@ -490,18 +489,19 @@ void JsonParser::parse_line(ondemand::value line, int32_t parent_node_id, std::s } bool JsonParser::parse() { - for (auto& file_path : m_file_paths) { - JsonFileIterator json_file_iterator(file_path, m_max_document_size); - if (false == json_file_iterator.is_open()) { + for (auto const& path : m_input_paths) { + auto reader{ReaderUtils::try_create_reader(path, m_network_auth)}; + if (nullptr == reader) { m_archive_writer->close(); return false; } + JsonFileIterator json_file_iterator(*reader, m_max_document_size); if (simdjson::error_code::SUCCESS != json_file_iterator.get_error()) { SPDLOG_ERROR( "Encountered error - {} - while trying to parse {} after parsing 0 bytes", simdjson::error_message(json_file_iterator.get_error()), - file_path + path.path ); m_archive_writer->close(); return false; @@ -535,7 +535,7 @@ bool JsonParser::parse() { SPDLOG_ERROR( "Encountered non-json-object while trying to parse {} after parsing {} " "bytes", - file_path, + path.path, bytes_consumed_up_to_prev_record ); m_archive_writer->close(); @@ -560,7 +560,7 @@ bool JsonParser::parse() { SPDLOG_ERROR( "Encountered error - {} - while 
trying to parse {} after parsing {} bytes", error.what(), - file_path, + path.path, bytes_consumed_up_to_prev_record ); m_archive_writer->close(); @@ -594,7 +594,7 @@ bool JsonParser::parse() { SPDLOG_ERROR( "Encountered error - {} - while trying to parse {} after parsing {} bytes", simdjson::error_message(json_file_iterator.get_error()), - file_path, + path.path, bytes_consumed_up_to_prev_record ); m_archive_writer->close(); @@ -604,9 +604,27 @@ bool JsonParser::parse() { SPDLOG_WARN( "Truncated JSON ({} bytes) at end of file {}", json_file_iterator.truncated_bytes(), - file_path.c_str() + path.path ); } + + if (auto network_reader = std::dynamic_pointer_cast(reader); + nullptr != network_reader) + { + if (auto const rc = network_reader->get_curl_ret_code(); + rc.has_value() && CURLcode::CURLE_OK != rc.value()) + { + auto const curl_error_message = network_reader->get_curl_error_msg(); + SPDLOG_ERROR( + "Encountered curl error while ingesting {} - Code: {} - Message: {}", + path.path, + static_cast(rc.value()), + curl_error_message.value_or("Unknown error") + ); + m_archive_writer->close(); + return false; + } + } } return true; } @@ -835,11 +853,16 @@ void JsonParser::parse_kv_log_event(KeyValuePairLogEvent const& kv) { } auto JsonParser::parse_from_ir() -> bool { - for (auto& file_path : m_file_paths) { + for (auto const& path : m_input_paths) { + // TODO: add support for ingesting IR from a network source + if (InputSource::Filesystem != path.source) { + m_archive_writer->close(); + return false; + } clp::streaming_compression::zstd::Decompressor decompressor; size_t curr_pos{}; size_t last_pos{}; - decompressor.open(file_path); + decompressor.open(path.path); auto deserializer_result{Deserializer::create(decompressor, IrUnitHandler{}) }; diff --git a/components/core/src/clp_s/JsonParser.hpp b/components/core/src/clp_s/JsonParser.hpp index a89c746c7..12199df6c 100644 --- a/components/core/src/clp_s/JsonParser.hpp +++ b/components/core/src/clp_s/JsonParser.hpp 
@@ -23,7 +23,9 @@ #include "DictionaryWriter.hpp" #include "FileReader.hpp" #include "FileWriter.hpp" +#include "InputConfig.hpp" #include "ParsedMessage.hpp" +#include "ReaderUtils.hpp" #include "Schema.hpp" #include "SchemaMap.hpp" #include "SchemaTree.hpp" @@ -37,7 +39,7 @@ using clp::ffi::KeyValuePairLogEvent; namespace clp_s { struct JsonParserOption { - std::vector file_paths; + std::vector input_paths; CommandLineArguments::FileType input_file_type{CommandLineArguments::FileType::Json}; std::string timestamp_key; std::string archives_dir; @@ -50,6 +52,7 @@ struct JsonParserOption { bool record_log_order{true}; bool single_file_archive{false}; std::shared_ptr metadata_db; + NetworkAuthOption network_auth{}; }; class JsonParser { @@ -167,7 +170,8 @@ class JsonParser { int32_t add_metadata_field(std::string_view const field_name, NodeType type); int m_num_messages; - std::vector m_file_paths; + std::vector m_input_paths; + NetworkAuthOption m_network_auth{}; Schema m_current_schema; ParsedMessage m_current_parsed_message; diff --git a/components/core/src/clp_s/ReaderUtils.cpp b/components/core/src/clp_s/ReaderUtils.cpp index a2ab5a34a..88bb31286 100644 --- a/components/core/src/clp_s/ReaderUtils.cpp +++ b/components/core/src/clp_s/ReaderUtils.cpp @@ -1,6 +1,14 @@ #include "ReaderUtils.hpp" +#include + +#include "../clp/aws/AwsAuthenticationSigner.hpp" +#include "../clp/FileReader.hpp" +#include "../clp/NetworkReader.hpp" +#include "../clp/ReaderInterface.hpp" +#include "../clp/spdlog_with_specializations.hpp" #include "archive_constants.hpp" +#include "Utils.hpp" namespace clp_s { std::shared_ptr ReaderUtils::read_schema_tree(std::string const& archives_dir) { @@ -142,22 +150,75 @@ std::shared_ptr ReaderUtils::read_schemas(std::string co return schemas_pointer; } -std::vector ReaderUtils::get_archives(std::string const& archives_dir) { - std::vector archive_paths; +namespace { +std::shared_ptr try_create_file_reader(std::string_view const file_path) { + try { + 
return std::make_shared(std::string{file_path}); + } catch (clp::FileReader::OperationFailed const& e) { + SPDLOG_ERROR("Failed to open file for reading - {} - {}", file_path, e.what()); + return nullptr; + } +} - if (false == boost::filesystem::is_directory(archives_dir)) { - throw OperationFailed(ErrorCodeBadParam, __FILENAME__, __LINE__); +bool try_sign_url(std::string& url) { + auto const aws_access_key = std::getenv(cAwsAccessKeyIdEnvVar); + auto const aws_secret_access_key = std::getenv(cAwsSecretAccessKeyEnvVar); + if (nullptr == aws_access_key || nullptr == aws_secret_access_key) { + SPDLOG_ERROR( + "{} and {} environment variables not available for presigned url authentication.", + cAwsAccessKeyIdEnvVar, + cAwsSecretAccessKeyEnvVar + ); + return false; } - boost::filesystem::directory_iterator iter(archives_dir); - boost::filesystem::directory_iterator end; - for (; iter != end; ++iter) { - if (boost::filesystem::is_directory(iter->path())) { - archive_paths.push_back(iter->path().string()); + clp::aws::AwsAuthenticationSigner signer{aws_access_key, aws_secret_access_key}; + + try { + clp::aws::S3Url s3_url{url}; + if (auto const rc = signer.generate_presigned_url(s3_url, url); + clp::ErrorCode::ErrorCode_Success != rc) + { + return false; } + } catch (std::exception const& e) { + return false; } - - return archive_paths; + return true; } +std::shared_ptr +try_create_network_reader(std::string_view const url, NetworkAuthOption const& auth) { + std::string request_url{url}; + switch (auth.method) { + case AuthMethod::S3PresignedUrlV4: + if (false == try_sign_url(request_url)) { + return nullptr; + } + break; + case AuthMethod::None: + break; + default: + return nullptr; + } + + try { + return std::make_shared(request_url); + } catch (clp::NetworkReader::OperationFailed const& e) { + SPDLOG_ERROR("Failed to open url for reading - {}", e.what()); + return nullptr; + } +} +} // namespace + +std::shared_ptr +ReaderUtils::try_create_reader(Path const& path, 
NetworkAuthOption const& network_auth) { + if (InputSource::Filesystem == path.source) { + return try_create_file_reader(path.path); + } else if (InputSource::Network == path.source) { + return try_create_network_reader(path.path, network_auth); + } else { + return nullptr; + } +} } // namespace clp_s diff --git a/components/core/src/clp_s/ReaderUtils.hpp b/components/core/src/clp_s/ReaderUtils.hpp index caa509d6a..4661b5fae 100644 --- a/components/core/src/clp_s/ReaderUtils.hpp +++ b/components/core/src/clp_s/ReaderUtils.hpp @@ -1,7 +1,11 @@ #ifndef CLP_S_READERUTILS_HPP #define CLP_S_READERUTILS_HPP +#include + +#include "../clp/ReaderInterface.hpp" #include "DictionaryReader.hpp" +#include "InputConfig.hpp" #include "Schema.hpp" #include "SchemaReader.hpp" #include "SchemaTree.hpp" @@ -67,11 +71,13 @@ class ReaderUtils { ); /** - * Gets the list of archives in the given archive directory - * @param archives_dir - * @return the list of archives + * Tries to open a clp::ReaderInterface using the given Path and NetworkAuthOption. 
+ * @param path + * @param network_auth + * @return the opened clp::ReaderInterface or nullptr on error */ - static std::vector get_archives(std::string const& archives_dir); + static std::shared_ptr + try_create_reader(Path const& path, NetworkAuthOption const& network_auth); private: /** diff --git a/components/core/src/clp_s/Utils.cpp b/components/core/src/clp_s/Utils.cpp index acee48851..19b564c03 100644 --- a/components/core/src/clp_s/Utils.cpp +++ b/components/core/src/clp_s/Utils.cpp @@ -1,42 +1,52 @@ #include "Utils.hpp" -#include +#include +#include +#include + +#include +#include #include +#include "archive_constants.hpp" + using std::string; using std::string_view; namespace clp_s { -bool FileUtils::find_all_files(std::string const& path, std::vector& file_paths) { +bool FileUtils::find_all_files_in_directory( + std::string const& path, + std::vector& file_paths +) { try { - if (false == boost::filesystem::is_directory(path)) { + if (false == std::filesystem::is_directory(path)) { // path is a file file_paths.push_back(path); return true; } - if (boost::filesystem::is_empty(path)) { + if (std::filesystem::is_empty(path)) { // path is an empty directory return true; } // Iterate directory - boost::filesystem::recursive_directory_iterator iter( + std::filesystem::recursive_directory_iterator iter( path, - boost::filesystem::directory_options::follow_directory_symlink + std::filesystem::directory_options::follow_directory_symlink ); - boost::filesystem::recursive_directory_iterator end; + std::filesystem::recursive_directory_iterator end; for (; iter != end; ++iter) { // Check if current entry is an empty directory or a file - if (boost::filesystem::is_directory(iter->path())) { - if (boost::filesystem::is_empty(iter->path())) { + if (std::filesystem::is_directory(iter->path())) { + if (std::filesystem::is_empty(iter->path())) { iter.disable_recursion_pending(); } } else { file_paths.push_back(iter->path().string()); } } - } catch 
(boost::filesystem::filesystem_error& exception) { + } catch (std::exception const& exception) { SPDLOG_ERROR( "Failed to find files/directories at '{}' - {}.", path.c_str(), @@ -48,16 +58,106 @@ bool FileUtils::find_all_files(std::string const& path, std::vector return true; } -bool FileUtils::validate_path(std::vector const& paths) { - bool all_paths_exist = true; - for (auto const& path : paths) { - if (false == boost::filesystem::exists(path)) { - SPDLOG_ERROR("'{}' does not exist.", path.c_str()); - all_paths_exist = false; +namespace { +/** + * Determines if a directory is a multi-file archive. + * @param path + * @return true if this directory is a multi-file archive, false otherwise + */ +bool is_multi_file_archive(std::string_view const path) { + for (auto const& entry : std::filesystem::directory_iterator{path}) { + if (entry.is_directory()) { + return false; } + + std::string file_name; + if (false == FileUtils::get_last_non_empty_path_component(entry.path().string(), file_name)) + { + return false; + } + auto formatted_name = fmt::format("/{}", file_name); + if (constants::cArchiveTimestampDictFile == formatted_name + || constants::cArchiveSchemaTreeFile == formatted_name + || constants::cArchiveSchemaMapFile == formatted_name + || constants::cArchiveVarDictFile == formatted_name + || constants::cArchiveLogDictFile == formatted_name + || constants::cArchiveArrayDictFile == formatted_name + || constants::cArchiveTableMetadataFile == formatted_name + || constants::cArchiveTablesFile == formatted_name) + { + continue; + } else { + try { + auto segment_file_number = std::stoi(file_name); + continue; + } catch (std::exception const& e) { + return false; + } + } + } + return true; +} +} // namespace + +bool FileUtils::find_all_archives_in_directory( + std::string_view const path, + std::vector& archive_paths +) { + try { + if (false == std::filesystem::is_directory(path)) { + return false; + } + } catch (std::exception const& e) { + return false; } - return 
all_paths_exist; + if (is_multi_file_archive(path)) { + archive_paths.emplace_back(path); + return true; + } + + for (auto const& entry : std::filesystem::directory_iterator{path}) { + archive_paths.emplace_back(entry.path().string()); + } + return true; +} + +bool FileUtils::get_last_non_empty_path_component(std::string_view const path, std::string& name) { + std::filesystem::path fs_path; + try { + fs_path = std::filesystem::path{path}.lexically_normal(); + } catch (std::exception const& e) { + return false; + } + + if (fs_path.has_filename() && false == fs_path.filename().string().empty()) { + name = fs_path.filename().string(); + return true; + } + + while (fs_path.has_parent_path()) { + fs_path = fs_path.parent_path(); + if (fs_path.has_filename() && false == fs_path.filename().string().empty()) { + name = fs_path.filename().string(); + return true; + } + } + + return false; +} + +bool UriUtils::get_last_uri_component(std::string_view const uri, std::string& name) { + auto parsed_result = boost::urls::parse_uri(uri); + if (false == parsed_result.has_value()) { + return false; + } + auto parsed_uri = parsed_result.value(); + auto path_segments_view = parsed_uri.segments(); + if (path_segments_view.empty()) { + return false; + } + name = path_segments_view.back(); + return true; } bool StringUtils::get_bounds_of_next_var(string const& msg, size_t& begin_pos, size_t& end_pos) { diff --git a/components/core/src/clp_s/Utils.hpp b/components/core/src/clp_s/Utils.hpp index 553f7e608..0181a6749 100644 --- a/components/core/src/clp_s/Utils.hpp +++ b/components/core/src/clp_s/Utils.hpp @@ -5,26 +5,60 @@ #include #include #include - -#include +#include +#include namespace clp_s { class FileUtils { public: /** - * Find all files in a directory + * Finds all files in a directory * @param path * @param file_paths * @return true if successful, false otherwise */ - static bool find_all_files(std::string const& path, std::vector& file_paths); + static bool + 
find_all_files_in_directory(std::string const& path, std::vector& file_paths); /** - * Validate if all paths exist - * @param paths - * @return true if all paths exist, false otherwise + * Finds all archives in a directory, including the directory itself + * @param path + * @param archive_paths + * @return true if successful, false otherwise + */ + static bool find_all_archives_in_directory( + std::string_view const path, + std::vector& archive_paths + ); + + /** + * Gets the last non-empty component of a path, accounting for trailing forward slashes. + * + * For example: + * ./foo/bar.baz -> bar.baz + * ./foo/bar.baz/ -> bar.baz + * + * @param path + * @param name Returned component name + * @return true on success, false otherwise + */ + static bool get_last_non_empty_path_component(std::string_view const path, std::string& name); +}; + +class UriUtils { +public: + /** + * Gets the last component of a uri. + * + * For example: + * https://www.something.org/abc-xyz -> abc-xyz + * https://www.something.org/aaa/bbb/abc-xyz?something=something -> abc-xyz + * + * @param uri + * @param name Returned component name + * @return true on success, false otherwise */ - static bool validate_path(std::vector const& paths); + static bool get_last_uri_component(std::string_view const uri, std::string& name); }; class StringUtils { diff --git a/components/core/src/clp_s/ZstdDecompressor.cpp b/components/core/src/clp_s/ZstdDecompressor.cpp index 87d3ae8fa..c6c7f99e7 100644 --- a/components/core/src/clp_s/ZstdDecompressor.cpp +++ b/components/core/src/clp_s/ZstdDecompressor.cpp @@ -3,8 +3,9 @@ #include "ZstdDecompressor.hpp" #include +#include -#include +#include #include namespace clp_s { @@ -202,14 +203,13 @@ ErrorCode ZstdDecompressor::open(std::string const& compressed_file_path) { m_input_type = InputType::MemoryMappedCompressedFile; // Create memory mapping for compressed_file_path, use boost read only memory mapped file - boost::system::error_code boost_error_code; - size_t 
compressed_file_size - = boost::filesystem::file_size(compressed_file_path, boost_error_code); - if (boost_error_code) { + std::error_code error_code; + size_t compressed_file_size = std::filesystem::file_size(compressed_file_path, error_code); + if (error_code) { SPDLOG_ERROR( "ZstdDecompressor: Unable to obtain file size for '{}' - {}.", compressed_file_path.c_str(), - boost_error_code.message().c_str() + error_code.message().c_str() ); return ErrorCodeFailure; } diff --git a/components/core/src/clp_s/clp-s.cpp b/components/core/src/clp_s/clp-s.cpp index 0f7b5643a..c21a1f3d3 100644 --- a/components/core/src/clp_s/clp-s.cpp +++ b/components/core/src/clp_s/clp-s.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -11,6 +12,7 @@ #include #include +#include "../clp/CurlGlobalInstance.hpp" #include "../clp/GlobalMySQLMetadataDB.hpp" #include "../clp/streaming_archive/ArchiveMetadata.hpp" #include "../reducer/network_utils.hpp" @@ -18,7 +20,6 @@ #include "Defs.hpp" #include "JsonConstructor.hpp" #include "JsonParser.hpp" -#include "ReaderUtils.hpp" #include "search/AddTimestampConditions.hpp" #include "search/ConvertToExists.hpp" #include "search/EmptyExpr.hpp" @@ -87,7 +88,8 @@ bool compress(CommandLineArguments const& command_line_arguments) { } clp_s::JsonParserOption option{}; - option.file_paths = command_line_arguments.get_file_paths(); + option.input_paths = command_line_arguments.get_input_paths(); + option.network_auth = command_line_arguments.get_network_auth(); option.input_file_type = command_line_arguments.get_file_type(); option.archives_dir = archives_dir.string(); option.target_encoded_size = command_line_arguments.get_target_encoded_size(); @@ -281,6 +283,7 @@ int main(int argc, char const* argv[]) { clp_s::TimestampPattern::init(); mongocxx::instance const mongocxx_instance{}; + clp::CurlGlobalInstance const curl_instance{}; CommandLineArguments command_line_arguments("clp-s"); auto parsing_result = command_line_arguments.parse_arguments(argc, 
argv); @@ -299,37 +302,21 @@ int main(int argc, char const* argv[]) { return 1; } } else if (CommandLineArguments::Command::Extract == command_line_arguments.get_command()) { - auto const& archives_dir = command_line_arguments.get_archives_dir(); - if (false == std::filesystem::is_directory(archives_dir)) { - SPDLOG_ERROR("'{}' is not a directory.", archives_dir); - return 1; - } - clp_s::JsonConstructorOption option{}; option.output_dir = command_line_arguments.get_output_dir(); option.ordered = command_line_arguments.get_ordered_decompression(); - option.archives_dir = archives_dir; option.target_ordered_chunk_size = command_line_arguments.get_target_ordered_chunk_size(); + option.network_auth = command_line_arguments.get_network_auth(); if (false == command_line_arguments.get_mongodb_uri().empty()) { option.metadata_db = {command_line_arguments.get_mongodb_uri(), command_line_arguments.get_mongodb_collection()}; } + try { - auto const& archive_id = command_line_arguments.get_archive_id(); - if (false == archive_id.empty()) { - option.archive_id = archive_id; + for (auto const& archive_path : command_line_arguments.get_input_paths()) { + option.archive_path = archive_path; decompress_archive(option); - } else { - for (auto const& entry : std::filesystem::directory_iterator(archives_dir)) { - if (false == entry.is_directory()) { - // Skip non-directories - continue; - } - - option.archive_id = entry.path().filename(); - decompress_archive(option); - } } } catch (clp_s::TraceableException& e) { SPDLOG_ERROR("{}", e.what()); @@ -348,12 +335,6 @@ int main(int argc, char const* argv[]) { return 1; } - auto const& archives_dir = command_line_arguments.get_archives_dir(); - if (false == std::filesystem::is_directory(archives_dir)) { - SPDLOG_ERROR("'{}' is not a directory.", archives_dir); - return 1; - } - int reducer_socket_fd{-1}; if (command_line_arguments.get_output_handler_type() == CommandLineArguments::OutputHandlerType::Reducer) @@ -369,37 +350,25 @@ int 
main(int argc, char const* argv[]) { } } - auto const& archive_id = command_line_arguments.get_archive_id(); auto archive_reader = std::make_shared(); - if (false == archive_id.empty()) { - archive_reader->open(archives_dir, archive_id); + for (auto const& archive_path : command_line_arguments.get_input_paths()) { + try { + archive_reader->open(archive_path, command_line_arguments.get_network_auth()); + } catch (std::exception const& e) { + SPDLOG_ERROR("Failed to open archive - {}", e.what()); + return 1; + } if (false - == search_archive(command_line_arguments, archive_reader, expr, reducer_socket_fd)) + == search_archive( + command_line_arguments, + archive_reader, + expr->copy(), + reducer_socket_fd + )) { return 1; } archive_reader->close(); - } else { - for (auto const& entry : std::filesystem::directory_iterator(archives_dir)) { - if (false == entry.is_directory()) { - // Skip non-directories - continue; - } - - auto const archive_id = entry.path().filename().string(); - archive_reader->open(archives_dir, archive_id); - if (false - == search_archive( - command_line_arguments, - archive_reader, - expr->copy(), - reducer_socket_fd - )) - { - return 1; - } - archive_reader->close(); - } } } diff --git a/components/core/src/clp_s/search/kql/CMakeLists.txt b/components/core/src/clp_s/search/kql/CMakeLists.txt index ee36ee124..9dba44a4b 100644 --- a/components/core/src/clp_s/search/kql/CMakeLists.txt +++ b/components/core/src/clp_s/search/kql/CMakeLists.txt @@ -26,3 +26,4 @@ add_library( target_compile_features(kql PRIVATE cxx_std_20) target_include_directories(kql PRIVATE ${ANTLR_KqlParser_OUTPUT_DIR}) target_link_libraries(kql PRIVATE antlr4_static Boost::filesystem) + diff --git a/components/core/tests/test-clp_s-end_to_end.cpp b/components/core/tests/test-clp_s-end_to_end.cpp index 3f138b472..259b46b93 100644 --- a/components/core/tests/test-clp_s-end_to_end.cpp +++ b/components/core/tests/test-clp_s-end_to_end.cpp @@ -9,6 +9,7 @@ #include #include +#include 
"../src/clp_s/InputConfig.hpp" #include "../src/clp_s/JsonConstructor.hpp" #include "../src/clp_s/JsonParser.hpp" @@ -70,7 +71,10 @@ void compress(bool structurize_arrays) { REQUIRE((std::filesystem::is_directory(cTestEndToEndArchiveDirectory))); clp_s::JsonParserOption parser_option{}; - parser_option.file_paths.push_back(get_test_input_local_path()); + parser_option.input_paths.emplace_back(clp_s::Path{ + .source = clp_s::InputSource::Filesystem, + .path = get_test_input_local_path() + }); parser_option.archives_dir = cTestEndToEndArchiveDirectory; parser_option.target_encoded_size = cDefaultTargetEncodedSize; parser_option.max_document_size = cDefaultMaxDocumentSize; @@ -94,17 +98,19 @@ auto extract() -> std::filesystem::path { REQUIRE(std::filesystem::is_directory(cTestEndToEndOutputDirectory)); clp_s::JsonConstructorOption constructor_option{}; - constructor_option.archives_dir = cTestEndToEndArchiveDirectory; constructor_option.output_dir = cTestEndToEndOutputDirectory; constructor_option.ordered = cDefaultOrdered; constructor_option.target_ordered_chunk_size = cDefaultTargetOrderedChunkSize; - for (auto const& entry : std::filesystem::directory_iterator(constructor_option.archives_dir)) { + for (auto const& entry : std::filesystem::directory_iterator(cTestEndToEndArchiveDirectory)) { if (false == entry.is_directory()) { // Skip non-directories continue; } - constructor_option.archive_id = entry.path().filename(); + constructor_option.archive_path = clp_s::Path{ + .source{clp_s::InputSource::Filesystem}, + .path{entry.path().string()} + }; clp_s::JsonConstructor constructor{constructor_option}; constructor.store(); }