Skip to content

Commit

Permalink
[INLONG-9293][SDK] Optimize the problem that the more inlong grouids …
Browse files Browse the repository at this point in the history
…there are, the more memory is consumed. (#9295)
  • Loading branch information
doleyzi authored Nov 16, 2023
1 parent 52498f1 commit ab3f2cc
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ enum SdkCode {
kInvalidBid = 12,
kFailGetBufferPool = 13,
kFailGetSendBuf = 14,
kFailWriteToBuf = 15,
kMsgEmpty = 15,
kErrorCURL = 16,
kErrorParseJson = 17,
kFailGetRevGroup = 18,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,18 @@ struct SdkMsg {
std::string user_client_ip_;

std::string data_pack_format_attr_;

std::string inlong_group_id_;
std::string inlong_stream_id_;

SdkMsg(const std::string &mmsg, const std::string &mclient_ip,
int64_t mreport_time, UserCallBack mcb, const std::string &attr,
const std::string &u_ip, int64_t u_time)
const std::string &u_ip, int64_t u_time,const std::string& inlong_group_id,const std::string& inlong_stream_id)
: msg_(mmsg), client_ip_(mclient_ip), report_time_(mreport_time),
cb_(mcb), user_report_time_(u_time), user_client_ip_(u_ip),
data_pack_format_attr_(attr) {}
data_pack_format_attr_(attr),
inlong_group_id_(inlong_group_id),
inlong_stream_id_(inlong_stream_id){}
};
using SdkMsgPtr = std::shared_ptr<SdkMsg>;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ int32_t ApiImp::SendBase(const std::string inlong_group_id,
ProxyManager::GetInstance()->CheckBidConf(inlong_group_id, true);

auto recv_group =
recv_manager_->GetRecvGroup(inlong_group_id, inlong_stream_id);
recv_manager_->GetRecvGroup(inlong_group_id);
if (recv_group == nullptr) {
LOG_ERROR("fail to get recv group, inlong_group_id:"
<< inlong_group_id << " inlong_stream_id:" << inlong_stream_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,22 @@

namespace inlong {
const uint32_t DEFAULT_PACK_ATTR = 400;
RecvGroup::RecvGroup(const std::string &inlong_group_id,
const std::string &inlong_stream_id,
std::shared_ptr<SendManager> send_manager)
: cur_len_(0), inlong_group_id_(inlong_group_id),
inlong_stream_id_(inlong_stream_id), groupId_num_(0), streamId_num_(0),
RecvGroup::RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager)
: cur_len_(0), groupId_num_(0), streamId_num_(0),
msg_type_(SdkConfig::getInstance()->msg_type_),
data_capacity_(SdkConfig::getInstance()->buf_size_),
send_manager_(send_manager) {
send_manager_(send_manager),group_key_(group_key) {
data_capacity_ = std::max(SdkConfig::getInstance()->max_msg_size_,
SdkConfig::getInstance()->pack_size_);
data_capacity_ = data_capacity_ + DEFAULT_PACK_ATTR;

pack_buf_ = new char[data_capacity_];
memset(pack_buf_, 0x0, data_capacity_);
topic_desc_ =
"groupId=" + inlong_group_id_ + "&streamId=" + inlong_stream_id_;
data_time_ = 0;
last_pack_time_ = Utils::getCurrentMsTime();
max_recv_size_ = SdkConfig::getInstance()->recv_buf_size_;

LOG_INFO("RecvGroup:"<<group_key_<<",data_capacity:"<<data_capacity_<<",max_recv_size:"<<max_recv_size_);
}

RecvGroup::~RecvGroup() {
Expand All @@ -64,23 +61,23 @@ int32_t RecvGroup::SendData(const std::string &msg, const std::string &groupId,
return SdkCode::kRecvBufferFull;
}

AddMsg(msg, client_ip, report_time, call_back);
AddMsg(msg, client_ip, report_time, call_back,groupId,streamId);

return SdkCode::kSuccess;
}

int32_t RecvGroup::DoDispatchMsg() {
last_pack_time_ = Utils::getCurrentMsTime();
std::lock_guard<std::mutex> lck(mutex_);
if (inlong_group_id_.empty()) {
if (group_key_.empty()) {
LOG_ERROR("groupId is empty, check!!");
return SdkCode::kInvalidInput;
}
if (msgs_.empty()) {
LOG_ERROR("no msg in msg_set, check!");
return SdkCode::kFailGetRevGroup;
return SdkCode::kMsgEmpty;
}
auto send_group = send_manager_->GetSendGroup(inlong_group_id_);
auto send_group = send_manager_->GetSendGroup(group_key_);
if (send_group == nullptr) {
LOG_ERROR("failed to get send_buf, something gets wrong, checkout!");
return SdkCode::kFailGetSendBuf;
Expand All @@ -93,38 +90,54 @@ int32_t RecvGroup::DoDispatchMsg() {
}

uint32_t total_length = 0;
std::vector<SdkMsgPtr> msgs_to_dispatch;
uint64_t max_tid_size = 0;
std::unordered_map<std::string, std::vector<SdkMsgPtr>> msgs_to_dispatch;
std::unordered_map<std::string, uint64_t> tid_stat;
while (!msgs_.empty()) {
SdkMsgPtr msg = msgs_.front();
if (msg->msg_.size() + total_length + constants::ATTR_LENGTH >
SdkConfig::getInstance()->pack_size_) {
break;
if (msg->msg_.size() + max_tid_size + constants::ATTR_LENGTH > SdkConfig::getInstance()->pack_size_) {
if (!msgs_to_dispatch.empty()) {
break;
}
}
msgs_to_dispatch.push_back(msg);
std::string msg_key = msg->inlong_group_id_ + msg->inlong_stream_id_;
msgs_to_dispatch[msg_key].push_back(msg);
msgs_.pop();

total_length = msg->msg_.size() + total_length + constants::ATTR_LENGTH;

if (tid_stat.find(msg_key) == tid_stat.end()) {
tid_stat[msg_key] = 0;
}
tid_stat[msg_key] = tid_stat[msg_key] + msg->msg_.size() + constants::ATTR_LENGTH;

max_tid_size = std::max(tid_stat[msg_key], max_tid_size);
}

cur_len_ = cur_len_ - total_length;

std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(msgs_to_dispatch);
for (auto it : msgs_to_dispatch) {
std::shared_ptr<SendBuffer> send_buffer = BuildSendBuf(it.second);

ResetPackBuf();
ResetPackBuf();

if (send_buffer == nullptr) {
CallbalkToUsr(msgs_to_dispatch);
return SdkCode::kSuccess;
}
if (send_buffer == nullptr) {
CallbalkToUsr(it.second);
continue;
}

int ret = send_group->PushData(send_buffer);
if (ret != SdkCode::kSuccess) {
CallbalkToUsr(msgs_to_dispatch);
int ret = send_group->PushData(send_buffer);
if (ret != SdkCode::kSuccess) {
CallbalkToUsr(it.second);
}
}

return SdkCode::kSuccess;
}

void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
int64_t report_time, UserCallBack call_back) {
int64_t report_time, UserCallBack call_back,const std::string &groupId,
const std::string &streamId) {
if (Utils::isLegalTime(report_time))
data_time_ = report_time;
else {
Expand All @@ -142,19 +155,11 @@ void RecvGroup::AddMsg(const std::string &msg, std::string client_ip,
"&__addcol2__ip=" + client_ip;
msgs_.push(std::make_shared<SdkMsg>(msg, client_ip, data_time_, call_back,
data_pack_format_attr, user_client_ip,
user_report_time));
user_report_time,groupId,streamId));

cur_len_ += msg.size() + constants::ATTR_LENGTH;
}

bool RecvGroup::ShouldPack(int32_t msg_len) {
if (0 == cur_len_ || msgs_.empty())
return false;
if (msg_len + cur_len_ > SdkConfig::getInstance()->pack_size_)
return true;
return false;
}

bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
uint32_t &out_len, uint32_t uniq_id) {
if (pack_data == nullptr) {
Expand Down Expand Up @@ -220,7 +225,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
streamId_num_ == 0) {
groupId_num = 0;
streamId_num = 0;
groupId_streamId_char = topic_desc_;
groupId_streamId_char = "groupId=" + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;
char_groupId_flag = 0x4;
} else {
groupId_num = groupId_num_;
Expand All @@ -240,7 +246,8 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,
"&node1ip=" + SdkConfig::getInstance()->local_ip_ +
"&rtime1=" + std::to_string(Utils::getCurrentMsTime());
} else {
attr = topic_desc_;
attr = "groupId=" + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;
}
*(uint16_t *)bodyBegin = htons(attr.size());
bodyBegin += sizeof(uint16_t);
Expand Down Expand Up @@ -290,7 +297,9 @@ bool RecvGroup::PackMsg(std::vector<SdkMsgPtr> &msgs, char *pack_data,

// attr
std::string attr;
attr = topic_desc_;
attr = "groupId=" + msgs[0]->inlong_group_id_ +
"&streamId=" + msgs[0]->inlong_stream_id_;

attr += "&dt=" + std::to_string(data_time_);
attr += "&mid=" + std::to_string(uniq_id);
if (isSnappy)
Expand Down Expand Up @@ -354,8 +363,8 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
}
send_buffer->setLen(len);
send_buffer->setMsgCnt(msg_cnt);
send_buffer->setInlongGroupId(inlong_group_id_);
send_buffer->setStreamId(inlong_stream_id_);
send_buffer->setInlongGroupId(msgs[0]->inlong_group_id_);
send_buffer->setStreamId(msgs[0]->inlong_stream_id_);
send_buffer->setUniqId(uniq_id);
send_buffer->setIsPacked(true);
for (auto it : msgs) {
Expand All @@ -368,7 +377,7 @@ RecvGroup::BuildSendBuf(std::vector<SdkMsgPtr> &msgs) {
void RecvGroup::CallbalkToUsr(std::vector<SdkMsgPtr> &msgs) {
for (auto &it : msgs) {
if (it->cb_) {
it->cb_(inlong_group_id_.data(), inlong_stream_id_.data(),
it->cb_(it->inlong_group_id_.data(), it->inlong_stream_id_.data(),
it->msg_.data(), it->msg_.size(), it->user_report_time_,
it->user_client_ip_.data());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,29 +38,26 @@ class RecvGroup {
uint32_t cur_len_;
AtomicInt pack_err_;
uint64_t data_time_;
std::string inlong_group_id_;
std::string inlong_stream_id_;
uint16_t groupId_num_;
uint16_t streamId_num_;
std::string topic_desc_;
uint32_t msg_type_;
mutable std::mutex mutex_;

std::shared_ptr<SendManager> send_manager_;
uint64_t last_pack_time_;

uint64_t max_recv_size_;
std::string group_key_;

bool ShouldPack(int32_t msg_len);
int32_t DoDispatchMsg();
void AddMsg(const std::string &msg, std::string client_ip,
int64_t report_time, UserCallBack call_back);
int64_t report_time, UserCallBack call_back,const std::string &groupId,
const std::string &streamId);
bool IsZipAndOperate(std::string &res, uint32_t real_cur_len);
inline void ResetPackBuf() { memset(pack_buf_, 0x0, data_capacity_); }

public:
RecvGroup(const std::string &groupId, const std::string &streamId,
std::shared_ptr<SendManager> send_manager);
RecvGroup(const std::string &group_key,std::shared_ptr<SendManager> send_manager);
~RecvGroup();

int32_t SendData(const std::string &msg, const std::string &groupId,
Expand All @@ -72,8 +69,6 @@ class RecvGroup {

char *data() const { return pack_buf_; }

std::string groupId() const { return inlong_group_id_; }

std::shared_ptr<SendBuffer> BuildSendBuf(std::vector<SdkMsgPtr> &msgs);
void CallbalkToUsr(std::vector<SdkMsgPtr> &msgs);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ SendGroup::SendGroup(std::string send_group_key)
if (max_send_queue_num_ <= 0) {
max_send_queue_num_ = kDefaultQueueSize;
}
LOG_INFO("SendGroup: " << send_group_key_
<< ", max send queue num: " << max_send_queue_num_);
LOG_INFO("SendGroup:" << send_group_key_
<< ",max send queue num:" << max_send_queue_num_);
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_send_;
load_balance_interval_ = SdkConfig::getInstance()->load_balance_interval_;
heart_beat_interval_ =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ int32_t ProxyManager::GetProxyByClusterId(const std::string &cluster_id,
proxy_info_vec = it->second;
return SdkCode::kSuccess;
}
std::string ProxyManager::GetSendGroupKey(const std::string &groupid) {
std::string ProxyManager::GetGroupKey(const std::string &groupid) {
if (SdkConfig::getInstance()->enable_isolation_) {
return groupid;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class ProxyManager {
int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec);
int32_t GetProxyByClusterId(const std::string &cluster_id,
ProxyInfoVec &proxy_info_vec);
std::string GetSendGroupKey(const std::string &groupid);
std::string GetGroupKey(const std::string &groupid);
bool HasProxy(const std::string &inlong_group_id);
bool CheckGroupid(const std::string &groupid);
bool CheckClusterId(const std::string &cluster_id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

#include "recv_manager.h"
#include "../utils/utils.h"
#include "proxy_manager.h"

namespace inlong {
RecvManager::RecvManager(std::shared_ptr<SendManager> send_manager)
: work_(asio::make_work_guard(io_context_)), send_manager_(send_manager),
exit_flag_(false) {
dispatch_interval_ = SdkConfig::getInstance()->dispatch_interval_zip_;

max_groupid_streamid_num_ = SdkConfig::getInstance()->max_group_id_num_ *
SdkConfig::getInstance()->max_stream_id_num_;
max_groupid_streamid_num_ =
std::max(SdkConfig::getInstance()->max_group_id_num_,
SdkConfig::getInstance()->max_stream_id_num_);
LOG_INFO("max_groupid_streamid_num " <<max_groupid_streamid_num_);

check_timer_ = std::make_shared<asio::steady_timer>(io_context_);
Expand Down Expand Up @@ -54,10 +56,13 @@ RecvManager::~RecvManager() {
}
}
void RecvManager::Run() { io_context_.run(); }
RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId,
const std::string &streamId) {
RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId) {
std::lock_guard<std::mutex> lck(mutex_);
auto it = recv_group_map_.find(groupId + streamId);
std::string group_key = ProxyManager::GetInstance()->GetGroupKey(groupId);
if (group_key.empty()) {
return nullptr;
}
auto it = recv_group_map_.find(group_key);
if (it != recv_group_map_.end()) {
return it->second;
} else {
Expand All @@ -66,8 +71,8 @@ RecvGroupPtr RecvManager::GetRecvGroup(const std::string &groupId,
}

RecvGroupPtr recv_group =
std::make_shared<RecvGroup>(groupId, streamId, send_manager_);
recv_group_map_.emplace(groupId + streamId, recv_group);
std::make_shared<RecvGroup>(group_key, send_manager_);
recv_group_map_.emplace(group_key, recv_group);
return recv_group;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class RecvManager : noncopyable {
RecvManager(std::shared_ptr<SendManager> send_manager);
~RecvManager();
void DispatchData(std::error_code error);
RecvGroupPtr GetRecvGroup(const std::string &bid, const std::string &tid);
RecvGroupPtr GetRecvGroup(const std::string &bid);
};
} // namespace inlong

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@ SendManager::SendManager() : send_group_idx_(0) {
<< SdkConfig::getInstance()->inlong_group_ids_[i]
<< " send group num:"
<< SdkConfig::getInstance()->per_groupid_thread_nums_);
std::string send_group_key = ProxyManager::GetInstance()->GetSendGroupKey(
std::string send_group_key = ProxyManager::GetInstance()->GetGroupKey(
SdkConfig::getInstance()->inlong_group_ids_[i]);
AddSendGroup(send_group_key);
}
}

SendGroupPtr SendManager::GetSendGroup(const std::string &group_id) {
std::string send_group_key =
ProxyManager::GetInstance()->GetSendGroupKey(group_id);
SendGroupPtr send_group_ptr = DoGetSendGroup(send_group_key);
SendGroupPtr SendManager::GetSendGroup(const std::string &group_key) {
SendGroupPtr send_group_ptr = DoGetSendGroup(group_key);
if (send_group_ptr == nullptr) {
AddSendGroup(send_group_key);
AddSendGroup(group_key);
}
return send_group_ptr;
}
Expand Down
Loading

0 comments on commit ab3f2cc

Please sign in to comment.