Skip to content

Commit

Permalink
chore: update code style
Browse files Browse the repository at this point in the history
  • Loading branch information
catdogpandas committed Dec 17, 2024
1 parent 0cfd01e commit 2edd176
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 40 deletions.
38 changes: 14 additions & 24 deletions core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,9 @@ bool ProcessorPromParseMetricNative::Init(const Json::Value& config) {
}

void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) {
if (!eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID)) {
LOG_WARNING(sLogger, ("unexpected event", "need prom stream id"));
return;
}

EventsContainer& events = eGroup.MutableEvents();
auto rawEvents = std::move(events);
events.reserve(rawEvents.size());
EventsContainer newEvents;
newEvents.reserve(events.size());

StringView scrapeTimestampMilliSecStr = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC);
auto timestampMilliSec = StringTo<uint64_t>(scrapeTimestampMilliSecStr.to_string());
Expand All @@ -40,36 +35,31 @@ void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) {
TextParser parser(mScrapeConfigPtr->mHonorTimestamps);
parser.SetDefaultTimestamp(timestamp, nanoSec);

for (auto& rawEvent : rawEvents) {
ProcessEvent(rawEvent, events, eGroup, parser);
for (auto& e : events) {
ProcessEvent(e, newEvents, eGroup, parser);
}
events.swap(newEvents);
eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED, ToString(events.size()));
}

bool ProcessorPromParseMetricNative::IsSupportedEvent(const PipelineEventPtr& e) const {
return e.Is<RawEvent>();
}

bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e,
EventsContainer& events,
EventsContainer& newEvents,
PipelineEventGroup& eGroup,
TextParser& parser) {
if (!e.Is<RawEvent>()) {
LOG_WARNING(sLogger, ("unexpected event type", "need raw event"));
if (!IsSupportedEvent(e)) {
return false;
}
auto rawEvent = e.Cast<RawEvent>();
auto content = rawEvent.GetContent();
if (content.empty()) {
LOG_WARNING(sLogger, ("empty content", ""));
return false;
auto& sourceEvent = e.Cast<RawEvent>();
std::unique_ptr<MetricEvent> metricEvent = eGroup.CreateMetricEvent(true);
if (parser.ParseLine(sourceEvent.GetContent(), *metricEvent)) {
metricEvent->SetTag(string(prometheus::NAME), metricEvent->GetName());
newEvents.emplace_back(std::move(metricEvent), true, nullptr);
}
auto metricEvent = eGroup.CreateMetricEvent(true);
if (parser.ParseLine(content, *metricEvent)) {
metricEvent->SetTagNoCopy(prometheus::NAME, metricEvent->GetName());
events.emplace_back(std::move(metricEvent), true, nullptr);
return true;
}
return false;
return true;
}

} // namespace logtail
5 changes: 0 additions & 5 deletions core/plugin/processor/inner/ProcessorPromParseMetricNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
#include "prometheus/labels/TextParser.h"
#include "prometheus/schedulers/ScrapeConfig.h"

DECLARE_FLAG_INT32(process_thread_count);

namespace logtail {
class ProcessorPromParseMetricNative : public Processor {
public:
Expand All @@ -24,9 +22,6 @@ class ProcessorPromParseMetricNative : public Processor {

private:
bool ProcessEvent(PipelineEventPtr&, EventsContainer&, PipelineEventGroup&, TextParser& parser);
void
AddEvent(const char* data, size_t size, EventsContainer& events, PipelineEventGroup& eGroup, TextParser& parser);

std::unique_ptr<ScrapeConfig> mScrapeConfigPtr;

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down
16 changes: 8 additions & 8 deletions core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ bool ProcessorPromRelabelMetricNative::Init(const Json::Value& config) {
return true;
}

void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& eGroup) {
void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& metricGroup) {
// if mMetricRelabelConfigs is empty and honor_labels is true, skip it
auto targetTags = eGroup.GetTags();
auto targetTags = metricGroup.GetTags();
auto toDelete = GetToDeleteTargetLabels(targetTags);

if (!mScrapeConfigPtr->mMetricRelabelConfigs.Empty() || !targetTags.empty()) {
EventsContainer& events = eGroup.MutableEvents();
EventsContainer& events = metricGroup.MutableEvents();
size_t wIdx = 0;
for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) {
if (ProcessEvent(events[rIdx], targetTags, toDelete)) {
Expand All @@ -68,19 +68,19 @@ void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& eGroup) {

// delete mTags when key starts with __
for (const auto& k : toDelete) {
eGroup.DelTag(k);
metricGroup.DelTag(k);
}

if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL)) {
if (metricGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL)) {
auto autoMetric = prom::AutoMetric();
UpdateAutoMetrics(eGroup, autoMetric);
AddAutoMetrics(eGroup, autoMetric);
UpdateAutoMetrics(metricGroup, autoMetric);
AddAutoMetrics(metricGroup, autoMetric);
}


// delete all tags
for (const auto& [k, v] : targetTags) {
eGroup.DelTag(k);
metricGroup.DelTag(k);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
#include "pipeline/plugin/interface/Processor.h"
#include "prometheus/schedulers/ScrapeConfig.h"

DECLARE_FLAG_INT32(process_thread_count);

namespace logtail {

namespace prom {
Expand All @@ -45,7 +43,7 @@ class ProcessorPromRelabelMetricNative : public Processor {

const std::string& Name() const override { return sName; }
bool Init(const Json::Value& config) override;
void Process(PipelineEventGroup&) override;
void Process(PipelineEventGroup& metricGroup) override;

protected:
bool IsSupportedEvent(const PipelineEventPtr& e) const override;
Expand Down

0 comments on commit 2edd176

Please sign in to comment.