Metrics #24

Open
wants to merge 11 commits into
base: master
from
2 changes: 2 additions & 0 deletions .gitignore
@@ -1 +1,3 @@
*.gem
nclude-dependencies

can you fix this?

Author

In general, the lock file is not committed for plugins, since it depends on the build environment. That's why I didn't include it. I checked other plugins too, and they do the same.

Gemfile.lock
87 changes: 0 additions & 87 deletions Gemfile.lock

This file was deleted.

37 changes: 37 additions & 0 deletions README.md
@@ -23,6 +23,10 @@ $ gem install fluent-plugin-throttle
group_bucket_period_s 60
group_bucket_limit 6000
group_reset_rate_s 100
<labels>
key1 value1
key2 value2
</labels>
</filter>
```

@@ -129,6 +133,39 @@ When a group reaches its limit and as long as it is not reset, a warning
message with the current log rate of the group is emitted repeatedly. This is
the delay between every repetition.

#### enable\_metrics

Default: `false`.

When this value is true and a group reaches its limit, metrics are emitted for the logs being dropped. These metrics are exposed in Prometheus format and can be scraped like any other metrics. The group keys are attached as additional labels so that the throttled groups can be identified.

The filter emits the following metric:
- `fluentd_throttle_rate_limit_exceeded`
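
As a rough illustration of how group keys end up as labels, the sketch below mirrors the sanitizing step in this PR's `configure` (characters outside `[a-zA-Z0-9_]` become underscores) and pairs each label name with the value found in a record. The helper names `label_names` and `labels_for` are hypothetical, and the nested lookup through `dig` is an assumption about how group values are extracted.

```ruby
# Hypothetical helpers for illustration; not part of the plugin's API.

# Turn configured group keys into valid Prometheus label names.
def label_names(group_keys)
  group_keys.map { |key| key.gsub(/[^a-zA-Z0-9_]/, '_').to_sym }
end

# Pair each label name with the value found in the record.
# Assumption: nested keys are written with dots, e.g. "kubernetes.container_name".
def labels_for(group_keys, record)
  values = group_keys.map { |key| record.dig(*key.split('.')) }
  label_names(group_keys).zip(values).to_h
end

record = { 'kubernetes' => { 'container_name' => 'nginx' }, 'msg' => 'hello' }
labels = labels_for(['kubernetes.container_name'], record)
puts labels.inspect
```

A key that is missing from the record yields a `nil` label value in this sketch, which matches the `nil` label values used in the PR's test.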

## Labels

See [Prometheus Data Model](http://prometheus.io/docs/concepts/data_model/) first.

You can add custom labels with static values.

### labels section

```
<labels>
key1 value1
key2 value2
</labels>
```

All labels sections have the same format. Each line holds a key/value pair for one label.

You can use placeholders in label values. In this plugin, placeholders are expanded from reserved values only; record accessor syntax is not available in metric labels.
For example, `${hostname}` is expanded to the hostname of the machine where fluentd runs.

Reserved placeholders are:

- `${hostname}`: hostname
- `${worker_id}`: fluent worker id
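
To make the expansion concrete, here is a minimal sketch of how static label values containing reserved placeholders could be expanded. It is a simplified stand-in, not the expander from fluent-plugin-prometheus: `expand_placeholders` is a hypothetical helper, and the worker id of `0` is an assumed single-worker setup.

```ruby
require 'socket'

# Simplified stand-in for the placeholder expander; the real one lives in
# fluent-plugin-prometheus and may behave differently.
def expand_placeholders(value, placeholders)
  value.gsub(/\$\{(\w+)\}/) do |match|
    # Leave unknown placeholders untouched rather than guessing a value.
    placeholders.fetch(Regexp.last_match(1), match)
  end
end

reserved = {
  'hostname'  => Socket.gethostname,
  'worker_id' => '0' # assumed: single-worker setup
}

labels = { host: '${hostname}', worker: 'worker-${worker_id}', env: 'prod' }
expanded = labels.transform_values { |v| expand_placeholders(v, reserved) }
puts expanded.inspect
```

Values without placeholders, such as `prod` here, pass through unchanged.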

## License

Apache License, Version 2.0
10 changes: 6 additions & 4 deletions fluent-plugin-throttle.gemspec
@@ -4,7 +4,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib)

Gem::Specification.new do |spec|
spec.name = "fluent-plugin-throttle"
spec.version = "0.0.5"
spec.version = "0.0.6"
spec.authors = ["François-Xavier Bourlet"]
spec.email = ["fx.bourlet@rubrik.com"]
spec.summary = %q{Fluentd filter for throttling logs based on a configurable key.}
@@ -16,14 +16,16 @@ Gem::Specification.new do |spec|
spec.test_files = spec.files.grep(%r{^(test|spec|features)/})
spec.require_paths = ["lib"]

spec.add_development_dependency "bundler", "~> 1.16"
spec.add_development_dependency "rake", "~> 12.3"
spec.add_development_dependency "bundler", "~> 2.5"
spec.add_development_dependency "rake", "~> 13.2"
spec.add_development_dependency "webmock", "~> 3.3"
spec.add_development_dependency "test-unit", "~> 3.2"
spec.add_development_dependency "appraisal", "~> 2.2"
spec.add_development_dependency "mocha"
spec.add_development_dependency "maxitest"
spec.add_development_dependency "single_cov"

spec.add_runtime_dependency "fluentd", "~> 1.1"
spec.add_dependency "prometheus-client", '~> 4.2'
spec.add_dependency "fluentd", "~> 1.1"
spec.add_dependency "fluent-plugin-prometheus", "~> 2.1"
end
54 changes: 52 additions & 2 deletions lib/fluent/plugin/filter_throttle.rb
@@ -1,9 +1,13 @@
# frozen_string_literal: true
require 'fluent/plugin/filter'
require 'fluent/plugin/prometheus'

module Fluent::Plugin
class ThrottleFilter < Filter
Fluent::Plugin.register_filter('throttle', self)
include Fluent::Plugin::PrometheusLabelParser
include Fluent::Plugin::Prometheus
attr_reader :registry

desc "Used to group logs. Groups are rate limited independently"
config_param :group_key, :array, :default => ['kubernetes.container_name']
@@ -35,6 +39,11 @@ class ThrottleFilter < Filter
DESC
config_param :group_warning_delay_s, :integer, :default => 10

desc <<~DESC
Whether to emit a metric when the rate limit is exceeded. The metric is fluentd_throttle_rate_limit_exceeded
DESC
config_param :enable_metrics, :bool, :default => false

Group = Struct.new(
:rate_count,
:rate_last_reset,
@@ -43,10 +52,17 @@ class ThrottleFilter < Filter
:bucket_last_reset,
:last_warning)

def initialize
super
@registry = ::Prometheus::Client.registry
end

def configure(conf)
super

@group_key_paths = group_key.map { |key| key.split(".") }
# Replace invalid characters with underscores and convert to symbols for valid labels
@group_key_symbols = group_key.map { |key| key.gsub(/[^a-zA-Z0-9_]/, '_').to_sym }

raise "group_bucket_period_s must be > 0" \
unless @group_bucket_period_s > 0
@@ -68,12 +84,27 @@ def configure(conf)

raise "group_warning_delay_s must be >= 1" \
unless @group_warning_delay_s >= 1

if @enable_metrics
hostname = Socket.gethostname
expander_builder = Fluent::Plugin::Prometheus.placeholder_expander(log)
expander = expander_builder.build({})
@base_labels = parse_labels_elements(conf)
@base_labels.each do |key, value|
unless value.is_a?(String)
raise Fluent::ConfigError, "record accessor syntax is not available in metric labels for throttle plugin"
end
@base_labels[key] = expander.expand(value)
end
end
end

def start
super

@counters = {}
# TODO add more relevant metrics to throttling
@metrics = {throttle_rate_limit_exceeded: get_counter(:fluentd_throttle_rate_limit_exceeded, "The exceeded rate of pods in the group")}

This can be a variable. I don't think we need a map here.

Author

This was made a map to give a standard way of adding more metrics. It acts as a template: new metrics can be added as further entries in the map.

end

def shutdown
@@ -85,8 +116,8 @@ def filter(tag, time, record)
now = Time.now
rate_limit_exceeded = @group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record
group = extract_group(record)
# Ruby hashes are ordered by insertion.

# Ruby hashes are ordered by insertion.
# Deleting and inserting moves the item to the end of the hash (most recently used)
counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil)

@@ -151,6 +182,15 @@ def extract_group(record)
end

def log_rate_limit_exceeded(now, group, counter)
# Check if metrics are enabled
if @enable_metrics
# Build the label set once: static labels plus one label per group key
grouped_labels = @group_key_symbols.zip(group).to_h
labels = @base_labels.merge(grouped_labels)
metric = @metrics[:throttle_rate_limit_exceeded]
log.debug("current rate: #{counter.rate_count}, current metric: #{metric.get(labels: labels)}")
metric.increment(by: 1, labels: labels)
end

emit = counter.last_warning == nil ? true \
: (now - counter.last_warning) >= @group_warning_delay_s
if emit
@@ -176,5 +216,15 @@ def log_items(now, group, counter)
'rate_limit_s': @group_rate_limit,
'reset_rate_s': @group_reset_rate_s}
end

def get_counter(name, docstring)
if @enable_metrics
if @registry.exist?(name)
@registry.get(name)
else
@registry.counter(name, docstring: docstring, labels: @base_labels.keys + @group_key_symbols)
end
end
end
end
end
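
The comment in `filter` about hash ordering relies on a plain Ruby property: hashes keep insertion order, so deleting a key and re-inserting it moves it to the end. The sketch below shows that behavior in isolation. The `touch` helper is a hypothetical name, and evicting from the front with `shift` is an assumption made for illustration, not something shown in this diff.

```ruby
# Touch a key: remove it (if present) and re-insert it at the end of the hash.
def touch(cache, key, default)
  cache[key] = cache.delete(key) || default
end

cache = {}
touch(cache, ['a'], 0)
touch(cache, ['b'], 0)
touch(cache, ['c'], 0)
touch(cache, ['a'], 0) # 'a' becomes the most recently used entry

puts cache.keys.inspect # keys are now ordered b, c, a

# Assumption for illustration: evict the least recently used entry from the front.
oldest_key, _oldest_value = cache.shift
puts oldest_key.inspect
```

Keeping the most recently used groups at the end means the oldest group is always the first entry, so finding it does not require a scan.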
29 changes: 29 additions & 0 deletions test/fluent/plugin/filter_throttle_test.rb
@@ -1,6 +1,7 @@
# frozen_string_literal: true
require_relative '../../helper'
require 'fluent/plugin/filter_throttle'
require 'fluent/plugin/prometheus'

SingleCov.covered!

@@ -178,5 +179,33 @@ def create_driver(conf='')

assert driver.logs.any? { |log| log.include?('rate back down') }
end

it 'emit metrics when enabled and rate exceeds - multiple keys' do
driver = create_driver <<~CONF
group_key "group1,group2"
group_bucket_period_s 1
group_bucket_limit 5
enable_metrics true
CONF

driver.run(default_tag: "test") do
driver.feed([[event_time, {"msg": "test", "group1": "a", "group2": "b"}]] * 100)
driver.feed([[event_time, {"msg": "test", "group1": "b", "group2": "b"}]] * 50)
driver.feed([[event_time, {"msg": "test", "group1": "c"}]] * 25)
driver.feed([[event_time, {"msg": "test", "group2": "c"}]] * 10)
end

groups = driver.filtered_records.group_by { |r| r[:group1] }
groups.each { |k, g| groups[k] = g.group_by { |r| r[:group2] } }

assert_equal(5, groups["a"]["b"].size)
assert_equal(5, groups["b"]["b"].size)
assert_equal(5, groups["c"][nil].size)
assert_equal(5, groups[nil]["c"].size)
assert_equal(95, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: "a", group2: "b"}))
assert_equal(45, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: "b", group2: "b"}))
assert_equal(20, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: "c", group2: nil}))
assert_equal(5, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: nil, group2: "c"}))
end
end
end
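
The expected values in the new test follow from simple arithmetic. The sketch below reproduces them under a simplified model: it assumes every record of a group arrives within one bucket period, so at most `group_bucket_limit` records pass and each remaining record increments the counter once. `split_by_limit` is a hypothetical helper, not plugin code.

```ruby
# Simplified model: all records of a group land in a single bucket period,
# so at most `bucket_limit` pass and the rest are counted as exceeding it.
def split_by_limit(fed, bucket_limit)
  passed = [fed, bucket_limit].min
  { passed: passed, dropped: fed - passed }
end

bucket_limit = 5
fed_per_group = {
  %w[a b]    => 100,
  %w[b b]    => 50,
  ['c', nil] => 25,
  [nil, 'c'] => 10
}

results = fed_per_group.transform_values { |fed| split_by_limit(fed, bucket_limit) }
results.each do |group, counts|
  puts "#{group.inspect}: passed=#{counts[:passed]} dropped=#{counts[:dropped]}"
end
```

Under this model the dropped counts are 95, 45, 20 and 5, which are the values the test asserts for `fluentd_throttle_rate_limit_exceeded`.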