diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index 8c2dfc4c1e2..e3771db99e4 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -27,6 +27,7 @@ #include namespace inlong { +const uint64_t MINUTE = 60000; ProxyManager *ProxyManager::instance_ = new ProxyManager(); ProxyManager::~ProxyManager() { if (update_conf_thread_.joinable()) { @@ -41,6 +42,7 @@ ProxyManager::~ProxyManager() { } void ProxyManager::Init() { timeout_ = SdkConfig::getInstance()->manager_url_timeout_; + last_update_time_ = Utils::getCurrentMsTime(); if (__sync_bool_compare_and_swap(&inited_, false, true)) { update_conf_thread_ = std::thread(&ProxyManager::Update, this); } @@ -65,8 +67,11 @@ void ProxyManager::Update() { LOG_INFO("proxylist DoUpdate thread exit"); } void ProxyManager::DoUpdate() { - update_mutex_.try_lock(); LOG_INFO("start ProxyManager DoUpdate."); + if (!update_mutex_.try_lock()) { + LOG_INFO("DoUpdate try_lock. " << getpid()); + return; + } std::srand(unsigned(std::time(nullptr))); @@ -76,80 +81,30 @@ void ProxyManager::DoUpdate() { return; } - { - unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); - for (auto &groupid2cluster : groupid_2_cluster_id_map_) { - std::string url; - if (SdkConfig::getInstance()->enable_manager_url_from_cluster_) - url = SdkConfig::getInstance()->manager_cluster_url_; - else { - url = SdkConfig::getInstance()->manager_url_ + "/" + - groupid2cluster.first; - } - std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ + - "&version=" + constants::kVersion + - "&protocolType=" + constants::kProtocolType; - LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str() - << "proxy cfg url " << url.c_str() - << "post_data:" << post_data.c_str()); - - std::string meta_data; - int32_t ret; - std::string urlByDNS; - for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) { - HttpRequest request = {url, - timeout_, - SdkConfig::getInstance()->need_auth_, - SdkConfig::getInstance()->auth_id_, - SdkConfig::getInstance()->auth_key_, - post_data}; - ret = Utils::requestUrl(meta_data, &request); - if (!ret) { - break; - } // request success - } - - if (ret != SdkCode::kSuccess) { - if (groupid_2_proxy_map_.find(groupid2cluster.first) != groupid_2_proxy_map_.end()) { - LOG_WARN("failed to request from manager, use previous " << groupid2cluster.first); - continue; - } - if (!SdkConfig::getInstance()->enable_local_cache_) { - LOG_WARN("failed to request from manager, forbid local cache!"); - continue; - } - meta_data = RecoverFromLocalCache(groupid2cluster.first); - if (meta_data.empty()) { - LOG_WARN("local cache is empty!"); - continue; - } - } + int retry = constants::MAX_RETRY; + do { + std::unordered_map bid_2_cluster_id = + BuildGroupId2ClusterId(); - ProxyInfoVec proxyInfoVec; - ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec); - if (ret != SdkCode::kSuccess) { - LOG_ERROR("failed to parse groupid:%s json proxy list " - << groupid2cluster.first.c_str()); - continue; - } - if (!proxyInfoVec.empty()) { - unique_write_lock wtlck(groupid_2_proxy_map_rwmutex_); - groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec; - cache_proxy_info_[groupid2cluster.first] = meta_data; - LOG_INFO("groupid:" << groupid2cluster.first << " success update " - << proxyInfoVec.size() << " proxy-ip."); - } - } - } + UpdateProxy(bid_2_cluster_id); - UpdateGroupid2ClusterIdMap(); + UpdateGroupid2ClusterIdMap(); - UpdateClusterId2ProxyMap(); + UpdateClusterId2ProxyMap(); + uint64_t id_count = GetGroupIdCount(); + if (bid_2_cluster_id.size() == id_count) { + break; + } + LOG_INFO("retry DoUpdate!. update size:" << bid_2_cluster_id.size() + << " != latest size:" << id_count); + } while (retry--); if (SdkConfig::getInstance()->enable_local_cache_) { WriteLocalCache(); } + last_update_time_ = Utils::getCurrentMsTime(); + update_mutex_.unlock(); LOG_INFO("finish ProxyManager DoUpdate."); } @@ -254,7 +209,8 @@ int32_t ProxyManager::ParseAndGet(const std::string &inlong_group_id, int32_t ProxyManager::GetProxy(const std::string &key, ProxyInfoVec &proxy_info_vec) { - if (constants::IsolationLevel::kLevelOne == SdkConfig::getInstance()->isolation_level_) { + if (constants::IsolationLevel::kLevelOne == + SdkConfig::getInstance()->isolation_level_) { return GetProxyByGroupid(key, proxy_info_vec); } return GetProxyByClusterId(key, proxy_info_vec); @@ -360,8 +316,9 @@ void ProxyManager::UpdateGroupid2ClusterIdMap() { } } -void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid, - const std::string &meta_data) { +void ProxyManager::BuildLocalCache(std::ofstream &file, int32_t groupid_index, + const std::string &groupid, + const std::string &meta_data) { file << "[groupid" << groupid_index << "]" << std::endl; file << "groupid=" << groupid << std::endl; file << "proxy_cfg=" << meta_data << std::endl; @@ -393,7 +350,7 @@ void ProxyManager::ReadLocalCache() { LOG_INFO("read cache file, id:" << groupid << ", local config:" << proxy); cache_proxy_info_[groupid] = proxy; } - }catch (...){ + } catch (...) { LOG_ERROR("ReadLocalCache error!"); } } @@ -441,4 +398,102 @@ std::string ProxyManager::GetClusterID(const std::string &groupid) { } return it->second; } + +void ProxyManager::UpdateProxy( + std::unordered_map &group_id_2_cluster_id) { + for (auto &groupid2cluster : group_id_2_cluster_id) { + if (SkipUpdate(groupid2cluster.first)) { + LOG_WARN("SkipUpdate group_id:" << groupid2cluster.first); + continue; + } + std::string url; + if (SdkConfig::getInstance()->enable_manager_url_from_cluster_) + url = SdkConfig::getInstance()->manager_cluster_url_; + else { + url = + SdkConfig::getInstance()->manager_url_ + "/" + groupid2cluster.first; + } + std::string post_data = "ip=" + SdkConfig::getInstance()->local_ip_ + + "&version=" + constants::kVersion + + "&protocolType=" + constants::kProtocolType; + LOG_WARN("get inlong_group_id:" << groupid2cluster.first.c_str() + << "proxy cfg url " << url.c_str() + << "post_data:" << post_data.c_str()); + + std::string meta_data; + int32_t ret; + std::string urlByDNS; + for (int i = 0; i < constants::kMaxRequestTDMTimes; i++) { + HttpRequest request = {url, + timeout_, + SdkConfig::getInstance()->need_auth_, + SdkConfig::getInstance()->auth_id_, + SdkConfig::getInstance()->auth_key_, + post_data}; + ret = Utils::requestUrl(meta_data, &request); + if (!ret) { + break; + } // request success + } + + if (ret != SdkCode::kSuccess) { + if (groupid_2_proxy_map_.find(groupid2cluster.first) != + groupid_2_proxy_map_.end()) { + LOG_WARN("failed to request from manager, use previous " + << groupid2cluster.first); + continue; + } + if (!SdkConfig::getInstance()->enable_local_cache_) { + LOG_WARN("failed to request from manager, forbid local cache!"); + continue; + } + meta_data = RecoverFromLocalCache(groupid2cluster.first); + if (meta_data.empty()) { + LOG_WARN("local cache is empty!"); + continue; + } + } + + ProxyInfoVec proxyInfoVec; + ret = ParseAndGet(groupid2cluster.first, meta_data, proxyInfoVec); + if (ret != SdkCode::kSuccess) { + LOG_ERROR("failed to parse groupid:%s json proxy list " + << groupid2cluster.first.c_str()); + continue; + } + if (!proxyInfoVec.empty()) { + unique_write_lock wtlck(groupid_2_proxy_map_rwmutex_); + groupid_2_proxy_map_[groupid2cluster.first] = proxyInfoVec; + cache_proxy_info_[groupid2cluster.first] = meta_data; + LOG_INFO("groupid:" << groupid2cluster.first << " success update " + << proxyInfoVec.size() << " proxy-ip."); + } + } +} +std::unordered_map +ProxyManager::BuildGroupId2ClusterId() { + std::unordered_map bid_2_cluster_id_map_tmp; + unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); + for (auto &bid2cluster : groupid_2_cluster_id_map_) { + bid_2_cluster_id_map_tmp.insert(bid2cluster); + } + return bid_2_cluster_id_map_tmp; +} + +uint64_t ProxyManager::GetGroupIdCount() { + unique_read_lock rdlck(groupid_2_cluster_id_rwmutex_); + return groupid_2_cluster_id_map_.size(); +} + +bool ProxyManager::SkipUpdate(const std::string &group_id) { + uint64_t current_time = Utils::getCurrentMsTime(); + uint64_t diff = current_time - last_update_time_; + uint64_t threshold = + SdkConfig::getInstance()->manager_update_interval_ * MINUTE; + bool ret = CheckGroupid(group_id); + if (diff < threshold && ret) { + return true; + } + return false; +} } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h index 8ae859869f3..f07333c9899 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.h @@ -51,6 +51,7 @@ class ProxyManager { std::thread update_conf_thread_; volatile bool inited_ = false; std::unordered_map cache_proxy_info_; + uint64_t last_update_time_; int32_t ParseAndGet(const std::string &key, const std::string &meta_data, ProxyInfoVec &proxy_info_vec); @@ -64,7 +65,8 @@ class ProxyManager { void DoUpdate(); void Init(); int32_t GetProxy(const std::string &groupid, ProxyInfoVec &proxy_info_vec); - int32_t GetProxyByGroupid(const std::string &inlong_group_id, ProxyInfoVec &proxy_info_vec); + int32_t GetProxyByGroupid(const std::string &inlong_group_id, + ProxyInfoVec &proxy_info_vec); int32_t GetProxyByClusterId(const std::string &cluster_id, ProxyInfoVec &proxy_info_vec); std::string GetGroupKey(const std::string &groupid); @@ -73,11 +75,18 @@ class ProxyManager { bool CheckClusterId(const std::string &cluster_id); void UpdateClusterId2ProxyMap(); void UpdateGroupid2ClusterIdMap(); - void BuildLocalCache(std::ofstream &file, int32_t groupid_index, const std::string &groupid, const std::string &meta_data); + void BuildLocalCache(std::ofstream &file, int32_t groupid_index, + const std::string &groupid, + const std::string &meta_data); void ReadLocalCache(); void WriteLocalCache(); - std::string RecoverFromLocalCache(const std::string&groupid); + std::string RecoverFromLocalCache(const std::string &groupid); std::string GetClusterID(const std::string &groupid); + void UpdateProxy( + std::unordered_map &group_id_2_cluster_id); + std::unordered_map BuildGroupId2ClusterId(); + uint64_t GetGroupIdCount(); + bool SkipUpdate(const std::string &group_id); }; } // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index eaa28212637..1dbebd03db6 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -25,9 +25,7 @@ namespace inlong { namespace constants { -enum IsolationLevel{ - kLevelOne =1, kLevelSecond =2, kLevelThird =3 -}; +enum IsolationLevel { kLevelOne = 1, kLevelSecond = 2, kLevelThird = 3 }; static const int32_t kMaxRequestTDMTimes = 4; static const char kAttrFormat[] = "__addcol1__reptime=yyyymmddHHMMSS&__addcol2_ip=xxx.xxx.xxx.xxx"; @@ -107,6 +105,7 @@ static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2, static const char kCacheFile[] = ".proxy_list.ini"; static const char kCacheTmpFile[] = ".proxy_list.ini.tmp"; +const int MAX_RETRY = 10; } // namespace constants } // namespace inlong