Skip to content

Commit

Permalink
[INLONG-9378][SDK] Optimize proxy configuration update (#9381)
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Dec 1, 2023
1 parent 94ce7a5 commit 912dce3
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <rapidjson/document.h>

namespace inlong {
const uint64_t MINUTE = 60000;
ProxyManager *ProxyManager::instance_ = new ProxyManager();
ProxyManager::~ProxyManager() {
if (update_conf_thread_.joinable()) {
Expand All @@ -41,6 +42,7 @@ ProxyManager::~ProxyManager() {
}
void ProxyManager::Init() {
timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
last_update_time_ = Utils::getCurrentMsTime();
if (__sync_bool_compare_and_swap(&inited_, false, true)) {
update_conf_thread_ = std::thread(&ProxyManager::Update, this);
}
Expand All @@ -65,8 +67,11 @@ void ProxyManager::Update() {
LOG_INFO("proxylist DoUpdate thread exit");
}
void ProxyManager::DoUpdate() {
update_mutex_.try_lock();
LOG_INFO("start ProxyManager DoUpdate.");
if (!update_mutex_.try_lock()) {
LOG_INFO("DoUpdate try_lock. " << getpid());
return;
}

std::srand(unsigned(std::time(nullptr)));

Expand All @@ -76,80 +81,30 @@ void ProxyManager::DoUpdate() {
return;
}

{
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
for (auto &groupid2cluster : groupid_2_cluster_id_map_) {
std::string url;
if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
url = SdkConfig::getInstance()->manager_cluster_url_;
else {
url = SdkConfig::getInstance()->manager_url_ + "/" +
groupid2cluster.first;
}
std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
"&version=" + constants::kVersion +
"&protocolType=" + constants::kProtocolType;
LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
<< "proxy cfg url " << url.c_str()
<< "post_data:" << post_data.c_str());

std::string meta_data;
int32_t ret;
std::string urlByDNS;
for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) {
HttpRequest request = {url,
timeout_,
SdkConfig::getInstance()->need_auth_,
SdkConfig::getInstance()->auth_id_,
SdkConfig::getInstance()->auth_key_,
post_data};
ret = Utils::requestUrl(meta_data, &request);
if (!ret) {
break;
} // request success
}

if (ret != SdkCode::kSuccess) {
if (groupid_2_proxy_map_.find(groupid2cluster.first) != groupid_2_proxy_map_.end()) {
LOG_WARN("failed to request from manager, use previous " << groupid2cluster.first);
continue;
}
if (!SdkConfig::getInstance()->enable_local_cache_) {
LOG_WARN("failed to request from manager, forbid local cache!");
continue;
}
meta_data = RecoverFromLocalCache(groupid2cluster.first);
if (meta_data.empty()) {
LOG_WARN("local cache is empty!");
continue;
}
}
int retry = constants::MAX_RETRY;
do {
std::unordered_map<std::string, std::string> bid_2_cluster_id =
BuildGroupId2ClusterId();

ProxyInfoVec proxyInfoVec;
ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
if (ret != SdkCode::kSuccess) {
LOG_ERROR("failed to parse groupid:%s json proxy list "
<< groupid2cluster.first.c_str());
continue;
}
if (!proxyInfoVec.empty()) {
unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_);
groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec;
cache_proxy_info_[groupid2cluster.first] = meta_data;
LOG_INFO("groupid:" << groupid2cluster.first << " success update "
<< proxyInfoVec.size() << " proxy-ip.");
}
}
}
UpdateProxy(bid_2_cluster_id);

UpdateGroupid2ClusterIdMap();
UpdateGroupid2ClusterIdMap();

UpdateClusterId2ProxyMap();
UpdateClusterId2ProxyMap();
uint64_t id_count = GetGroupIdCount();
if (bid_2_cluster_id.size() == id_count) {
break;
}
LOG_INFO("retry DoUpdate!. update size:" << bid_2_cluster_id.size()
<< " != latest size:" << id_count);
} while (retry--);

if (SdkConfig::getInstance()->enable_local_cache_) {
WriteLocalCache();
}

last_update_time_ = Utils::getCurrentMsTime();

update_mutex_.unlock();
LOG_INFO("finish ProxyManager DoUpdate.");
}
Expand Down Expand Up @@ -254,7 +209,8 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id,

int32_t ProxyManager::GetProxy(const std::string &key,
ProxyInfoVec &proxy_info_vec) {
if (constants::IsolationLevel::kLevelOne == SdkConfig::getInstance()->isolation_level_) {
if (constants::IsolationLevel::kLevelOne ==
SdkConfig::getInstance()->isolation_level_) {
return GetProxyByGroupid(key, proxy_info_vec);
}
return GetProxyByClusterId(key, proxy_info_vec);
Expand Down Expand Up @@ -360,8 +316,9 @@ void ProxyManager::UpdateGroupid2ClusterIdMap() {
}
}

void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid,
const std::string &meta_data) {
void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index,
const std::string &groupid,
const std::string &meta_data) {
file << "[groupid" << groupid_index << "]" << std::endl;
file << "groupid=" << groupid << std::endl;
file << "proxy_cfg=" << meta_data << std::endl;
Expand Down Expand Up @@ -393,7 +350,7 @@ void ProxyManager::ReadLocalCache() {
LOG_INFO("read cache file, id:" << groupid << ", local config:" << proxy);
cache_proxy_info_[groupid] = proxy;
}
}catch (...){
} catch (...) {
LOG_ERROR("ReadLocalCache error!");
}
}
Expand Down Expand Up @@ -441,4 +398,102 @@ std::string ProxyManager::GetClusterID(const std::string &groupid) {
}
return it->second;
}

void ProxyManager::UpdateProxy(
std::unordered_map<std::string, std::string> &group_id_2_cluster_id) {
for (auto &groupid2cluster : group_id_2_cluster_id) {
if (SkipUpdate(groupid2cluster.first)) {
LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first);
continue;
}
std::string url;
if (SdkConfig::getInstance()->enable_manager_url_from_cluster_)
url = SdkConfig::getInstance()->manager_cluster_url_;
else {
url =
SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first;
}
std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ +
"&version=" + constants::kVersion +
"&protocolType=" + constants::kProtocolType;
LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str()
<< "proxy cfg url " << url.c_str()
<< "post_data:" << post_data.c_str());

std::string meta_data;
int32_t ret;
std::string urlByDNS;
for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) {
HttpRequest request = {url,
timeout_,
SdkConfig::getInstance()->need_auth_,
SdkConfig::getInstance()->auth_id_,
SdkConfig::getInstance()->auth_key_,
post_data};
ret = Utils::requestUrl(meta_data, &request);
if (!ret) {
break;
} // request success
}

if (ret != SdkCode::kSuccess) {
if (groupid_2_proxy_map_.find(groupid2cluster.first) !=
groupid_2_proxy_map_.end()) {
LOG_WARN("failed to request from manager, use previous "
<< groupid2cluster.first);
continue;
}
if (!SdkConfig::getInstance()->enable_local_cache_) {
LOG_WARN("failed to request from manager, forbid local cache!");
continue;
}
meta_data = RecoverFromLocalCache(groupid2cluster.first);
if (meta_data.empty()) {
LOG_WARN("local cache is empty!");
continue;
}
}

ProxyInfoVec proxyInfoVec;
ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec);
if (ret != SdkCode::kSuccess) {
LOG_ERROR("failed to parse groupid:%s json proxy list "
<< groupid2cluster.first.c_str());
continue;
}
if (!proxyInfoVec.empty()) {
unique_write_lock<read_write_mutex> wtlck(groupid_2_proxy_map_rwmutex_);
groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec;
cache_proxy_info_[groupid2cluster.first] = meta_data;
LOG_INFO("groupid:" << groupid2cluster.first << " success update "
<< proxyInfoVec.size() << " proxy-ip.");
}
}
}
std::unordered_map<std::string, std::string>
ProxyManager::BuildGroupId2ClusterId() {
std::unordered_map<std::string, std::string> bid_2_cluster_id_map_tmp;
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
for (auto &bid2cluster : groupid_2_cluster_id_map_) {
bid_2_cluster_id_map_tmp.insert(bid2cluster);
}
return bid_2_cluster_id_map_tmp;
}

uint64_t ProxyManager::GetGroupIdCount() {
unique_read_lock<read_write_mutex> rdlck(groupid_2_cluster_id_rwmutex_);
return groupid_2_cluster_id_map_.size();
}

bool ProxyManager::SkipUpdate(const std::string &group_id) {
uint64_t current_time = Utils::getCurrentMsTime();
uint64_t diff = current_time - last_update_time_;
uint64_t threshold =
SdkConfig::getInstance()->manager_update_interval_ * MINUTE;
bool ret = CheckGroupid(group_id);
if (diff < threshold && ret) {
return true;
}
return false;
}
} // namespace inlong
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class ProxyManager {
std::thread update_conf_thread_;
volatile bool inited_ = false;
std::unordered_map<std::string, std::string> cache_proxy_info_;
uint64_t last_update_time_;

int32_t ParseAndGet(const std::string &key, const std::string &meta_data,
ProxyInfoVec &proxy_info_vec);
Expand All @@ -64,7 +65,8 @@ class ProxyManager {
void DoUpdate();
void Init();
int32_t GetProxy(const std::string &groupid, ProxyInfoVec &proxy_info_vec);
int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec);
int32_t GetProxyByGroupid(const std::string &inlong_group_id,
ProxyInfoVec &proxy_info_vec);
int32_t GetProxyByClusterId(const std::string &cluster_id,
ProxyInfoVec &proxy_info_vec);
std::string GetGroupKey(const std::string &groupid);
Expand All @@ -73,11 +75,18 @@ class ProxyManager {
bool CheckClusterId(const std::string &cluster_id);
void UpdateClusterId2ProxyMap();
void UpdateGroupid2ClusterIdMap();
void BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid, const std::string &meta_data);
void BuildLocalCache(std::ofstream &file, int32_t groupid_index,
const std::string &groupid,
const std::string &meta_data);
void ReadLocalCache();
void WriteLocalCache();
std::string RecoverFromLocalCache(const std::string&groupid);
std::string RecoverFromLocalCache(const std::string &groupid);
std::string GetClusterID(const std::string &groupid);
void UpdateProxy(
std::unordered_map<std::string, std::string> &group_id_2_cluster_id);
std::unordered_map<std::string, std::string> BuildGroupId2ClusterId();
uint64_t GetGroupIdCount();
bool SkipUpdate(const std::string &group_id);
};
} // namespace inlong

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@

namespace inlong {
namespace constants {
enum IsolationLevel{
kLevelOne =1, kLevelSecond =2, kLevelThird =3
};
enum IsolationLevel { kLevelOne = 1, kLevelSecond = 2, kLevelThird = 3 };
static const int32_t kMaxRequestTDMTimes = 4;
static const char kAttrFormat[] =
"__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx";
Expand Down Expand Up @@ -107,6 +105,7 @@ static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2,

static const char kCacheFile[] = ".proxy_list.ini";
static const char kCacheTmpFile[] = ".proxy_list.ini.tmp";
const int MAX_RETRY = 10;

} // namespace constants
} // namespace inlong
Expand Down

0 comments on commit 912dce3

Please sign in to comment.