diff --git a/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp b/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp index 9950e9df29..95b81a569a 100644 --- a/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp +++ b/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp @@ -24,14 +24,9 @@ bool ProcessorPromParseMetricNative::Init(const Json::Value& config) { } void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) { - if (!eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID)) { - LOG_WARNING(sLogger, ("unexpected event", "need prom stream id")); - return; - } - EventsContainer& events = eGroup.MutableEvents(); - auto rawEvents = std::move(events); - events.reserve(rawEvents.size()); + EventsContainer newEvents; + newEvents.reserve(events.size()); StringView scrapeTimestampMilliSecStr = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC); auto timestampMilliSec = StringTo(scrapeTimestampMilliSecStr.to_string()); @@ -40,9 +35,11 @@ void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) { TextParser parser(mScrapeConfigPtr->mHonorTimestamps); parser.SetDefaultTimestamp(timestamp, nanoSec); - for (auto& rawEvent : rawEvents) { - ProcessEvent(rawEvent, events, eGroup, parser); + for (auto& e : events) { + ProcessEvent(e, newEvents, eGroup, parser); } + events.swap(newEvents); + eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED, ToString(events.size())); } bool ProcessorPromParseMetricNative::IsSupportedEvent(const PipelineEventPtr& e) const { @@ -50,26 +47,19 @@ bool ProcessorPromParseMetricNative::IsSupportedEvent(const PipelineEventPtr& e) } bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e, - EventsContainer& events, + EventsContainer& newEvents, PipelineEventGroup& eGroup, TextParser& parser) { - if (!e.Is()) { - LOG_WARNING(sLogger, ("unexpected event type", "need raw event")); + if (!IsSupportedEvent(e)) { return false; } - auto rawEvent = e.Cast(); - auto content = rawEvent.GetContent(); - if (content.empty()) { - LOG_WARNING(sLogger, ("empty content", "")); - return false; + auto& sourceEvent = e.Cast(); + std::unique_ptr metricEvent = eGroup.CreateMetricEvent(true); + if (parser.ParseLine(sourceEvent.GetContent(), *metricEvent)) { + metricEvent->SetTag(string(prometheus::NAME), metricEvent->GetName()); + newEvents.emplace_back(std::move(metricEvent), true, nullptr); } - auto metricEvent = eGroup.CreateMetricEvent(true); - if (parser.ParseLine(content, *metricEvent)) { - metricEvent->SetTagNoCopy(prometheus::NAME, metricEvent->GetName()); - events.emplace_back(std::move(metricEvent), true, nullptr); - return true; - } - return false; + return true; } } // namespace logtail diff --git a/core/plugin/processor/inner/ProcessorPromParseMetricNative.h b/core/plugin/processor/inner/ProcessorPromParseMetricNative.h index f72327ce62..f9c036c58a 100644 --- a/core/plugin/processor/inner/ProcessorPromParseMetricNative.h +++ b/core/plugin/processor/inner/ProcessorPromParseMetricNative.h @@ -8,8 +8,6 @@ #include "prometheus/labels/TextParser.h" #include "prometheus/schedulers/ScrapeConfig.h" -DECLARE_FLAG_INT32(process_thread_count); - namespace logtail { class ProcessorPromParseMetricNative : public Processor { public: @@ -24,9 +22,6 @@ class ProcessorPromParseMetricNative : public Processor { private: bool ProcessEvent(PipelineEventPtr&, EventsContainer&, PipelineEventGroup&, TextParser& parser); - void - AddEvent(const char* data, size_t size, EventsContainer& events, PipelineEventGroup& eGroup, TextParser& parser); - std::unique_ptr mScrapeConfigPtr; #ifdef APSARA_UNIT_TEST_MAIN diff --git a/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp b/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp index 9caf9eecdb..5c595282c6 100644 --- a/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp +++ b/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp @@ -47,13 +47,13 @@ bool ProcessorPromRelabelMetricNative::Init(const Json::Value& config) { return true; } -void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& eGroup) { +void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& metricGroup) { // if mMetricRelabelConfigs is empty and honor_labels is true, skip it - auto targetTags = eGroup.GetTags(); + auto targetTags = metricGroup.GetTags(); auto toDelete = GetToDeleteTargetLabels(targetTags); if (!mScrapeConfigPtr->mMetricRelabelConfigs.Empty() || !targetTags.empty()) { - EventsContainer& events = eGroup.MutableEvents(); + EventsContainer& events = metricGroup.MutableEvents(); size_t wIdx = 0; for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) { if (ProcessEvent(events[rIdx], targetTags, toDelete)) { @@ -68,19 +68,19 @@ void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& eGroup) { // delete mTags when key starts with __ for (const auto& k : toDelete) { - eGroup.DelTag(k); + metricGroup.DelTag(k); } - if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL)) { + if (metricGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL)) { auto autoMetric = prom::AutoMetric(); - UpdateAutoMetrics(eGroup, autoMetric); - AddAutoMetrics(eGroup, autoMetric); + UpdateAutoMetrics(metricGroup, autoMetric); + AddAutoMetrics(metricGroup, autoMetric); } // delete all tags for (const auto& [k, v] : targetTags) { - eGroup.DelTag(k); + metricGroup.DelTag(k); } } diff --git a/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.h b/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.h index 4f598de65a..2c0a0c027f 100644 --- a/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.h +++ b/core/plugin/processor/inner/ProcessorPromRelabelMetricNative.h @@ -23,8 +23,6 @@ #include "pipeline/plugin/interface/Processor.h" #include "prometheus/schedulers/ScrapeConfig.h" -DECLARE_FLAG_INT32(process_thread_count); - namespace logtail { namespace prom { @@ -45,7 +43,7 @@ class ProcessorPromRelabelMetricNative : public Processor { const std::string& Name() const override { return sName; } bool Init(const Json::Value& config) override; - void Process(PipelineEventGroup&) override; + void Process(PipelineEventGroup& metricGroup) override; protected: bool IsSupportedEvent(const PipelineEventPtr& e) const override;