Skip to content

Commit

Permalink
clp-s: Add support for chunking output into different files during ti…
Browse files Browse the repository at this point in the history
…mestamp-ordered decompression (#451)

Co-authored-by: wraymo <37269683+wraymo@users.noreply.github.com>
  • Loading branch information
gibber9809 and wraymo authored Jun 25, 2024
1 parent 94509a8 commit 01d5737
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 12 deletions.
13 changes: 13 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,19 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
extraction_options.add(input_options);

po::options_description decompression_options("Decompression Options");
// clang-format off
decompression_options.add_options()(
"ordered",
po::bool_switch(&m_ordered_decompression),
"Enable decompression in ascending timestamp order for this archive"
)(
"ordered-chunk-size",
po::value<size_t>(&m_ordered_chunk_size)
->default_value(m_ordered_chunk_size),
"Number of records to include in each output file when decompressing records "
"in ascending timestamp order"
);
// clang-format on
extraction_options.add(decompression_options);

po::positional_options_description positional_options;
Expand Down Expand Up @@ -336,6 +344,11 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
if (m_output_dir.empty()) {
throw std::invalid_argument("No output directory specified");
}

if (0 != m_ordered_chunk_size && false == m_ordered_decompression) {
throw std::invalid_argument("ordered-chunk-size must be used with ordered argument"
);
}
} else if ((char)Command::Search == command_input) {
std::string archives_dir;
std::string query;
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ class CommandLineArguments {

bool get_ordered_decompression() const { return m_ordered_decompression; }

size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; }

private:
// Methods
/**
Expand Down Expand Up @@ -170,6 +172,7 @@ class CommandLineArguments {
size_t m_max_document_size{512ULL * 1024 * 1024}; // 512 MB
bool m_structurize_arrays{false};
bool m_ordered_decompression{false};
size_t m_ordered_chunk_size{0};

// Metadata db variables
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
Expand Down
66 changes: 56 additions & 10 deletions components/core/src/clp_s/JsonConstructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
: m_output_dir(option.output_dir),
m_archives_dir(option.archives_dir),
m_ordered{option.ordered},
m_archive_id(option.archive_id) {
m_archive_id(option.archive_id),
m_ordered_chunk_size(option.ordered_chunk_size) {
std::error_code error_code;
if (false == std::filesystem::create_directory(option.output_dir, error_code) && error_code) {
throw OperationFailed(
Expand Down Expand Up @@ -44,24 +45,25 @@ JsonConstructor::JsonConstructor(JsonConstructorOption const& option)
}

void JsonConstructor::store() {
FileWriter writer;
// TODO: change this when doing chunking
writer.open(m_output_dir + "/original", FileWriter::OpenMode::CreateIfNonexistentForAppending);

m_archive_reader = std::make_unique<ArchiveReader>();
m_archive_reader->open(m_archives_dir, m_archive_id);
m_archive_reader->read_dictionaries_and_metadata();
if (false == m_ordered) {
FileWriter writer;
writer.open(
m_output_dir + "/original",
FileWriter::OpenMode::CreateIfNonexistentForAppending
);
m_archive_reader->store(writer);

writer.close();
} else {
construct_in_order(writer);
construct_in_order();
}
m_archive_reader->close();

writer.close();
}

void JsonConstructor::construct_in_order(FileWriter& writer) {
void JsonConstructor::construct_in_order() {
std::string buffer;
auto tables = m_archive_reader->read_all_tables();
using ReaderPointer = std::shared_ptr<SchemaReader>;
Expand All @@ -72,15 +74,59 @@ void JsonConstructor::construct_in_order(FileWriter& writer) {
// Clear tables vector so that memory gets deallocated after we have marshalled all records for
// a given table
tables.clear();

epochtime_t first_timestamp{0};
epochtime_t last_timestamp{0};
size_t num_records_marshalled{0};
auto src_path = std::filesystem::path(m_output_dir) / m_archive_id;
FileWriter writer;
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);

auto finalize_chunk = [&](bool open_new_writer) {
writer.close();
std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_"
+ std::to_string(last_timestamp) + ".jsonl";
auto new_file_path = std::filesystem::path(new_file_name);
std::error_code ec;
std::filesystem::rename(src_path, new_file_path, ec);
if (ec) {
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}

if (open_new_writer) {
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);
}
};

while (false == record_queue.empty()) {
ReaderPointer next = record_queue.top();
record_queue.pop();
last_timestamp = next->get_next_timestamp();
if (0 == num_records_marshalled) {
first_timestamp = last_timestamp;
}
next->get_next_message(buffer);
if (false == next->done()) {
record_queue.emplace(std::move(next));
}
writer.write(buffer.c_str(), buffer.length());
num_records_marshalled += 1;

if (0 != m_ordered_chunk_size && num_records_marshalled >= m_ordered_chunk_size) {
finalize_chunk(true);
num_records_marshalled = 0;
}
}

if (num_records_marshalled > 0) {
finalize_chunk(false);
} else {
writer.close();
std::error_code ec;
std::filesystem::remove(src_path, ec);
if (ec) {
throw OperationFailed(ErrorCodeFailure, __FILE__, __LINE__, ec.message());
}
}
writer.close();
}
} // namespace clp_s
5 changes: 3 additions & 2 deletions components/core/src/clp_s/JsonConstructor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ struct JsonConstructorOption {
std::string archive_id;
std::string output_dir;
bool ordered;
size_t ordered_chunk_size;
};

class JsonConstructor {
Expand Down Expand Up @@ -55,14 +56,14 @@ class JsonConstructor {
/**
* Reads all of the tables from m_archive_reader and writes all of the records
* they contain to writer in timestamp order.
* @param writer
*/
void construct_in_order(FileWriter& writer);
void construct_in_order();

std::string m_archives_dir;
std::string m_archive_id;
std::string m_output_dir;
bool m_ordered{false};
size_t m_ordered_chunk_size{0};

std::unique_ptr<ArchiveReader> m_archive_reader;
};
Expand Down
1 change: 1 addition & 0 deletions components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ int main(int argc, char const* argv[]) {
option.output_dir = command_line_arguments.get_output_dir();
option.ordered = command_line_arguments.get_ordered_decompression();
option.archives_dir = archives_dir;
option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size();
try {
auto const& archive_id = command_line_arguments.get_archive_id();
if (false == archive_id.empty()) {
Expand Down

0 comments on commit 01d5737

Please sign in to comment.