Skip to content

Commit

Permalink
Rename TableReader -> PackedStreamReader and do associated variable renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
gibber9809 committed Aug 13, 2024
1 parent 2c68328 commit 88337df
Show file tree
Hide file tree
Showing 10 changed files with 276 additions and 268 deletions.
62 changes: 31 additions & 31 deletions components/core/src/clp_s/ArchiveReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
m_schema_map = ReaderUtils::read_schemas(archive_path_str);

m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
m_table_reader.open_tables(archive_path_str + constants::cArchiveTablesFile);
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
}

void ArchiveReader::read_metadata() {
Expand All @@ -38,7 +38,7 @@ void ArchiveReader::read_metadata() {
cDecompressorFileReadBufferCapacity
);

m_table_reader.read_metadata(m_table_metadata_decompressor);
m_stream_reader.read_metadata(m_table_metadata_decompressor);

size_t num_schemas;
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas);
Expand All @@ -51,24 +51,24 @@ void ArchiveReader::read_metadata() {
SchemaReader::SchemaMetadata prev_metadata{};
int32_t prev_schema_id{};
for (size_t i = 0; i < num_schemas; ++i) {
size_t table_id;
size_t table_offset;
size_t stream_id;
size_t stream_offset;
int32_t schema_id;
size_t num_messages;

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_id);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_id);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset);
if (auto error = m_table_metadata_decompressor.try_read_numeric_value(stream_offset);
ErrorCodeSuccess != error)
{
throw OperationFailed(error, __FILENAME__, __LINE__);
}

if (table_offset > m_table_reader.get_uncompressed_table_size(table_id)) {
if (stream_offset > m_stream_reader.get_uncompressed_stream_size(stream_id)) {
throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__);
}

Expand All @@ -86,25 +86,25 @@ void ArchiveReader::read_metadata() {

if (prev_metadata_initialized) {
size_t uncompressed_size{0};
if (table_id != prev_metadata.table_id) {
if (stream_id != prev_metadata.stream_id) {
uncompressed_size
= m_table_reader.get_uncompressed_table_size(prev_metadata.table_id)
- prev_metadata.table_offset;
= m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id)
- prev_metadata.stream_offset;
} else {
uncompressed_size = table_offset - prev_metadata.table_offset;
uncompressed_size = stream_offset - prev_metadata.stream_offset;
}
prev_metadata.uncompressed_size = uncompressed_size;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
} else {
prev_metadata_initialized = true;
}
prev_metadata = {table_id, table_offset, num_messages, 0};
prev_metadata = {stream_id, stream_offset, num_messages, 0};
prev_schema_id = schema_id;
m_schema_ids.push_back(schema_id);
}
prev_metadata.uncompressed_size
= m_table_reader.get_uncompressed_table_size(prev_metadata.table_id)
- prev_metadata.table_offset;
= m_stream_reader.get_uncompressed_stream_size(prev_metadata.stream_id)
- prev_metadata.stream_offset;
m_id_to_schema_metadata[prev_schema_id] = prev_metadata;
m_table_metadata_decompressor.close();
}
Expand Down Expand Up @@ -134,9 +134,9 @@ SchemaReader& ArchiveReader::read_schema_table(
);

auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto table_buffer = read_table(schema_metadata.table_id, true);
auto stream_buffer = read_stream(schema_metadata.stream_id, true);
m_schema_reader
.load(table_buffer, schema_metadata.table_offset, schema_metadata.uncompressed_size);
.load(stream_buffer, schema_metadata.stream_offset, schema_metadata.uncompressed_size);
return m_schema_reader;
}

Expand All @@ -147,10 +147,10 @@ std::vector<std::shared_ptr<SchemaReader>> ArchiveReader::read_all_tables() {
auto schema_reader = std::make_shared<SchemaReader>();
initialize_schema_reader(*schema_reader, schema_id, true, true);
auto& schema_metadata = m_id_to_schema_metadata[schema_id];
auto table_buffer = read_table(schema_metadata.table_id, false);
auto stream_buffer = read_stream(schema_metadata.stream_id, false);
schema_reader->load(
table_buffer,
schema_metadata.table_offset,
stream_buffer,
schema_metadata.stream_offset,
schema_metadata.uncompressed_size
);
readers.push_back(std::move(schema_reader));
Expand Down Expand Up @@ -325,28 +325,28 @@ void ArchiveReader::close() {
m_array_dict->close();
m_timestamp_dict->close();

m_table_reader.close();
m_stream_reader.close();
m_table_metadata_file_reader.close();

m_id_to_schema_metadata.clear();
m_schema_ids.clear();
m_cur_table_id = 0;
m_table_buffer.reset();
m_table_buffer_size = 0ULL;
m_cur_stream_id = 0;
m_stream_buffer.reset();
m_stream_buffer_size = 0ULL;
}

std::shared_ptr<char[]> ArchiveReader::read_table(size_t table_id, bool reuse_buffer) {
if (nullptr != m_table_buffer && m_cur_table_id == table_id) {
return m_table_buffer;
std::shared_ptr<char[]> ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) {
if (nullptr != m_stream_buffer && m_cur_stream_id == stream_id) {
return m_stream_buffer;
}

if (false == reuse_buffer) {
m_table_buffer.reset();
m_table_buffer_size = 0;
m_stream_buffer.reset();
m_stream_buffer_size = 0;
}

m_table_reader.read_table(table_id, m_table_buffer, m_table_buffer_size);
m_cur_table_id = table_id;
return m_table_buffer;
m_stream_reader.read_stream(stream_id, m_stream_buffer, m_stream_buffer_size);
m_cur_stream_id = stream_id;
return m_stream_buffer;
}
} // namespace clp_s
22 changes: 11 additions & 11 deletions components/core/src/clp_s/ArchiveReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
#include <boost/filesystem.hpp>

#include "DictionaryReader.hpp"
#include "PackedStreamReader.hpp"
#include "ReaderUtils.hpp"
#include "SchemaReader.hpp"
#include "TableReader.hpp"
#include "TimestampDictionaryReader.hpp"
#include "Utils.hpp"

Expand Down Expand Up @@ -176,16 +176,16 @@ class ArchiveReader {
);

/**
* Reads a table with given ID from the table reader. If read_table is called multiple times in
* a row for the same table_id a cached buffer is returned. This function allows the caller to
* Reads a stream with the given ID from the packed stream reader. If read_stream is called
* multiple times in a row for the same stream_id a cached buffer is returned. This function allows the caller to
* ask for the same buffer to be reused to read multiple different streams: this can save memory
* allocations, but can only be used when streams are read one at a time.
* @param table_id
* @param stream_id
* @param reuse_buffer when true the same buffer is reused across invocations, overwriting data
* returned previous calls to read_table
* @return a buffer containing the decompressed table identified by table_id
* returned by previous calls to read_stream
* @return a buffer containing the decompressed stream identified by stream_id
*/
std::shared_ptr<char[]> read_table(size_t table_id, bool reuse_buffer);
std::shared_ptr<char[]> read_stream(size_t stream_id, bool reuse_buffer);

bool m_is_open;
std::string m_archive_id;
Expand All @@ -199,13 +199,13 @@ class ArchiveReader {
std::vector<int32_t> m_schema_ids;
std::map<int32_t, SchemaReader::SchemaMetadata> m_id_to_schema_metadata;

TableReader m_table_reader;
PackedStreamReader m_stream_reader;
FileReader m_table_metadata_file_reader;
ZstdDecompressor m_table_metadata_decompressor;
SchemaReader m_schema_reader;
std::shared_ptr<char[]> m_table_buffer{};
size_t m_table_buffer_size{0ULL};
size_t m_cur_table_id{0ULL};
std::shared_ptr<char[]> m_stream_buffer{};
size_t m_stream_buffer_size{0ULL};
size_t m_cur_stream_id{0ULL};
};
} // namespace clp_s

Expand Down
28 changes: 14 additions & 14 deletions components/core/src/clp_s/ArchiveWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,26 +144,26 @@ size_t ArchiveWriter::store_tables() {
m_table_metadata_compressor.open(m_table_metadata_file_writer, m_compression_level);

/**
* Table metadata schema
* # num tables <64 bit>
* Packed stream metadata schema
* # num packed streams <64 bit>
* # [offset into file <64 bit> uncompressed size <64 bit>]+
* # num schemas <64 bit>
* # [table id <64 bit> offset into stream <64 bit> schema id <32 bit> num messages <64 bit>]+
* # [stream id <64 bit> offset into stream <64 bit> schema id <32 bit> num messages <64 bit>]+
*
* Schema tables are packed into a series of compression streams. Each of those compression
* streams is identified by a 64 bit table id. In the first half of the metadata we identify
* streams is identified by a 64 bit stream id. In the first half of the metadata we identify
* how many streams there are, and the offset into the file where each compression stream can
* be found. In the second half of the metadata we record how many schema tables there are,
* which compression stream they belong to, the offset into that compression stream where
* they can be found, and how many messages that schema table contains.
*
* We buffer the first half of the metadata in the "table_metadata" vector, and the second half
* We buffer the first half of the metadata in the "stream_metadata" vector, and the second half
* of the metadata in the "schema_metadata" vector as we compress the tables. The metadata is
* flushed once all of the schema tables have been compressed.
*/
using schema_map_it = decltype(m_id_to_schema_writer)::iterator;
std::vector<schema_map_it> schemas;
std::vector<std::tuple<size_t, size_t>> table_metadata;
std::vector<std::tuple<size_t, size_t>> stream_metadata;
std::vector<std::tuple<size_t, size_t, int32_t, size_t>> schema_metadata;

schema_metadata.reserve(m_id_to_schema_writer.size());
Expand All @@ -178,13 +178,13 @@ size_t ArchiveWriter::store_tables() {
std::sort(schemas.begin(), schemas.end(), comp);

size_t current_stream_offset = 0;
size_t current_table_id = 0;
size_t current_stream_id = 0;
size_t current_table_file_offset = 0;
m_tables_compressor.open(m_tables_file_writer, m_compression_level);
for (auto it : schemas) {
it->second->store(m_tables_compressor);
schema_metadata.emplace_back(
current_table_id,
current_stream_id,
current_stream_offset,
it->first,
it->second->get_num_messages()
Expand All @@ -193,10 +193,10 @@ size_t ArchiveWriter::store_tables() {
delete it->second;

if (current_stream_offset > m_min_table_size || schemas.size() == schema_metadata.size()) {
table_metadata.emplace_back(current_table_file_offset, current_stream_offset);
stream_metadata.emplace_back(current_table_file_offset, current_stream_offset);
m_tables_compressor.close();
current_stream_offset = 0;
++current_table_id;
++current_stream_id;
current_table_file_offset = m_tables_file_writer.get_pos();

if (schemas.size() != schema_metadata.size()) {
Expand All @@ -205,15 +205,15 @@ size_t ArchiveWriter::store_tables() {
}
}

m_table_metadata_compressor.write_numeric_value(table_metadata.size());
for (auto& [file_offset, uncompressed_size] : table_metadata) {
m_table_metadata_compressor.write_numeric_value(stream_metadata.size());
for (auto& [file_offset, uncompressed_size] : stream_metadata) {
m_table_metadata_compressor.write_numeric_value(file_offset);
m_table_metadata_compressor.write_numeric_value(uncompressed_size);
}

m_table_metadata_compressor.write_numeric_value(schema_metadata.size());
for (auto& [table_id, stream_offset, schema_id, num_messages] : schema_metadata) {
m_table_metadata_compressor.write_numeric_value(table_id);
for (auto& [stream_id, stream_offset, schema_id, num_messages] : schema_metadata) {
m_table_metadata_compressor.write_numeric_value(stream_id);
m_table_metadata_compressor.write_numeric_value(stream_offset);
m_table_metadata_compressor.write_numeric_value(schema_id);
m_table_metadata_compressor.write_numeric_value(num_messages);
Expand Down
4 changes: 2 additions & 2 deletions components/core/src/clp_s/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ set(
JsonParser.cpp
JsonParser.hpp
JsonSerializer.hpp
PackedStreamReader.cpp
PackedStreamReader.hpp
ParsedMessage.hpp
ReaderUtils.cpp
ReaderUtils.hpp
Expand All @@ -78,8 +80,6 @@ set(
SchemaTree.hpp
SchemaWriter.cpp
SchemaWriter.hpp
TableReader.cpp
TableReader.hpp
TimestampDictionaryReader.cpp
TimestampDictionaryReader.hpp
TimestampDictionaryWriter.cpp
Expand Down
Loading

0 comments on commit 88337df

Please sign in to comment.