Skip to content

Commit

Permalink
[INLONG-10793][SDK] Added metric management for DataProxy CPP SDK (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
doleyzi authored Aug 19, 2024
1 parent 44adf62 commit 47c710b
Show file tree
Hide file tree
Showing 9 changed files with 368 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ include_directories(src/manager)
include_directories(src/group)
include_directories(src/protocol)
include_directories(src/client)
include_directories(src/metric)

link_directories(${PROJECT_SOURCE_DIR}/third_party/lib)
link_directories(${PROJECT_SOURCE_DIR}/third_party/lib64)
Expand All @@ -57,7 +58,7 @@ aux_source_directory(src/protocol PROTOCOL)
aux_source_directory(src/client CLIENT)

# static library
add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER} ${GROUP} ${PROTOCOL} ${CLIENT})
add_library(dataproxy_sdk STATIC ${UTILS} ${CONFIGS} ${CORE} ${MANAGER} ${GROUP} ${PROTOCOL} ${CLIENT} ${METRIC})

set_target_properties(dataproxy_sdk PROPERTIES OUTPUT_NAME "dataproxy_sdk" PREFIX "")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
#include <iostream>
#include <signal.h>

#include "metric_manager.h"

namespace inlong {
int32_t ApiImp::InitApi(const char *config_file_path) {
if (!__sync_bool_compare_and_swap(&inited_, false, true)) {
Expand Down Expand Up @@ -104,7 +106,7 @@ int32_t ApiImp::DoInit() {
LOG_INFO("inlong dataproxy cpp sdk Init complete!");

ProxyManager::GetInstance()->Init();
ProxyManager::GetInstance()->ReadLocalCache();
MetricManager::GetInstance()->Init();

for (int i = 0; i < SdkConfig::getInstance()->inlong_group_ids_.size(); i++) {
LOG_INFO("DoInit CheckConf inlong_group_id:"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "metric_manager.h"

#include <rapidjson/document.h>
#include <sys/prctl.h>
#include <unistd.h>

#include "../utils/logger.h"
#include "../utils/utils.h"
#include "../utils/capi_constant.h"

namespace inlong {
void MetricManager::Init() {
if (__sync_bool_compare_and_swap(&inited_, false, true)) {
update_thread_ = std::thread(&MetricManager::Run, this);
}
InitEnvironment();
}
void MetricManager::InitEnvironment() {
environment_.setType("cpp");
environment_.setVersion(constants::kVersion);
environment_.setPid(getpid());
environment_.setIp(SdkConfig::getInstance()->local_ip_);
}
void MetricManager::Run() {
prctl(PR_SET_NAME, "metric-manager");
while (running_) {
LOG_INFO("Start report metric");
PrintMetric();
std::this_thread::sleep_for(std::chrono::minutes(constants::kMetricIntervalMinutes));
}
}
void MetricManager::PrintMetric() {
std::unordered_map<std::string, Metric> stat_map;
{
std::lock_guard<std::mutex> lck(mutex_);
stat_map.swap(stat_map_);
}

LOG_INFO("[MetricManager] Environment info: " << environment_.ToString());

for (auto it : stat_map) {
LOG_INFO("[MetricManager] Metric info: " << it.first << " " << it.second.ToString());
}
}
} // namespace inlong
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include <queue>
#include <thread>
#include <unordered_map>

#include "../config/sdk_conf.h"
#include "../metric/environment.h"
#include "../metric/metric.h"

#ifndef INLONG_METRIC_MANAGER_H
#define INLONG_METRIC_MANAGER_H
namespace inlong {
using MetricMap = std::unordered_map<std::string, Metric>;
static const char kStatJoiner = ' ';
class MetricManager {
private:
mutable std::mutex mutex_;
MetricMap stat_map_;
std::thread update_thread_;
volatile bool inited_ = false;
bool running_ = true;
Environment environment_;
std::string coreParma_;

MetricManager() {

}

public:
static MetricManager *GetInstance() {
static MetricManager instance;
return &instance;
}
void Init();
void InitEnvironment();
void PrintMetric();
void Run();
void UpdateMetric(const std::string &stat_key, Metric &stat) {
std::lock_guard<std::mutex> lck(mutex_);
stat_map_[stat_key].Update(stat);
}

void AddReceiveBufferFullCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) {
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddReceiveBufferFullCount(count);
}

void AddTooLongMsgCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) {
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddTooLongMsgCount(count);
}

void AddMetadataFailCount(const std::string &inlong_group_id, const std::string &inlong_stream_id,uint64_t count) {
std::lock_guard<std::mutex> lck(mutex_);
std::string stat_key= BuildStatKey(inlong_group_id,inlong_stream_id);
stat_map_[stat_key].AddMetadataFailCount(count);
}

void Reset();

std::string BuildStatKey(const std::string &inlong_group_id, const std::string &inlong_stream_id) {
return inlong_group_id + kStatJoiner + inlong_stream_id;
}

~MetricManager() {
running_ = false;
if (update_thread_.joinable()) {
update_thread_.join();
}
}
};
} // namespace inlong
#endif // INLONG_METRIC_MANAGER_H
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void ProxyManager::Init() {
timeout_ = SdkConfig::getInstance()->manager_url_timeout_;
last_update_time_ = Utils::getCurrentMsTime();
if (__sync_bool_compare_and_swap(&inited_, false, true)) {
ReadLocalCache();
update_conf_thread_ = std::thread(&ProxyManager::Update, this);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef INLONG_ENVIRONMENT_H
#define INLONG_ENVIRONMENT_H

#include <string>
#include <sstream>
namespace inlong {
class Environment {
public:
std::string type_;
std::string version_;
std::string ip_;
uint64_t pid_;
const std::string &getType() const { return type_; }
void setType(const std::string &type) { type_ = type; }
std::string getVersion() { return version_; }
void setVersion(const std::string &version) { version_ = version; }
const std::string &getIp() const { return ip_; }
void setIp(const std::string &ip) { ip_ = ip; }
uint64_t getPid() const { return pid_; }
void setPid(uint64_t pid) { pid_ = pid; }

std::string ToString() const {
std::stringstream metric;
metric << "local ip[" << ip_ << "] ";
metric << "version [" << version_ << "] ";
metric << "pid [" << pid_ << "] ";
return metric.str();
}
};
} // namespace inlong
#endif // INLONG_ENVIRONMENT_H
Loading

0 comments on commit 47c710b

Please sign in to comment.