diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt index ccbab353360..2a3183fff33 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/CMakeLists.txt @@ -33,6 +33,7 @@ include_directories(src/manager) include_directories(src/group) include_directories(src/protocol) include_directories(src/client) +include_directories(src/metric) link_directories(${PROJECT_SOURCE_DIR}/third_party/lib) link_directories(${PROJECT_SOURCE_DIR}/third_party/lib64) @@ -57,7 +58,7 @@ aux_source_directory(src/protocol PROTOCOL) aux_source_directory(src/client CLIENT) # static library -add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER} ${GROUP} ${PROTOCOL} ${CLIENT}) +add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER} ${GROUP} ${PROTOCOL} ${CLIENT} ${METRIC}) set_target_properties(dataproxy_sdk PROPERTIES OUTPUT_NAME "dataproxy_sdk" PREFIX "") diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc index c4b493b068a..13ce7223b52 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/core/api_imp.cc @@ -25,6 +25,8 @@ #include #include +#include "metric_manager.h" + namespace inlong { int32_t ApiImp::InitApi(const char *config_file_path) { if (!__sync_bool_compare_and_swap(&inited_, false, true)) { @@ -104,7 +106,7 @@ int32_t ApiImp::DoInit() { LOG_INFO("inlong dataproxy cpp sdk Init complete!"); ProxyManager::GetInstance()->Init(); - ProxyManager::GetInstance()->ReadLocalCache(); + MetricManager::GetInstance()->Init(); for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) { LOG_INFO("DoInit CheckConf inlong_group_id:" diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc new file mode 100644 index 00000000000..061abc06782 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.cc @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "metric_manager.h" + +#include +#include +#include + +#include "../utils/logger.h" +#include "../utils/utils.h" +#include "../utils/capi_constant.h" + +namespace inlong { +void MetricManager::Init() { + if (__sync_bool_compare_and_swap(&inited_, false, true)) { + update_thread_ = std::thread(&MetricManager::Run, this); + } + InitEnvironment(); +} +void MetricManager::InitEnvironment() { + environment_.setType("cpp"); + environment_.setVersion(constants::kVersion); + environment_.setPid(getpid()); + environment_.setIp(SdkConfig::getInstance()->local_ip_); +} +void MetricManager::Run() { + prctl(PR_SET_NAME, "metric-manager"); + while (running_) { + LOG_INFO("Start report metric"); + PrintMetric(); + std::this_thread::sleep_for(std::chrono::minutes(constants::kMetricIntervalMinutes)); + } +} +void MetricManager::PrintMetric() { + std::unordered_map stat_map; + { + std::lock_guard lck(mutex_); + stat_map.swap(stat_map_); + } + + LOG_INFO("[MetricManager] Environment info: " << environment_.ToString()); + + for (auto it : stat_map) { + LOG_INFO("[MetricManager] Metric info: " << it.first << " " << it.second.ToString()); + } +} +} // namespace inlong diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h new file mode 100644 index 00000000000..5dc013f2d5d --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/metric_manager.h @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include + +#include "../config/sdk_conf.h" +#include "../metric/environment.h" +#include "../metric/metric.h" + +#ifndef INLONG_METRIC_MANAGER_H +#define INLONG_METRIC_MANAGER_H +namespace inlong { +using MetricMap = std::unordered_map; +static const char kStatJoiner = ' '; +class MetricManager { + private: + mutable std::mutex mutex_; + MetricMap stat_map_; + std::thread update_thread_; + volatile bool inited_ = false; + bool running_ = true; + Environment environment_; + std::string coreParma_; + + MetricManager() { + + } + + public: + static MetricManager *GetInstance() { + static MetricManager instance; + return &instance; + } + void Init(); + void InitEnvironment(); + void PrintMetric(); + void Run(); + void UpdateMetric(const std::string &stat_key, Metric &stat) { + std::lock_guard lck(mutex_); + stat_map_[stat_key].Update(stat); + } + + void AddReceiveBufferFullCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) { + std::lock_guard lck(mutex_); + std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id); + stat_map_[stat_key].AddReceiveBufferFullCount(count); + } + + void AddTooLongMsgCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) { + std::lock_guard lck(mutex_); + std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id); + stat_map_[stat_key].AddTooLongMsgCount(count); + } + + void AddMetadataFailCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) { + std::lock_guard lck(mutex_); + std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id); + stat_map_[stat_key].AddMetadataFailCount(count); + } + + void Reset(); + + std::string BuildStatKey(const std::string &inlong_group_id, const std::string &inlong_stream_id) { + return inlong_group_id + kStatJoiner + inlong_stream_id; + } + + ~MetricManager() { + running_ = false; + if (update_thread_.joinable()) { + update_thread_.join(); + } + } +}; +} // namespace inlong +#endif // INLONG_METRIC_MANAGER_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc index e4aef239e4d..09014d728d1 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/manager/proxy_manager.cc @@ -44,6 +44,7 @@ void ProxyManager::Init() { timeout_ = SdkConfig::getInstance()->manager_url_timeout_; last_update_time_ = Utils::getCurrentMsTime(); if (__sync_bool_compare_and_swap(&inited_, false, true)) { + ReadLocalCache(); update_conf_thread_ = std::thread(&ProxyManager::Update, this); } } diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h new file mode 100644 index 00000000000..e3d4a9bcc84 --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/environment.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef INLONG_ENVIRONMENT_H +#define INLONG_ENVIRONMENT_H + +#include +#include +namespace inlong { +class Environment { + public: + std::string type_; + std::string version_; + std::string ip_; + uint64_t pid_; + const std::string &getType() const { return type_; } + void setType(const std::string &type) { type_ = type; } + std::string getVersion() { return version_; } + void setVersion(const std::string &version) { version_ = version; } + const std::string &getIp() const { return ip_; } + void setIp(const std::string &ip) { ip_ = ip; } + uint64_t getPid() const { return pid_; } + void setPid(uint64_t pid) { pid_ = pid; } + + std::string ToString() const { + std::stringstream metric; + metric << "local ip[" << ip_ << "] "; + metric << "version [" << version_ << "] "; + metric << "pid [" << pid_ << "] "; + return metric.str(); + } +}; +} // namespace inlong +#endif // INLONG_ENVIRONMENT_H \ No newline at end of file diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h new file mode 100644 index 00000000000..87bd991b28c --- /dev/null +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/metric/metric.h @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef INLONG_METRIC_H +#define INLONG_METRIC_H + +#include +#include +namespace inlong { +class Metric { + private: + uint64_t send_success_pack_num_; + uint64_t send_success_msg_num_; + uint64_t send_failed_pack_num_; + uint64_t send_failed_msg_num_; + + uint64_t time_cost_; + uint64_t time_cost_0t32_; + uint64_t time_cost_32t128_; + uint64_t time_cost_128t1024_; + uint64_t time_cost_1024t65536_; + + uint64_t receive_buffer_full_count_; + uint64_t too_long_msg_count_; + uint64_t metadata_fail_count_; + + public: + Metric() + : send_success_pack_num_(0), + send_success_msg_num_(0), + send_failed_pack_num_(0), + send_failed_msg_num_(0), + time_cost_(0), + time_cost_0t32_(0), + time_cost_32t128_(0), + time_cost_128t1024_(0), + time_cost_1024t65536_(0), + receive_buffer_full_count_(0), + too_long_msg_count_(0), + metadata_fail_count_(0) {} + + void AddSendSuccessPackNum(uint64_t num) { send_success_pack_num_ += num; } + void AddSendSuccessMsgNum(uint64_t num) { send_success_msg_num_ += num; } + void AddSendFailPackNum(uint64_t num) { send_failed_pack_num_ += num; } + void AddSendFailMsgNum(uint64_t num) { send_failed_msg_num_ += num; } + void AddReceiveBufferFullCount(uint64_t receive_buffer_full_count) { + receive_buffer_full_count_ += receive_buffer_full_count; + } + void AddTooLongMsgCount(uint64_t too_long_msg_count) { too_long_msg_count_ += too_long_msg_count; } + void AddMetadataFailCount(uint64_t metadata_fail_count) { metadata_fail_count_ += metadata_fail_count; } + + uint64_t GetSendSuccessPackNum() { return send_success_pack_num_; } + uint64_t GetSendSuccessMsgNum() { return send_success_msg_num_; } + uint64_t GetSendFailedPackNum() { return send_failed_pack_num_; } + uint64_t GetSendFailedMsgNum() { return send_failed_msg_num_; } + uint64_t GetTimeCost() { return time_cost_; } + uint64_t GetTimeCost0T32() const { return time_cost_0t32_; } + uint64_t GetTimeCost32T128() const { return time_cost_32t128_; } + uint64_t GetTimeCost128T1024() const { return time_cost_128t1024_; } + uint64_t GetTimeCost1024T65536() const { return time_cost_1024t65536_; } + uint64_t GetReceiveBufferFullCount() const { return receive_buffer_full_count_; } + uint64_t GetTooLongMsgCount() const { return too_long_msg_count_; } + uint64_t GetMetadataFailCount() const { return metadata_fail_count_; } + + void AddTimeCost(uint64_t time_cost) { + time_cost_ += time_cost; + if (time_cost < 32) { + time_cost_0t32_++; + return; + } else if (time_cost < 128) { + time_cost_32t128_++; + return; + } else if (time_cost < 1024) { + time_cost_128t1024_++; + return; + } else { + time_cost_1024t65536_++; + } + } + + void ResetStat() { + send_success_pack_num_ = 0; + send_success_msg_num_ = 0; + send_failed_pack_num_ = 0; + send_failed_msg_num_ = 0; + time_cost_ = 0; + time_cost_0t32_ = 0; + time_cost_32t128_ = 0; + time_cost_128t1024_ = 0; + time_cost_1024t65536_ = 0; + receive_buffer_full_count_ = 0; + too_long_msg_count_ = 0; + metadata_fail_count_ = 0; + } + + void Update(Metric stat) { + send_success_pack_num_ += stat.send_success_pack_num_; + send_success_msg_num_ += stat.send_success_msg_num_; + send_failed_pack_num_ += stat.send_failed_pack_num_; + send_failed_msg_num_ += stat.send_failed_msg_num_; + time_cost_ += stat.time_cost_; + time_cost_0t32_ += stat.time_cost_0t32_; + time_cost_32t128_ += stat.time_cost_32t128_; + time_cost_128t1024_ += stat.time_cost_128t1024_; + time_cost_1024t65536_ += stat.time_cost_1024t65536_; + } + + uint64_t getTransTime() const { + uint64_t pack_num = send_success_pack_num_ + send_failed_pack_num_ + 1; + return time_cost_ / pack_num; + } + + std::string GetSendMetricInfo() const { + std::stringstream metric; + metric << "success-pack[" << send_success_pack_num_ << "] "; + metric << "msg[" << send_success_msg_num_ << "] "; + metric << "failed-pack[" << send_failed_pack_num_ << "] "; + metric << "msg[" << send_failed_msg_num_ << "] "; + metric << "trans[" << getTransTime() << "] "; + return metric.str(); + } + std::string ToString() const { + std::stringstream metric; + metric << "success-pack[" << send_success_pack_num_ << "] "; + metric << "msg[" << send_success_msg_num_ << "] "; + metric << "failed-pack[" << send_failed_pack_num_ << "] "; + metric << "msg[" << send_failed_msg_num_ << "] "; + metric << "trans[" << getTransTime() << "] "; + metric << "buffer full[" << receive_buffer_full_count_ << "] "; + metric << "too long msg[" << too_long_msg_count_ << "] "; + metric << "metadata fail[" << metadata_fail_count_ << "] "; + return metric.str(); + } +}; +} // namespace inlong + +#endif // INLONG_METRIC_H diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt index 6ecf82464b4..a95af7462f0 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/CMakeLists.txt @@ -19,6 +19,9 @@ cmake_minimum_required(VERSION 3.1) -aux_source_directory(. UTILS_SRCS) - +file(GLOB UTILS_SRCS + "*.cc" + "*.h" + ) add_library(utils STATIC ${UTILS_SRCS}) + diff --git a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h index 399bd1b3487..dbac24f6a61 100644 --- a/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h +++ b/inlong-sdk/dataproxy-sdk-twins/dataproxy-sdk-cpp/src/utils/capi_constant.h @@ -108,6 +108,7 @@ static const int32_t kWeight[30] = {1, 1, 1, 1, 1, 2, 2, 2, 2, 2, static const char kCacheFile[] = ".proxy_list.ini"; static const char kCacheTmpFile[] = ".proxy_list.ini.tmp"; const int MAX_RETRY = 10; +static const int kMetricIntervalMinutes = 1; } // namespace constants } // namespace inlong