Skip to content

Commit

Permalink
clp-core: Refactor FileReader to use RAII. (#496)
Browse files Browse the repository at this point in the history
  • Loading branch information
haiqi96 authored Aug 14, 2024
1 parent f05264e commit a89ff14
Show file tree
Hide file tree
Showing 14 changed files with 101 additions and 302 deletions.
33 changes: 19 additions & 14 deletions components/core/src/clp/DictionaryReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ class DictionaryReader {

// Variables
bool m_is_open;
FileReader m_dictionary_file_reader;
FileReader m_segment_index_file_reader;
std::unique_ptr<FileReader> m_dictionary_file_reader;
std::unique_ptr<FileReader> m_segment_index_file_reader;
#if USE_PASSTHROUGH_COMPRESSION
streaming_compression::passthrough::Decompressor m_dictionary_decompressor;
streaming_compression::passthrough::Decompressor m_segment_index_decompressor;
Expand All @@ -133,14 +133,19 @@ void DictionaryReader<DictionaryIdType, EntryType>::open(

constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB

open_dictionary_for_reading(
dictionary_path,
segment_index_path,
cDecompressorFileReadBufferCapacity,
m_dictionary_file_reader,
m_dictionary_decompressor,
m_segment_index_file_reader,
m_segment_index_decompressor
m_dictionary_file_reader = make_unique<FileReader>(dictionary_path);

// Skip header and then open the decompressor
m_dictionary_file_reader->seek_from_begin(sizeof(uint64_t));
m_dictionary_decompressor.open(*m_dictionary_file_reader, cDecompressorFileReadBufferCapacity);

m_segment_index_file_reader = make_unique<FileReader>(segment_index_path);

// Skip header and then open the decompressor
m_segment_index_file_reader->seek_from_begin(sizeof(uint64_t));
m_segment_index_decompressor.open(
*m_segment_index_file_reader,
cDecompressorFileReadBufferCapacity
);

m_is_open = true;
Expand All @@ -153,9 +158,9 @@ void DictionaryReader<DictionaryIdType, EntryType>::close() {
}

m_segment_index_decompressor.close();
m_segment_index_file_reader.close();
m_segment_index_file_reader.reset();
m_dictionary_decompressor.close();
m_dictionary_file_reader.close();
m_dictionary_file_reader.reset();

m_num_segments_read_from_index = 0;
m_entries.clear();
Expand All @@ -170,7 +175,7 @@ void DictionaryReader<DictionaryIdType, EntryType>::read_new_entries() {
}

// Read dictionary header
auto num_dictionary_entries = read_dictionary_header(m_dictionary_file_reader);
auto num_dictionary_entries = read_dictionary_header(*m_dictionary_file_reader);

// Validate dictionary header
if (num_dictionary_entries < m_entries.size()) {
Expand All @@ -190,7 +195,7 @@ void DictionaryReader<DictionaryIdType, EntryType>::read_new_entries() {
}

// Read segment index header
auto num_segments = read_segment_index_header(m_segment_index_file_reader);
auto num_segments = read_segment_index_header(*m_segment_index_file_reader);

// Validate segment index header
if (num_segments < m_num_segments_read_from_index) {
Expand Down
94 changes: 0 additions & 94 deletions components/core/src/clp/DictionaryWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@

#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "ArrayBackedPosIntSet.hpp"
#include "Defs.h"
#include "dictionary_utils.hpp"
#include "FileWriter.hpp"
#include "spdlog_with_specializations.hpp"
#include "streaming_compression/passthrough/Compressor.hpp"
Expand Down Expand Up @@ -63,18 +60,6 @@ class DictionaryWriter {
*/
void write_header_and_flush_to_disk();

/**
* Opens dictionary, loads entries, and then sets it up for writing
* @param dictionary_path
* @param segment_index_path
* @param max_id
*/
void open_and_preload(
std::string const& dictionary_path,
std::string const& segment_index_path,
variable_dictionary_id_t max_id
);

/**
* Adds the given segment and IDs to the segment index
* @param segment_id
Expand Down Expand Up @@ -191,85 +176,6 @@ void DictionaryWriter<DictionaryIdType, EntryType>::write_header_and_flush_to_di
m_dictionary_file_writer.flush();
}

/**
 * Opens an existing dictionary, loads its entries into the in-memory value-to-ID map, and then
 * reopens the dictionary and segment index files for writing.
 * @param dictionary_path
 * @param segment_index_path
 * @param max_id
 * @throw OperationFailed with ErrorCode_NotReady if the writer is already open
 * @throw OperationFailed with ErrorCode_OutOfBounds if the dictionary header reports more entries
 * than `max_id`
 * @throw OperationFailed with ErrorCode_Corrupt if two entries in the dictionary share a value
 */
template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::open_and_preload(
        std::string const& dictionary_path,
        std::string const& segment_index_path,
        variable_dictionary_id_t const max_id
) {
    if (m_is_open) {
        throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
    }

    m_max_id = max_id;

    FileReader dictionary_file_reader;
    FileReader segment_index_file_reader;
#if USE_PASSTHROUGH_COMPRESSION
    streaming_compression::passthrough::Decompressor dictionary_decompressor;
    streaming_compression::passthrough::Decompressor segment_index_decompressor;
#elif USE_ZSTD_COMPRESSION
    streaming_compression::zstd::Decompressor dictionary_decompressor;
    streaming_compression::zstd::Decompressor segment_index_decompressor;
#else
    static_assert(false, "Unsupported compression mode.");
#endif
    constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024;  // 64 KB
    open_dictionary_for_reading(
            dictionary_path,
            segment_index_path,
            cDecompressorFileReadBufferCapacity,
            dictionary_file_reader,
            dictionary_decompressor,
            segment_index_file_reader,
            segment_index_decompressor
    );

    auto num_dictionary_entries = read_dictionary_header(dictionary_file_reader);
    if (num_dictionary_entries > m_max_id) {
        SPDLOG_ERROR("DictionaryWriter ran out of IDs.");
        throw OperationFailed(ErrorCode_OutOfBounds, __FILENAME__, __LINE__);
    }

    // Load the entries from the given dictionary file
    EntryType entry;
    for (size_t i = 0; i < num_dictionary_entries; ++i) {
        entry.clear();
        entry.read_from_file(dictionary_decompressor);

        // A single `emplace` both detects a duplicate value and inserts a new one, so each entry
        // costs one map lookup instead of a `count` followed by `operator[]`
        bool const is_new_value
                = m_value_to_id.emplace(entry.get_value(), entry.get_id()).second;
        if (false == is_new_value) {
            SPDLOG_ERROR("Entry's value already exists in dictionary");
            throw OperationFailed(ErrorCode_Corrupt, __FILENAME__, __LINE__);
        }

        m_data_size += entry.get_data_size();
    }

    m_next_id = num_dictionary_entries;

    // Finish reading before the same paths are reopened for writing below
    segment_index_decompressor.close();
    segment_index_file_reader.close();
    dictionary_decompressor.close();
    dictionary_file_reader.close();

    m_dictionary_file_writer.open(
            dictionary_path,
            FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING
    );
    // Open compressor
    m_dictionary_compressor.open(m_dictionary_file_writer);

    m_segment_index_file_writer.open(
            segment_index_path,
            FileWriter::OpenMode::CREATE_IF_NONEXISTENT_FOR_SEEKABLE_WRITING
    );
    // Open compressor
    m_segment_index_compressor.open(m_segment_index_file_writer);

    m_is_open = true;
}

template <typename DictionaryIdType, typename EntryType>
void DictionaryWriter<DictionaryIdType, EntryType>::index_segment(
segment_id_t segment_id,
Expand Down
72 changes: 18 additions & 54 deletions components/core/src/clp/FileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,26 @@
using std::string;

namespace clp {
/**
 * Opens the file at the given path for binary reading.
 * @param path
 * @throw FileReader::OperationFailed with ErrorCode_FileNotFound if `fopen` fails with ENOENT, or
 * with ErrorCode_errno if it fails for any other reason
 */
FileReader::FileReader(string const& path) : m_file{fopen(path.c_str(), "rb")} {
    if (nullptr != m_file) {
        // Only record the path once the file is known to be open
        m_path = path;
        return;
    }

    // NOTE: errno is inspected right after the failed `fopen`, before anything else can
    // overwrite it
    if (ENOENT != errno) {
        throw OperationFailed(ErrorCode_errno, __FILE__, __LINE__);
    }
    throw OperationFailed(ErrorCode_FileNotFound, __FILE__, __LINE__);
}

/**
 * Closes the underlying file (if one is open) and frees the buffer used by `getdelim`.
 */
FileReader::~FileReader() {
    if (nullptr != m_file) {
        // NOTE: We don't check errors for fclose since it seems the only reason it could fail is
        // if it was interrupted by a signal
        fclose(m_file);
    }
    // `free` accepts a null pointer, so no check is needed if `getdelim` never allocated
    free(m_getdelim_buf);
}

ErrorCode FileReader::try_read(char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) {
if (nullptr == m_file) {
return ErrorCode_NotInit;
}
if (nullptr == buf) {
return ErrorCode_BadParam;
}
Expand All @@ -40,10 +51,6 @@ ErrorCode FileReader::try_read(char* buf, size_t num_bytes_to_read, size_t& num_
}

ErrorCode FileReader::try_seek_from_begin(size_t pos) {
if (nullptr == m_file) {
return ErrorCode_NotInit;
}

int retval = fseeko(m_file, pos, SEEK_SET);
if (0 != retval) {
return ErrorCode_errno;
Expand All @@ -53,10 +60,6 @@ ErrorCode FileReader::try_seek_from_begin(size_t pos) {
}

ErrorCode FileReader::try_get_pos(size_t& pos) {
if (nullptr == m_file) {
return ErrorCode_NotInit;
}

pos = ftello(m_file);
if ((off_t)-1 == pos) {
return ErrorCode_errno;
Expand All @@ -65,49 +68,14 @@ ErrorCode FileReader::try_get_pos(size_t& pos) {
return ErrorCode_Success;
}

/**
 * Tries to open the file at the given path for binary reading, closing any file this reader
 * already has open.
 * @param path
 * @return ErrorCode_Success on success
 * @return ErrorCode_FileNotFound if `fopen` fails with ENOENT
 * @return ErrorCode_errno otherwise
 */
ErrorCode FileReader::try_open(string const& path) {
    // Release any file left open in case the caller forgot to call close first
    close();

    m_file = fopen(path.c_str(), "rb");
    if (nullptr != m_file) {
        m_path = path;
        return ErrorCode_Success;
    }

    if (ENOENT != errno) {
        return ErrorCode_errno;
    }
    return ErrorCode_FileNotFound;
}

/**
 * Opens the file at the given path, throwing if `try_open` does not succeed.
 * @param path
 * @throw std::string containing a "File not found" message if `try_open` returns
 * ErrorCode_FileNotFound
 * @throw FileReader::OperationFailed for any other failure
 */
void FileReader::open(string const& path) {
    auto const error_code = try_open(path);
    if (ErrorCode_Success == error_code) {
        return;
    }

    if (ErrorCode_FileNotFound != error_code) {
        throw OperationFailed(error_code, __FILENAME__, __LINE__);
    }

    // NOTE(review): this throws a std::string rather than OperationFailed; kept as-is since
    // callers may depend on catching it
    throw "File not found: " + boost::filesystem::weakly_canonical(path).string() + "\n";
}

/**
 * Closes the file if it's open; otherwise does nothing.
 */
void FileReader::close() {
    if (nullptr == m_file) {
        return;
    }

    // NOTE: We don't check errors for fclose since it seems the only reason it could fail is if
    // it was interrupted by a signal
    fclose(m_file);
    // Mark the reader as closed so later calls see that no file is open
    m_file = nullptr;
}

ErrorCode
FileReader::try_read_to_delimiter(char delim, bool keep_delimiter, bool append, string& str) {
assert(nullptr != m_file);

if (false == append) {
str.clear();
}

// NOTE: If `m_getdelim_buf` is a null pointer or if `m_getdelim_buf_len` is insufficient in
// size, `getdelim` will malloc or realloc enough memory, respectively, to hold the characters.
ssize_t num_bytes_read = getdelim(&m_getdelim_buf, &m_getdelim_buf_len, delim, m_file);
if (num_bytes_read < 1) {
if (ferror(m_file)) {
Expand All @@ -125,10 +93,6 @@ FileReader::try_read_to_delimiter(char delim, bool keep_delimiter, bool append,
}

ErrorCode FileReader::try_fstat(struct stat& stat_buffer) {
if (nullptr == m_file) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

auto return_value = fstat(fileno(m_file), &stat_buffer);
if (0 != return_value) {
return ErrorCode_errno;
Expand Down
33 changes: 4 additions & 29 deletions components/core/src/clp/FileReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,21 @@ class FileReader : public ReaderInterface {
char const* what() const noexcept override { return "FileReader operation failed"; }
};

FileReader() : m_file(nullptr), m_getdelim_buf_len(0), m_getdelim_buf(nullptr) {}
FileReader(std::string const& path);

~FileReader();

// Methods implementing the ReaderInterface
/**
* Tries to get the current position of the read head in the file
* @param pos Position of the read head in the file
* @return ErrorCode_NotInit if the file is not open
* @return ErrorCode_errno on error
* @return ErrorCode_Success on success
*/
ErrorCode try_get_pos(size_t& pos) override;
/**
* Tries to seek from the beginning of the file to the given position
* @param pos
* @return ErrorCode_NotInit if the file is not open
* @return ErrorCode_errno on error
* @return ErrorCode_Success on success
*/
Expand All @@ -52,7 +50,6 @@ class FileReader : public ReaderInterface {
* @param buf
* @param num_bytes_to_read The number of bytes to try and read
* @param num_bytes_read The actual number of bytes read
* @return ErrorCode_NotInit if the file is not open
* @return ErrorCode_BadParam if buf is invalid
* @return ErrorCode_errno on error
* @return ErrorCode_EndOfFile on EOF
Expand All @@ -73,28 +70,6 @@ class FileReader : public ReaderInterface {
ErrorCode
try_read_to_delimiter(char delim, bool keep_delimiter, bool append, std::string& str) override;

// Methods
bool is_open() const { return m_file != nullptr; }

/**
* Tries to open a file
* @param path
* @return ErrorCode_Success on success
* @return ErrorCode_FileNotFound if the file was not found
* @return ErrorCode_errno otherwise
*/
ErrorCode try_open(std::string const& path);
/**
* Opens a file
* @param path
* @throw FileReader::OperationFailed on failure
*/
void open(std::string const& path);
/**
* Closes the file if it's open
*/
void close();

[[nodiscard]] std::string const& get_path() const { return m_path; }

/**
Expand All @@ -106,9 +81,9 @@ class FileReader : public ReaderInterface {
ErrorCode try_fstat(struct stat& stat_buffer);

private:
FILE* m_file;
size_t m_getdelim_buf_len;
char* m_getdelim_buf;
FILE* m_file{nullptr};
size_t m_getdelim_buf_len{0};
char* m_getdelim_buf{nullptr};
std::string m_path;
};
} // namespace clp
Expand Down
Loading

0 comments on commit a89ff14

Please sign in to comment.