Experiment: avoid copying data back to write buffer from pending data #768

Draft · wants to merge 2 commits into develop
5 changes: 5 additions & 0 deletions src/Storages/ExternalStream/ExternalStreamCounter.h
@@ -16,6 +16,7 @@ class ExternalStreamCounter
inline uint64_t getWriteFailed() const { return write_failed.load(); }
inline uint64_t getMessageSizeLimit() const { return messages_by_size.load(); }
inline uint64_t getMessageRowsLimit() const { return messages_by_row.load(); }
+ inline uint64_t getOversizedMessageCount() const { return oversized_message_count.load(); }

inline void addToReadBytes(uint64_t bytes) { read_bytes.fetch_add(bytes); }
inline void addToReadCounts(uint64_t counts) { read_counts.fetch_add(counts); }
@@ -25,6 +26,7 @@ class ExternalStreamCounter
inline void addToWriteFailed(uint64_t amount) { write_failed.fetch_add(amount); }
inline void addToMessagesBySize(uint64_t counts) { messages_by_size.fetch_add(counts); }
inline void addToMessagesByRow(uint64_t counts) { messages_by_row.fetch_add(counts); }
+ inline void addToOversizedMessageCount(uint64_t counts) { oversized_message_count.fetch_add(counts); }

std::map<String, uint64_t> getCounters() const
{
@@ -37,6 +39,7 @@
{"WriteFailed", write_failed.load()},
{"MessagesBySize", messages_by_size.load()},
{"MessagesByRow", messages_by_row.load()},
{"OversizedMessageCount", oversized_message_count.load()},
};
}

@@ -51,6 +54,8 @@ class ExternalStreamCounter
std::atomic<uint64_t> messages_by_size;
/// Number of Kafka messages generated by reaching the `kafka_max_message_rows` limit.
std::atomic<uint64_t> messages_by_row;
+ /// Number of messages whose size exceeds the `kafka_max_message_size` limit.
+ std::atomic<uint64_t> oversized_message_count;
};

using ExternalStreamCounterPtr = std::shared_ptr<ExternalStreamCounter>;
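Aside: the counter class above follows a common pattern — atomics bumped on the write path, with getCounters() taking load() snapshots for reporting. Below is a minimal standalone sketch of that pattern (plain C++ with std::string/std::map; illustrative only, not the actual ExternalStreamCounter class):

#include <atomic>
#include <cstdint>
#include <iostream>
#include <map>
#include <string>

/// Simplified stand-in for ExternalStreamCounter: counters are incremented on the
/// hot path and exposed as a name -> value snapshot for monitoring.
class SinkCounters
{
public:
    void addToMessagesBySize(uint64_t n) { messages_by_size.fetch_add(n); }
    void addToOversizedMessageCount(uint64_t n) { oversized_message_count.fetch_add(n); }

    std::map<std::string, uint64_t> getCounters() const
    {
        return {
            {"MessagesBySize", messages_by_size.load()},
            {"OversizedMessageCount", oversized_message_count.load()},
        };
    }

private:
    std::atomic<uint64_t> messages_by_size{0};
    std::atomic<uint64_t> oversized_message_count{0};
};

int main()
{
    SinkCounters counters;
    counters.addToMessagesBySize(3);
    counters.addToOversizedMessageCount(1);

    for (const auto & [name, value] : counters.getCounters())
        std::cout << name << " = " << value << '\n';
}

Because each counter is read with an independent load(), the snapshot is not atomic across counters, which is acceptable for monitoring purposes.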
66 changes: 47 additions & 19 deletions src/Storages/ExternalStream/Kafka/KafkaSink.cpp
@@ -140,8 +140,8 @@ KafkaSink::KafkaSink(
{
/// If the buffer_size (kafka_max_message_size) is reached, the buffer will be forced to flush.
wb = std::make_unique<WriteBufferFromKafkaSink>(
- [this](char * pos, size_t len, size_t total_len) { addMessageToBatch(pos, len, total_len); },
- [this]() { tryCarryOverPendingData(); },
+ /*on_next=*/ [this](char * pos, size_t len, size_t total_len) { addMessageToBatch(pos, len, total_len); },
+ /*after_next=*/ [this]() { tryCarryOverPendingData(); },
/*buffer_size=*/ context->getSettingsRef().kafka_max_message_size.value);

const auto & data_format = kafka.dataFormat();
@@ -235,20 +235,29 @@ KafkaSink::KafkaSink(
/// However, it's still possible that a single row by itself is too big and exceeds that limit. There is nothing we can do about that for now.
void KafkaSink::addMessageToBatch(char * pos, size_t len, size_t total_len)
{
- auto pending_size = pending_data.size();
+ auto pending_size = pending_data.offset();

/// There are complete data to consume.
if (len > 0)
{
StringRef key = message_key_expr ? keys_for_current_batch[current_batch_row++] : "";

+ nlog::ByteVector payload;
+
/// Data at pos (which is in the WriteBuffer) will be overwritten, thus it must be kept somewhere else (in `batch_payload`).
auto msg_size = pending_size + len;
- nlog::ByteVector payload{msg_size};
+ if (!oversized_payload.empty()) [[unlikely]]
+ {
+ msg_size += oversized_payload.size();
+ payload.swap(oversized_payload);
+ external_stream_counter->addToOversizedMessageCount(1);
+ }
+ payload.resize(msg_size); /// set the size to the right value

if (pending_size)
- memcpy(payload.data(), pending_data.data(), pending_size);
- memcpy(payload.data() + pending_size, pos, len);
+ memcpy(payload.data() + (msg_size - len - pending_size), pending_data.buffer().begin(), pending_size);
+
+ memcpy(payload.data() + (msg_size - len), pos, len);

current_batch.push_back(rd_kafka_message_t{
.partition = next_partition,
@@ -262,7 +271,10 @@ void KafkaSink::addMessageToBatch(char * pos, size_t len, size_t total_len)
batch_payload.push_back(std::move(payload));
++state.outstandings;

- pending_data.resize(0);
+ if (len < total_len)
+ external_stream_counter->addToMessagesBySize(1);
+
+ pending_data.next(); /// reset pending_data
pending_size = 0;
rows_in_current_message = 0;
}
@@ -271,24 +283,40 @@ void KafkaSink::addMessageToBatch(char * pos, size_t len, size_t total_len)
/// Nothing left
return;

- /// There are some remaining incomplete data, copy them to pending_data.
+ /// There is some remaining incomplete data; a few scenarios need to be handled.
auto remaining = total_len - len;
- pending_data.resize(pending_size + remaining);
- memcpy(pending_data.data() + pending_size, pos + len, remaining);
+ auto * remaining_pos = pos + len;
+
+ /// Scenario 1 - There is already buffered oversized data; append the remaining data to it.
+ if (!oversized_payload.empty()) [[unlikely]]
+ {
+ auto n = oversized_payload.size();
+ oversized_payload.resize(n + remaining);
+ memcpy(oversized_payload.data() + n, remaining_pos, remaining);
+ return;
+ }
+
+ /// Scenario 2 - No buffered oversized data, but pending_data would be full after appending the remaining data.
+ /// In this case, move all the data to oversized_payload so that pending_data does not need to be resized.
+ /// This is important because keeping pending_data the same size as `wb` allows us to swap them
+ /// instead of copying data from pending_data back into `wb`.
+ if (pending_size + remaining >= pending_data.available()) [[unlikely]]
+ {
+ oversized_payload.resize(pending_size + remaining);
+ memcpy(oversized_payload.data(), pending_data.buffer().begin(), pending_size);
+ memcpy(oversized_payload.data() + pending_size, remaining_pos, remaining);
+ pending_data.next(); /// reset pending_data
+ return;
+ }

- external_stream_counter->addToMessagesBySize(1);
+ /// Scenario 3 - No buffered oversized data, and the remaining data fits into pending_data.
+ pending_data.write(remaining_pos, remaining);
}

void KafkaSink::tryCarryOverPendingData()
{
- /// If there are pending data and it can be fit into the buffer, then write the data back to the buffer,
- /// so that we can use the buffer to limit the message size.
- /// If the pending data are too big, that means we get a over-size row.
- if (!pending_data.empty() && pending_data.size() < wb->available())
- {
- wb->write(pending_data.data(), pending_data.size());
- pending_data.resize(0);
- }
+ if (pending_data.offset())
+ wb->swap(pending_data);
}

void KafkaSink::consume(Chunk chunk)
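The heart of the change is in tryCarryOverPendingData(): pending_data is now a NullWriteBuffer kept at the same size as `wb` (kafka_max_message_size), so the incomplete tail left after cutting a message can be handed back to `wb` by swapping the two buffers instead of copying bytes. Here is a rough standalone illustration of copy-back versus swap (simplified structs, not the actual DB::WriteBuffer/NullWriteBuffer API; it assumes `wb` has just been flushed and is empty):

#include <cstddef>
#include <cstring>
#include <utility>
#include <vector>

/// Two equally sized staging buffers: "wb" receives serialized rows, "pending"
/// holds the incomplete tail of the last row after a message is cut.
struct FixedBuffer
{
    std::vector<char> mem;
    size_t used = 0;

    explicit FixedBuffer(size_t capacity) : mem(capacity) {}

    size_t available() const { return mem.size() - used; }

    void write(const char * src, size_t len)
    {
        std::memcpy(mem.data() + used, src, len);
        used += len;
    }
};

/// Old approach (simplified): copy the pending tail back into wb.
void carryOverByCopy(FixedBuffer & wb, FixedBuffer & pending)
{
    if (pending.used && pending.used < wb.available())
    {
        wb.write(pending.mem.data(), pending.used);
        pending.used = 0;
    }
}

/// New approach (simplified): both buffers have the same capacity, so just
/// exchange their storage; no bytes are copied. With wb empty after a flush,
/// wb ends up holding the tail and pending becomes empty.
void carryOverBySwap(FixedBuffer & wb, FixedBuffer & pending)
{
    if (pending.used)
    {
        std::swap(wb.mem, pending.mem);
        std::swap(wb.used, pending.used);
    }
}

The swap is only valid because scenario 2 in addMessageToBatch guarantees pending_data never outgrows `wb`'s capacity; anything that would overflow is diverted into the growable oversized_payload instead.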
4 changes: 3 additions & 1 deletion src/Storages/ExternalStream/Kafka/KafkaSink.h
@@ -7,6 +7,7 @@
#include <Storages/ExternalStream/Kafka/Kafka.h>
#include <Storages/ExternalStream/Kafka/WriteBufferFromKafkaSink.h>
#include <Common/ThreadPool.h>
+ #include <IO/NullWriteBuffer.h>

namespace DB
{
@@ -114,7 +115,8 @@ class KafkaSink final : public SinkToStorage

/// For constructing the message batch
UInt64 rows_in_current_message{0};
- nlog::ByteVector pending_data;
+ NullWriteBuffer pending_data;
+ nlog::ByteVector oversized_payload;
std::vector<rd_kafka_message_t> current_batch;
std::vector<nlog::ByteVector> batch_payload;
std::vector<StringRef> keys_for_current_batch;
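Putting the new leftover handling from addMessageToBatch in one place: after a message is cut, the incomplete tail of the last row is routed to one of three destinations. Below is a condensed, self-contained sketch of that routing (hypothetical names; the overflow check is simplified to a plain capacity test, whereas the diff compares pending_size + remaining against pending_data.available()):

#include <cstddef>
#include <cstring>
#include <vector>

/// Stand-ins for the sink's two staging areas.
struct LeftoverState
{
    std::vector<char> pending;   /// fixed capacity, same size as the write buffer
    size_t pending_used = 0;
    std::vector<char> oversized; /// growable, only used while a row exceeds one message

    explicit LeftoverState(size_t capacity) : pending(capacity) {}
};

/// Route the incomplete tail tail[0..len) that remains after cutting a message.
void routeLeftover(LeftoverState & st, const char * tail, size_t len)
{
    if (!st.oversized.empty())
    {
        /// Scenario 1: an oversized row is already being accumulated; keep appending to it.
        size_t n = st.oversized.size();
        st.oversized.resize(n + len);
        std::memcpy(st.oversized.data() + n, tail, len);
    }
    else if (st.pending_used + len > st.pending.size())
    {
        /// Scenario 2: the tail would not fit into the fixed-size pending buffer;
        /// move everything into the growable oversized buffer and reset pending.
        st.oversized.resize(st.pending_used + len);
        std::memcpy(st.oversized.data(), st.pending.data(), st.pending_used);
        std::memcpy(st.oversized.data() + st.pending_used, tail, len);
        st.pending_used = 0;
    }
    else
    {
        /// Scenario 3: the tail fits; stash it in pending for the next message.
        std::memcpy(st.pending.data() + st.pending_used, tail, len);
        st.pending_used += len;
    }
}

Scenario 2 is what keeps the fixed-size pending buffer from ever needing to grow, which in turn is what makes the buffer swap in tryCarryOverPendingData safe.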