Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clp-core: Refactor FileReader to use RAII. #496

Merged
merged 24 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 19 additions & 14 deletions components/core/src/clp/DictionaryReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ class DictionaryReader {

// Variables
bool m_is_open;
FileReader m_dictionary_file_reader;
FileReader m_segment_index_file_reader;
std::unique_ptr<FileReader> m_dictionary_file_reader;
std::unique_ptr<FileReader> m_segment_index_file_reader;
#if USE_PASSTHROUGH_COMPRESSION
streaming_compression::passthrough::Decompressor m_dictionary_decompressor;
streaming_compression::passthrough::Decompressor m_segment_index_decompressor;
Expand All @@ -133,14 +133,19 @@ void DictionaryReader<DictionaryIdType, EntryType>::open(

constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

open_dictionary_for_reading(
dictionary_path,
segment_index_path,
cDecompressorFileReadBufferCapacity,
m_dictionary_file_reader,
m_dictionary_decompressor,
m_segment_index_file_reader,
m_segment_index_decompressor
m_dictionary_file_reader = make_unique<FileReader>(dictionary_path);

// Skip header and then open the decompressor
m_dictionary_file_reader->seek_from_begin(sizeof(uint64_t));
m_dictionary_decompressor.open(*m_dictionary_file_reader, cDecompressorFileReadBufferCapacity);

m_segment_index_file_reader = make_unique<FileReader>(segment_index_path);

// Skip header and then open the decompressor
m_segment_index_file_reader->seek_from_begin(sizeof(uint64_t));
m_segment_index_decompressor.open(
*m_segment_index_file_reader,
cDecompressorFileReadBufferCapacity
);

m_is_open = true;
Expand All @@ -153,9 +158,9 @@ void DictionaryReader<DictionaryIdType, EntryType>::close() {
}

m_segment_index_decompressor.close();
m_segment_index_file_reader.close();
m_segment_index_file_reader.reset();
m_dictionary_decompressor.close();
m_dictionary_file_reader.close();
m_dictionary_file_reader.reset();

m_num_segments_read_from_index = 0;
m_entries.clear();
Expand All @@ -170,7 +175,7 @@ void DictionaryReader<DictionaryIdType, EntryType>::read_new_entries() {
}

// Read dictionary header
auto num_dictionary_entries = read_dictionary_header(m_dictionary_file_reader);
auto num_dictionary_entries = read_dictionary_header(*m_dictionary_file_reader);

// Validate dictionary header
if (num_dictionary_entries < m_entries.size()) {
Expand All @@ -190,7 +195,7 @@ void DictionaryReader<DictionaryIdType, EntryType>::read_new_entries() {
}

// Read segment index header
auto num_segments = read_segment_index_header(m_segment_index_file_reader);
auto num_segments = read_segment_index_header(*m_segment_index_file_reader);

// Validate segment index header
if (num_segments < m_num_segments_read_from_index) {
Expand Down
94 changes: 0 additions & 94 deletions components/core/src/clp/DictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@

#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "ArrayBackedPosIntSet.hpp"
#include "Defs.h"
#include "dictionary_utils.hpp"
#include "FileWriter.hpp"
#include "spdlog_with_specializations.hpp"
#include "streaming_compression/passthrough/Compressor.hpp"
Expand Down Expand Up @@ -63,18 +60,6 @@ class DictionaryWriter {
*/
void write_header_and_flush_to_disk();

/**
* Opens dictionary, loads entries, and then sets it up for writing
* @param dictionary_path
* @param segment_index_path
* @param max_id
*/
void open_and_preload(
std::string const& dictionary_path,
std::string const& segment_index_path,
variable_dictionary_id_t max_id
);

/**
* Adds the given segment and IDs to the segment index
* @param segment_id
Expand Down Expand Up @@ -191,85 +176,6 @@ void DictionaryWriter<DictionaryIdType, EntryType>::write_header_and_flush_to_di
m_dictionary_file_writer.flush();
}

/**
 * Opens an existing dictionary, loads its entries into memory, and then sets the dictionary up for
 * writing.
 *
 * The dictionary and segment index are first opened for reading so the existing entries can be
 * loaded into the value-to-ID map. The readers are then closed and both paths are reopened for
 * seekable writing behind their compressors.
 * @param dictionary_path
 * @param segment_index_path
 * @param max_id Stored as the writer's maximum ID; loading fails if the dictionary header reports
 * more entries than this
 * @throw OperationFailed with ErrorCode_NotReady if the writer is already open
 * @throw OperationFailed with ErrorCode_OutOfBounds if the dictionary header reports more entries
 * than max_id
 * @throw OperationFailed with ErrorCode_Corrupt if two entries in the dictionary share a value
 */
template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::open_and_preload(
        std::string const& dictionary_path,
        std::string const& segment_index_path,
        variable_dictionary_id_t const max_id
) {
    if (m_is_open) {
        throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
    }

    m_max_id = max_id;

    FileReader dictionary_file_reader;
    FileReader segment_index_file_reader;
#if USE_PASSTHROUGH_COMPRESSION
    streaming_compression::passthrough::Decompressor dictionary_decompressor;
    streaming_compression::passthrough::Decompressor segment_index_decompressor;
#elif USE_ZSTD_COMPRESSION
    streaming_compression::zstd::Decompressor dictionary_decompressor;
    streaming_compression::zstd::Decompressor segment_index_decompressor;
#else
    static_assert(false, "Unsupported compression mode.");
#endif
    constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024;  // 64 KB
    open_dictionary_for_reading(
            dictionary_path,
            segment_index_path,
            cDecompressorFileReadBufferCapacity,
            dictionary_file_reader,
            dictionary_decompressor,
            segment_index_file_reader,
            segment_index_decompressor
    );

    auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader);
    if (num_dictionary_entries > m_max_id) {
        SPDLOG_ERROR("DictionaryWriter ran out of IDs.");
        throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__);
    }

    // Load the entries from the given dictionary file
    EntryType entry;
    for (size_t i = 0; i < num_dictionary_entries; ++i) {
        entry.clear();
        entry.read_from_file(dictionary_decompressor);

        // `emplace` inserts only when the value is absent, so a single lookup both detects a
        // duplicate and records a new value
        if (false == m_value_to_id.emplace(entry.get_value(), entry.get_id()).second) {
            SPDLOG_ERROR("Entry's value already exists in dictionary");
            throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__);
        }

        m_data_size += entry.get_data_size();
    }

    m_next_id = num_dictionary_entries;

    // Close the readers before the same paths are reopened for writing below
    segment_index_decompressor.close();
    segment_index_file_reader.close();
    dictionary_decompressor.close();
    dictionary_file_reader.close();

    m_dictionary_file_writer.open(
            dictionary_path,
            FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING
    );
    // Open compressor
    m_dictionary_compressor.open(m_dictionary_file_writer);

    m_segment_index_file_writer.open(
            segment_index_path,
            FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING
    );
    // Open compressor
    m_segment_index_compressor.open(m_segment_index_file_writer);

    m_is_open = true;
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::index_segment(
segment_id_t segment_id,
Expand Down
72 changes: 18 additions & 54 deletions components/core/src/clp/FileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,26 @@
using std::string;

namespace clp {
/**
 * Opens the file at the given path for binary reading.
 * @param path
 * @throw OperationFailed with ErrorCode_FileNotFound if fopen fails with ENOENT
 * @throw OperationFailed with ErrorCode_errno if fopen fails for any other reason
 */
FileReader::FileReader(string const& path) : m_file{fopen(path.c_str(), "rb")} {
    if (nullptr != m_file) {
        // Only remember the path once the file is known to be open
        m_path = path;
        return;
    }

    // fopen failed; errno tells a missing file apart from every other failure
    auto const error_code = (ENOENT == errno) ? ErrorCode_FileNotFound : ErrorCode_errno;
    throw OperationFailed(error_code, __FILE__, __LINE__);
}

/**
 * Releases everything the reader owns: the underlying stream (if one is open) and the buffer that
 * getdelim may have allocated.
 */
FileReader::~FileReader() {
    if (nullptr != m_file) {
        // NOTE: We don't check errors for fclose since it seems the only reason it could fail is
        // if it was interrupted by a signal
        fclose(m_file);
    }
    // free(nullptr) is a no-op, so no guard is needed when getdelim never allocated a buffer
    free(m_getdelim_buf);
}

ErrorCode FileReader::try_read(char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) {
if (nullptr == m_file) {
return ErrorCode_NotInit;
}
if (nullptr == buf) {
return ErrorCode_BadParam;
}
Expand All @@ -40,10 +51,6 @@ ErrorCode FileReader::try_read(char* buf, size_t num_bytes_to_read, size_t& num_
}

ErrorCode FileReader::try_seek_from_begin(size_t pos) {
if (nullptr == m_file) {
return ErrorCode_NotInit;
}

int retval = fseeko(m_file, pos, SEEK_SET);
if (0 != retval) {
return ErrorCode_errno;
Expand All @@ -53,10 +60,6 @@ ErrorCode FileReader::try_seek_from_begin(size_t pos) {
}

ErrorCode FileReader::try_get_pos(size_t& pos) {
if (nullptr == m_file) {
return ErrorCode_NotInit;
}

pos = ftello(m_file);
if ((off_t)-1 == pos) {
return ErrorCode_errno;
Expand All @@ -65,49 +68,14 @@ ErrorCode FileReader::try_get_pos(size_t& pos) {
return ErrorCode_Success;
}

/**
 * Tries to open the file at the given path for binary reading.
 * @param path
 * @return ErrorCode_Success on success
 * @return ErrorCode_FileNotFound if fopen fails with ENOENT
 * @return ErrorCode_errno if fopen fails for any other reason
 */
ErrorCode FileReader::try_open(string const& path) {
    // Release any stream left over from an earlier open that the caller never closed
    close();

    m_file = fopen(path.c_str(), "rb");
    if (nullptr != m_file) {
        m_path = path;
        return ErrorCode_Success;
    }

    // fopen failed; errno tells a missing file apart from every other failure
    return (ENOENT == errno) ? ErrorCode_FileNotFound : ErrorCode_errno;
}

/**
 * Opens the file at the given path, converting any try_open failure into a thrown error.
 * @param path
 * @throw std::string describing the canonicalized path if the file was not found
 * @throw OperationFailed carrying try_open's error code on any other failure
 */
void FileReader::open(string const& path) {
    auto const error_code = try_open(path);
    if (ErrorCode_Success == error_code) {
        return;
    }

    if (ErrorCode_FileNotFound != error_code) {
        throw OperationFailed(error_code, __FILENAME__, __LINE__);
    }

    // NOTE(review): this path throws a std::string rather than OperationFailed, although the
    // header's doc comment for open() lists only FileReader::OperationFailed. Confirm what
    // callers catch before unifying the two.
    throw "File not found: " + boost::filesystem::weakly_canonical(path).string() + "\n";
}

/**
 * Closes the underlying stream if one is open and marks the reader as closed. Calling this on an
 * already-closed reader does nothing.
 */
void FileReader::close() {
    if (nullptr == m_file) {
        return;
    }

    // NOTE: We don't check errors for fclose since it seems the only reason it could fail is if
    // it was interrupted by a signal
    fclose(m_file);
    m_file = nullptr;
}

ErrorCode
FileReader::try_read_to_delimiter(char delim, bool keep_delimiter, bool append, string& str) {
assert(nullptr != m_file);

if (false == append) {
str.clear();
}

// NOTE: If `m_getdelim_buf` is a null pointer or if `m_getdelim_buf_len` is insufficient in
// size, `getdelim` will malloc or realloc enough memory, respectively, to hold the characters.
ssize_t num_bytes_read = getdelim(&m_getdelim_buf, &m_getdelim_buf_len, delim, m_file);
if (num_bytes_read < 1) {
if (ferror(m_file)) {
Expand All @@ -125,10 +93,6 @@ FileReader::try_read_to_delimiter(char delim, bool keep_delimiter, bool append,
}

ErrorCode FileReader::try_fstat(struct stat& stat_buffer) {
if (nullptr == m_file) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

auto return_value = fstat(fileno(m_file), &stat_buffer);
if (0 != return_value) {
return ErrorCode_errno;
Expand Down
33 changes: 4 additions & 29 deletions components/core/src/clp/FileReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,21 @@ class FileReader : public ReaderInterface {
char const* what() const noexcept override { return "FileReader operation failed"; }
};

FileReader() : m_file(nullptr), m_getdelim_buf_len(0), m_getdelim_buf(nullptr) {}
FileReader(std::string const& path);
haiqi96 marked this conversation as resolved.
Show resolved Hide resolved

~FileReader();

// Methods implementing the ReaderInterface
/**
* Tries to get the current position of the read head in the file
* @param pos Position of the read head in the file
* @return ErrorCode_NotInit if the file is not open
* @return ErrorCode_errno on error
* @return ErrorCode_Success on success
*/
ErrorCode try_get_pos(size_t& pos) override;
/**
* Tries to seek from the beginning of the file to the given position
* @param pos
* @return ErrorCode_NotInit if the file is not open
* @return ErrorCode_errno on error
* @return ErrorCode_Success on success
*/
Expand All @@ -52,7 +50,6 @@ class FileReader : public ReaderInterface {
* @param buf
* @param num_bytes_to_read The number of bytes to try and read
* @param num_bytes_read The actual number of bytes read
* @return ErrorCode_NotInit if the file is not open
* @return ErrorCode_BadParam if buf is invalid
* @return ErrorCode_errno on error
* @return ErrorCode_EndOfFile on EOF
Expand All @@ -73,28 +70,6 @@ class FileReader : public ReaderInterface {
ErrorCode
try_read_to_delimiter(char delim, bool keep_delimiter, bool append, std::string& str) override;

// Methods
bool is_open() const { return m_file != nullptr; }

/**
* Tries to open a file
* @param path
* @return ErrorCode_Success on success
* @return ErrorCode_FileNotFound if the file was not found
* @return ErrorCode_errno otherwise
*/
ErrorCode try_open(std::string const& path);
/**
* Opens a file
* @param path
* @throw FileReader::OperationFailed on failure
*/
void open(std::string const& path);
/**
* Closes the file if it's open
*/
void close();

[[nodiscard]] std::string const& get_path() const { return m_path; }

/**
Expand All @@ -106,9 +81,9 @@ class FileReader : public ReaderInterface {
ErrorCode try_fstat(struct stat& stat_buffer);

private:
FILE* m_file;
size_t m_getdelim_buf_len;
char* m_getdelim_buf;
FILE* m_file{nullptr};
size_t m_getdelim_buf_len{0};
char* m_getdelim_buf{nullptr};
std::string m_path;
};
} // namespace clp
Expand Down
Loading
Loading