diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index 3445d798b..99d3c8469 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -324,6 +324,8 @@ set(SOURCE_FILES_unitTest src/clp/ffi/search/Subquery.hpp src/clp/ffi/search/WildcardToken.cpp src/clp/ffi/search/WildcardToken.hpp + src/clp/FileDescriptor.cpp + src/clp/FileDescriptor.hpp src/clp/FileReader.cpp src/clp/FileReader.hpp src/clp/FileWriter.cpp @@ -381,6 +383,8 @@ set(SOURCE_FILES_unitTest src/clp/Query.hpp src/clp/ReaderInterface.cpp src/clp/ReaderInterface.hpp + src/clp/ReadOnlyMemoryMappedFile.cpp + src/clp/ReadOnlyMemoryMappedFile.hpp src/clp/spdlog_with_specializations.hpp src/clp/SQLiteDB.cpp src/clp/SQLiteDB.hpp @@ -456,6 +460,7 @@ set(SOURCE_FILES_unitTest tests/test-kql.cpp tests/test-main.cpp tests/test-math_utils.cpp + tests/test-MemoryMappedFile.cpp tests/test-NetworkReader.cpp tests/test-ParserWithUserSchema.cpp tests/test-query_methods.cpp diff --git a/components/core/src/clp/FileDescriptor.cpp b/components/core/src/clp/FileDescriptor.cpp new file mode 100644 index 000000000..2e17bfe05 --- /dev/null +++ b/components/core/src/clp/FileDescriptor.cpp @@ -0,0 +1,63 @@ +#include "FileDescriptor.hpp" + +#include +#include +#include + +#include +#include +#include + +#include "ErrorCode.hpp" +#include "type_utils.hpp" + +namespace clp { +FileDescriptor::FileDescriptor( + std::string_view path, + OpenMode open_mode, + CloseFailureCallback close_failure_callback +) + : m_open_mode{open_mode}, + m_close_failure_callback{close_failure_callback} { + // For newly created files, we enable writing for the owner and reading for everyone. + // Callers can change the created file's permissions as necessary. + constexpr auto cNewFilePermission{S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH}; + auto const flag{enum_to_underlying_type(open_mode)}; + if (0 != (flag & O_CREAT)) { + m_fd = open(path.data(), flag, cNewFilePermission); + } else { + m_fd = open(path.data(), flag); + } + if (-1 == m_fd) { + throw OperationFailed( + ErrorCode_errno, + __FILE__, + __LINE__, + "Failed to open file descriptor for path: " + std::string{path} + ); + } +} + +FileDescriptor::~FileDescriptor() { + if (-1 == m_fd) { + return; + } + if (0 != close(m_fd) && nullptr != m_close_failure_callback) { + m_close_failure_callback(errno); + } +} + +auto FileDescriptor::get_size() const -> size_t { + struct stat stat_result {}; + + if (0 != fstat(m_fd, &stat_result)) { + throw OperationFailed( + ErrorCode_errno, + __FILE__, + __LINE__, + "Failed to stat file using file descriptor." + ); + } + return static_cast(stat_result.st_size); +} +} // namespace clp diff --git a/components/core/src/clp/FileDescriptor.hpp b/components/core/src/clp/FileDescriptor.hpp new file mode 100644 index 000000000..a704326bf --- /dev/null +++ b/components/core/src/clp/FileDescriptor.hpp @@ -0,0 +1,93 @@ +#ifndef CLP_FILEDESCRIPTOR_HPP +#define CLP_FILEDESCRIPTOR_HPP + +#include + +#include +#include +#include +#include + +#include "ErrorCode.hpp" +#include "TraceableException.hpp" + +namespace clp { +/** + * Wrapper for a UNIX file descriptor. + */ +class FileDescriptor { +public: + // Types + /** + * `close` is called in the destructor to close the file descriptor. However, `close` may return + * an error indicated by `errno`. This type alias defines a callback to handle the `close` + * failure in the destructor. + * The signature of the callback: void close_failure_callback(int errno) + */ + using CloseFailureCallback = void (*)(int); + + class OperationFailed : public TraceableException { + public: + OperationFailed( + ErrorCode error_code, + char const* const filename, + int line_number, + std::string msg + ) + : TraceableException{error_code, filename, line_number}, + m_msg{std::move(msg)} {} + + [[nodiscard]] auto what() const noexcept -> char const* override { return m_msg.c_str(); } + + private: + std::string m_msg; + }; + + /** + * A C++ wrapper for Unix oflag that describes the open mode. + */ + // NOLINTNEXTLINE(performance-enum-size) + enum class OpenMode : int { + ReadOnly = O_RDONLY, + CreateForWrite = O_WRONLY | O_CREAT | O_TRUNC, + }; + + // Constructors + FileDescriptor( + std::string_view path, + OpenMode open_mode, + CloseFailureCallback close_failure_callback = nullptr + ); + + // Destructor + ~FileDescriptor(); + + // Disable copy/move constructors/assignment operators + FileDescriptor(FileDescriptor const&) = delete; + FileDescriptor(FileDescriptor&&) = delete; + auto operator=(FileDescriptor const&) -> FileDescriptor& = delete; + auto operator=(FileDescriptor&&) -> FileDescriptor& = delete; + + /** + * @return The raw fd. + */ + [[nodiscard]] auto get_raw_fd() const -> int { return m_fd; } + + /** + * @return The size of the file. + */ + [[nodiscard]] auto get_size() const -> size_t; + + /** + * @return The open mode. + */ + [[nodiscard]] auto get_open_mode() const -> OpenMode { return m_open_mode; } + +private: + int m_fd{-1}; + OpenMode m_open_mode; + CloseFailureCallback m_close_failure_callback{nullptr}; +}; +} // namespace clp + +#endif diff --git a/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp b/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp new file mode 100644 index 000000000..167b675a5 --- /dev/null +++ b/components/core/src/clp/ReadOnlyMemoryMappedFile.cpp @@ -0,0 +1,41 @@ +#include "ReadOnlyMemoryMappedFile.hpp" + +#include + +#include +#include + +#include "ErrorCode.hpp" +#include "FileDescriptor.hpp" + +namespace clp { +ReadOnlyMemoryMappedFile::ReadOnlyMemoryMappedFile(std::string_view path) { + FileDescriptor const fd{path, FileDescriptor::OpenMode::ReadOnly}; + auto const file_size{fd.get_size()}; + if (0 == file_size) { + // `mmap` doesn't allow mapping an empty file, so we don't need to call it. + return; + } + auto* mmap_ptr{mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd.get_raw_fd(), 0)}; + if (MAP_FAILED == mmap_ptr) { + throw OperationFailed( + ErrorCode_errno, + __FILE__, + __LINE__, + "`mmap` failed to map path: " + std::string{path} + ); + } + m_data = mmap_ptr; + m_buf_size = file_size; +} + +ReadOnlyMemoryMappedFile::~ReadOnlyMemoryMappedFile() { + if (0 == m_buf_size) { + // We don't call `mmap` for empty files, so we don't need to call `munmap`. + return; + } + // We skip error checking since the only likely reason for `munmap` to fail is if we give it + // invalid arguments. + munmap(m_data, m_buf_size); +} +} // namespace clp diff --git a/components/core/src/clp/ReadOnlyMemoryMappedFile.hpp b/components/core/src/clp/ReadOnlyMemoryMappedFile.hpp new file mode 100644 index 000000000..5749d3bd2 --- /dev/null +++ b/components/core/src/clp/ReadOnlyMemoryMappedFile.hpp @@ -0,0 +1,66 @@ +#ifndef CLP_READONLYMEMORYMAPPEDFILE_HPP +#define CLP_READONLYMEMORYMAPPEDFILE_HPP + +#include +#include +#include +#include +#include + +#include "ErrorCode.hpp" +#include "TraceableException.hpp" + +namespace clp { +/** + * A class for mapping a read-only file into memory. It maintains the memory buffer created by the + * underlying `mmap` system call and provides methods to get a view of the memory buffer. + */ +class ReadOnlyMemoryMappedFile { +public: + // Types + class OperationFailed : public TraceableException { + public: + OperationFailed( + ErrorCode error_code, + char const* const filename, + int line_number, + std::string msg + ) + : TraceableException{error_code, filename, line_number}, + m_msg{std::move(msg)} {} + + [[nodiscard]] auto what() const noexcept -> char const* override { return m_msg.c_str(); } + + private: + std::string m_msg; + }; + + // Constructors + /** + * @param path The path of the file to map. + */ + explicit ReadOnlyMemoryMappedFile(std::string_view path); + + // Destructor + ~ReadOnlyMemoryMappedFile(); + + // Disable copy/move constructors/assignment operators + ReadOnlyMemoryMappedFile(ReadOnlyMemoryMappedFile const&) = delete; + ReadOnlyMemoryMappedFile(ReadOnlyMemoryMappedFile&&) = delete; + auto operator=(ReadOnlyMemoryMappedFile const&) -> ReadOnlyMemoryMappedFile& = delete; + auto operator=(ReadOnlyMemoryMappedFile&&) -> ReadOnlyMemoryMappedFile& = delete; + + /** + * @return A view of the mapped file in memory. + */ + [[nodiscard]] auto get_view() const -> std::span { + return std::span{static_cast(m_data), m_buf_size}; + } + +private: + void* m_data{nullptr}; + size_t m_buf_size{0}; +}; +} // namespace clp + +#endif diff --git a/components/core/src/clp/clg/CMakeLists.txt b/components/core/src/clp/clg/CMakeLists.txt index 37c5a3710..bed6c11fc 100644 --- a/components/core/src/clp/clg/CMakeLists.txt +++ b/components/core/src/clp/clg/CMakeLists.txt @@ -18,6 +18,8 @@ set( ../ffi/ir_stream/decoding_methods.cpp ../ffi/ir_stream/decoding_methods.hpp ../ffi/ir_stream/decoding_methods.inc + ../FileDescriptor.cpp + ../FileDescriptor.hpp ../FileReader.cpp ../FileReader.hpp ../FileWriter.cpp @@ -57,6 +59,8 @@ set( ../Query.hpp ../ReaderInterface.cpp ../ReaderInterface.hpp + ../ReadOnlyMemoryMappedFile.cpp + ../ReadOnlyMemoryMappedFile.hpp ../spdlog_with_specializations.hpp ../SQLiteDB.cpp ../SQLiteDB.hpp diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index b798e918a..306b6049d 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -20,6 +20,8 @@ set( ../ffi/ir_stream/decoding_methods.cpp ../ffi/ir_stream/decoding_methods.hpp ../ffi/ir_stream/decoding_methods.inc + ../FileDescriptor.cpp + ../FileDescriptor.hpp ../FileReader.cpp ../FileReader.hpp ../FileWriter.cpp @@ -49,6 +51,8 @@ set( ../Query.hpp ../ReaderInterface.cpp ../ReaderInterface.hpp + ../ReadOnlyMemoryMappedFile.cpp + ../ReadOnlyMemoryMappedFile.hpp ../spdlog_with_specializations.hpp ../SQLiteDB.cpp ../SQLiteDB.hpp diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index 4ea4f20c6..b8e073dd1 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -27,6 +27,8 @@ set( ../ffi/ir_stream/encoding_methods.hpp ../ffi/ir_stream/utils.cpp ../ffi/ir_stream/utils.hpp + ../FileDescriptor.cpp + ../FileDescriptor.hpp ../FileReader.cpp ../FileReader.hpp ../FileWriter.cpp @@ -80,6 +82,8 @@ set( ../Query.hpp ../ReaderInterface.cpp ../ReaderInterface.hpp + ../ReadOnlyMemoryMappedFile.cpp + ../ReadOnlyMemoryMappedFile.hpp ../spdlog_with_specializations.hpp ../SQLiteDB.cpp ../SQLiteDB.hpp diff --git a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt index 6dc5334bf..fd62a39fb 100644 --- a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt +++ b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt @@ -4,6 +4,8 @@ set( ../dictionary_utils.hpp ../DictionaryEntry.hpp ../DictionaryReader.hpp + ../FileDescriptor.cpp + ../FileDescriptor.hpp ../FileReader.cpp ../FileReader.hpp ../FileWriter.cpp @@ -17,6 +19,8 @@ set( ../ParsedMessage.hpp ../ReaderInterface.cpp ../ReaderInterface.hpp + ../ReadOnlyMemoryMappedFile.cpp + ../ReadOnlyMemoryMappedFile.hpp ../spdlog_with_specializations.hpp ../streaming_compression/Decompressor.hpp ../streaming_compression/passthrough/Decompressor.cpp diff --git a/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp b/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp index 9f320efe6..818379a24 100644 --- a/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Decompressor.cpp @@ -2,9 +2,8 @@ #include -#include - #include "../../Defs.h" +#include "../../ReadOnlyMemoryMappedFile.hpp" #include "../../spdlog_with_specializations.hpp" namespace clp::streaming_compression::zstd { @@ -182,10 +181,7 @@ void Decompressor::open(FileReader& file_reader, size_t file_read_buffer_capacit void Decompressor::close() { switch (m_input_type) { case InputType::MemoryMappedCompressedFile: - if (m_memory_mapped_compressed_file.is_open()) { - // An existing file is memory mapped by the decompressor - m_memory_mapped_compressed_file.close(); - } + m_memory_mapped_file.reset(); break; case InputType::File: m_file_read_buffer.reset(); @@ -209,40 +205,12 @@ ErrorCode Decompressor::open(std::string const& compressed_file_path) { } m_input_type = InputType::MemoryMappedCompressedFile; - // Create memory mapping for compressed_file_path, use boost read only - // memory mapped file - boost::system::error_code boost_error_code; - size_t compressed_file_size - = boost::filesystem::file_size(compressed_file_path, boost_error_code); - if (boost_error_code) { - SPDLOG_ERROR( - "streaming_compression::zstd::Decompressor: Unable to obtain file size for " - "'{}' - {}.", - compressed_file_path.c_str(), - boost_error_code.message().c_str() - ); - return ErrorCode_Failure; - } - - boost::iostreams::mapped_file_params memory_map_params; - memory_map_params.path = compressed_file_path; - memory_map_params.flags = boost::iostreams::mapped_file::readonly; - memory_map_params.length = compressed_file_size; - // Try to map it to the same memory location as previous memory mapped - // file - memory_map_params.hint = m_memory_mapped_compressed_file.data(); - m_memory_mapped_compressed_file.open(memory_map_params); - if (!m_memory_mapped_compressed_file.is_open()) { - SPDLOG_ERROR( - "streaming_compression::zstd::Decompressor: Unable to memory map the " - "compressed file with path: {}", - compressed_file_path.c_str() - ); - return ErrorCode_Failure; - } + // Create read-only memory mapping for compressed_file_path + m_memory_mapped_file = std::make_unique(compressed_file_path); + auto const file_view{m_memory_mapped_file->get_view()}; // Configure input stream - m_compressed_stream_block = {m_memory_mapped_compressed_file.data(), compressed_file_size, 0}; + m_compressed_stream_block = {file_view.data(), file_view.size(), 0}; reset_stream(); diff --git a/components/core/src/clp/streaming_compression/zstd/Decompressor.hpp b/components/core/src/clp/streaming_compression/zstd/Decompressor.hpp index 665674373..cc9e90fe4 100644 --- a/components/core/src/clp/streaming_compression/zstd/Decompressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Decompressor.hpp @@ -4,10 +4,10 @@ #include #include -#include #include #include "../../FileReader.hpp" +#include "../../ReadOnlyMemoryMappedFile.hpp" #include "../../TraceableException.hpp" #include "../Decompressor.hpp" @@ -125,7 +125,7 @@ class Decompressor : public ::clp::streaming_compression::Decompressor { // Compressed stream variables ZSTD_DStream* m_decompression_stream; - boost::iostreams::mapped_file_source m_memory_mapped_compressed_file; + std::unique_ptr m_memory_mapped_file; FileReader* m_file_reader; size_t m_file_reader_initial_pos; std::unique_ptr m_file_read_buffer; diff --git a/components/core/tests/test-MemoryMappedFile.cpp b/components/core/tests/test-MemoryMappedFile.cpp new file mode 100644 index 000000000..20d433f32 --- /dev/null +++ b/components/core/tests/test-MemoryMappedFile.cpp @@ -0,0 +1,64 @@ +#include +#include +#include +#include +#include + +#include + +#include "../src/clp/FileReader.hpp" +#include "../src/clp/ReaderInterface.hpp" +#include "../src/clp/ReadOnlyMemoryMappedFile.hpp" + +namespace { +/** + * Reads all content from a reader. + * @param reader + * @return The content. + */ +[[nodiscard]] auto read_content(clp::ReaderInterface& reader) -> std::vector; + +[[nodiscard]] auto get_test_dir() -> std::filesystem::path; + +auto read_content(clp::ReaderInterface& reader) -> std::vector { + constexpr size_t cBufferSize{4096}; + std::array read_buf{}; + std::vector buf; + for (bool has_more_content{true}; has_more_content;) { + size_t num_bytes_read{}; + has_more_content = reader.read(read_buf.data(), read_buf.size(), num_bytes_read); + buf.insert(buf.cend(), read_buf.cbegin(), read_buf.cbegin() + num_bytes_read); + } + return buf; +} + +auto get_test_dir() -> std::filesystem::path { + std::filesystem::path const current_file_path{__FILE__}; + return current_file_path.parent_path(); +} +} // namespace + +TEST_CASE("memory_mapped_file_view_basic", "[ReadOnlyMemoryMappedFile]") { + auto const test_input_path{ + get_test_dir() / std::filesystem::path{"test_network_reader_src"} / "random.log" + }; + clp::FileReader file_reader; + file_reader.open(test_input_path.string()); + auto const expected{read_content(file_reader)}; + file_reader.close(); + + clp::ReadOnlyMemoryMappedFile const mmap_file{test_input_path.string()}; + auto const view{mmap_file.get_view()}; + REQUIRE((view.size() == expected.size())); + REQUIRE(std::equal(view.begin(), view.end(), expected.cbegin())); +} + +TEST_CASE("memory_mapped_file_view_empty", "[ReadOnlyMemoryMappedFile]") { + auto const test_input_path{ + get_test_dir() / std::filesystem::path{"test_schema_files"} / "empty_schema.txt" + }; + + clp::ReadOnlyMemoryMappedFile const mmap_file{test_input_path.string()}; + auto const view{mmap_file.get_view()}; + REQUIRE(view.empty()); +} diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index cad66d028..b43316b3f 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -1,6 +1,7 @@ #include #include +#include #include #include