-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Metrics #24
base: master
Are you sure you want to change the base?
Metrics #24
Changes from all commits
dc5ffb1
e6233a5
49e7de7
314b2de
004c413
447825b
7430b79
43fc588
f620d8b
d33e954
36fbcbc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,3 @@ | ||
*.gem | ||
nclude-dependencies | ||
Gemfile.lock |
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,13 @@ | ||
# frozen_string_literal: true | ||
require 'fluent/plugin/filter' | ||
require 'fluent/plugin/prometheus' | ||
|
||
module Fluent::Plugin | ||
class ThrottleFilter < Filter | ||
Fluent::Plugin.register_filter('throttle', self) | ||
include Fluent::Plugin::PrometheusLabelParser | ||
include Fluent::Plugin::Prometheus | ||
attr_reader :registry | ||
|
||
desc "Used to group logs. Groups are rate limited independently" | ||
config_param :group_key, :array, :default => ['kubernetes.container_name'] | ||
|
@@ -35,6 +39,11 @@ class ThrottleFilter < Filter | |
DESC | ||
config_param :group_warning_delay_s, :integer, :default => 10 | ||
|
||
desc <<~DESC | ||
Whether to emit a metric when the rate limit is exceeded. The metric is fluentd_throttle_rate_limit_exceeded | ||
DESC | ||
config_param :enable_metrics, :bool, :default => false | ||
|
||
Group = Struct.new( | ||
:rate_count, | ||
:rate_last_reset, | ||
|
@@ -43,10 +52,17 @@ class ThrottleFilter < Filter | |
:bucket_last_reset, | ||
:last_warning) | ||
|
||
# Builds the filter instance: runs the parent initializer, then stores the | ||
# registry returned by ::Prometheus::Client.registry in @registry. | ||
def initialize | ||
super | ||
# get_counter looks up and registers counters through this registry object. | ||
@registry = ::Prometheus::Client.registry | ||
end | ||
|
||
def configure(conf) | ||
super | ||
|
||
@group_key_paths = group_key.map { |key| key.split(".") } | ||
# Replace invalid characters with underscores and convert to symbols for valid labels | ||
@group_key_symbols = (group_key.map {|str| str.gsub(/[^a-zA-Z0-9_]/,'_')}).map(&:to_sym) | ||
|
||
raise "group_bucket_period_s must be > 0" \ | ||
unless @group_bucket_period_s > 0 | ||
|
@@ -68,12 +84,27 @@ def configure(conf) | |
|
||
raise "group_warning_delay_s must be >= 1" \ | ||
unless @group_warning_delay_s >= 1 | ||
|
||
if @enable_metrics | ||
hostname = Socket.gethostname | ||
expander_builder = Fluent::Plugin::Prometheus.placeholder_expander(log) | ||
expander = expander_builder.build({}) | ||
@base_labels = parse_labels_elements(conf) | ||
@base_labels.each do |key, value| | ||
unless value.is_a?(String) | ||
raise Fluent::ConfigError, "record accessor syntax is not available in metric labels for throttle plugin" | ||
end | ||
@base_labels[key] = expander.expand(value) | ||
end | ||
end | ||
end | ||
|
||
def start | ||
super | ||
|
||
@counters = {} | ||
# TODO add more relevant metrics to throttling | ||
@metrics = {throttle_rate_limit_exceeded: get_counter(:fluentd_throttle_rate_limit_exceeded, "The exceeded rate of pods in the group")} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This can be a variable; I don't think we need a map here. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was made a map to provide a standard way to add more metrics: it acts as a template, and new metrics can be added as further entries in the map. |
||
end | ||
|
||
def shutdown | ||
|
@@ -85,8 +116,8 @@ def filter(tag, time, record) | |
now = Time.now | ||
rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record | ||
group = extract_group(record) | ||
# Ruby hashes are ordered by insertion. | ||
|
||
# Ruby hashes are ordered by insertion. | ||
# Deleting and inserting moves the item to the end of the hash (most recently used) | ||
counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil) | ||
|
||
|
@@ -151,6 +182,15 @@ def extract_group(record) | |
end | ||
|
||
def log_rate_limit_exceeded(now, group, counter) | ||
# Check if metrics are enabled | ||
if @enable_metrics | ||
# We create the new hash of label to label values for the metric | ||
groupped_label = @group_key_symbols.zip(group).to_h | ||
metric = @metrics[:throttle_rate_limit_exceeded] | ||
log.debug("current rate",counter.rate_count,"current metric",metric.get(labels: @base_labels.merge(groupped_label))) | ||
metric.increment(by: 1, labels: @base_labels.merge(groupped_label)) | ||
end | ||
|
||
emit = counter.last_warning == nil ? true \ | ||
: (now - counter.last_warning) >= @group_warning_delay_s | ||
if emit | ||
|
@@ -176,5 +216,15 @@ def log_items(now, group, counter) | |
'rate_limit_s': @group_rate_limit, | ||
'reset_rate_s': @group_reset_rate_s} | ||
end | ||
|
||
# Looks up, or registers, the Prometheus counter called +name+.
#
# Returns nil when @enable_metrics is falsy. Otherwise a metric already
# registered under +name+ is fetched from @registry; failing that, a new
# counter is registered whose label names are the base-label keys followed
# by the group-key symbols.
#
# @param name metric name used for the registry lookup/registration
# @param docstring help text passed along when the counter is created
# @return the registry's metric object, or nil when metrics are disabled
def get_counter(name, docstring)
  return nil unless @enable_metrics
  return @registry.get(name) if @registry.exist?(name)

  label_names = @base_labels.keys + @group_key_symbols
  @registry.counter(name, docstring: docstring, labels: label_names)
end
end | ||
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you fix this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In general, the lock file is not included in plugins (since it depends on the build environment). That's why I didn't include it. I checked other plugins too; they do the same.