diff --git a/.gitignore b/.gitignore index c111b33..c65f739 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ *.gem +nclude-dependencies +Gemfile.lock \ No newline at end of file diff --git a/Gemfile.lock b/Gemfile.lock deleted file mode 100644 index bf8ef73..0000000 --- a/Gemfile.lock +++ /dev/null @@ -1,87 +0,0 @@ -PATH - remote: . - specs: - fluent-plugin-throttle (0.0.5) - fluentd (~> 1.1) - -GEM - remote: https://rubygems.org/ - specs: - addressable (2.5.2) - public_suffix (>= 2.0.2, < 4.0) - appraisal (2.2.0) - bundler - rake - thor (>= 0.14.0) - byebug (10.0.2) - coderay (1.1.2) - concurrent-ruby (1.1.6) - cool.io (1.6.0) - crack (0.4.3) - safe_yaml (~> 1.0.0) - fluentd (1.11.1) - cool.io (>= 1.4.5, < 2.0.0) - http_parser.rb (>= 0.5.1, < 0.7.0) - msgpack (>= 1.3.1, < 2.0.0) - serverengine (>= 2.0.4, < 3.0.0) - sigdump (~> 0.2.2) - strptime (>= 0.2.2, < 1.0.0) - tzinfo (>= 1.0, < 3.0) - tzinfo-data (~> 1.0) - yajl-ruby (~> 1.0) - hashdiff (0.3.7) - http_parser.rb (0.6.0) - maxitest (3.1.0) - minitest (>= 5.0.0, < 5.12.0) - metaclass (0.0.4) - method_source (0.9.0) - minitest (5.11.3) - mocha (1.6.0) - metaclass (~> 0.0.1) - msgpack (1.3.3) - power_assert (1.1.3) - pry (0.11.3) - coderay (~> 1.1.0) - method_source (~> 0.9.0) - pry-byebug (3.6.0) - byebug (~> 10.0) - pry (~> 0.10) - public_suffix (3.0.2) - rake (12.3.1) - safe_yaml (1.0.4) - serverengine (2.2.1) - sigdump (~> 0.2.2) - sigdump (0.2.4) - single_cov (1.1.0) - strptime (0.2.4) - test-unit (3.2.8) - power_assert - thor (0.20.0) - tzinfo (2.0.2) - concurrent-ruby (~> 1.0) - tzinfo-data (1.2020.1) - tzinfo (>= 1.0.0) - webmock (3.4.2) - addressable (>= 2.3.6) - crack (>= 0.3.2) - hashdiff - yajl-ruby (1.4.1) - -PLATFORMS - ruby - -DEPENDENCIES - appraisal (~> 2.2) - bundler (~> 1.16) - fluent-plugin-throttle! 
- maxitest - mocha - pry - pry-byebug - rake (~> 12.3) - single_cov - test-unit (~> 3.2) - webmock (~> 3.3) - -BUNDLED WITH - 1.17.2 diff --git a/README.md b/README.md index ecf2167..4a1704f 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,10 @@ $ gem install fluent-plugin-throttle group_bucket_period_s 60 group_bucket_limit 6000 group_reset_rate_s 100 + + key1 value1 + key2 value2 + ``` @@ -129,6 +133,39 @@ When a group reaches its limit and as long as it is not reset, a warning message with the current log rate of the group is emitted repeatedly. This is the delay between every repetition. +#### enable\_metrics + +Default: `false`. + +When a group reaches its limit and this value is `true`, a metric is emitted for the logs being dropped. This metric can be scraped like any other metric exposed in Prometheus format. The group keys are attached as additional labels to identify the throttled groups. \\ +The metric emitted by the filter is: +- `fluentd_throttle_rate_limit_exceeded` + +## Labels + +See [Prometheus Data Model](http://prometheus.io/docs/concepts/data_model/) first. + +You can add custom labels with static values. + +### labels section + +``` + + key1 value1 + key2 value2 + +``` + +All labels sections have the same format. Each line holds a key/value pair for one label. + +You can use placeholders in label values. The placeholders are expanded from the reserved values listed below; record accessor syntax is not available in metric labels for this plugin. +If you specify `${hostname}`, it will be expanded to the hostname of the machine where fluentd runs. 
+ +Reserved placeholders are: + +- `${hostname}`: hostname +- `${worker_id}`: fluent worker id + ## License Apache License, Version 2.0 diff --git a/fluent-plugin-throttle.gemspec b/fluent-plugin-throttle.gemspec index ed7d4f0..02f9ec3 100755 --- a/fluent-plugin-throttle.gemspec +++ b/fluent-plugin-throttle.gemspec @@ -4,7 +4,7 @@ $LOAD_PATH.unshift(lib) unless $LOAD_PATH.include?(lib) Gem::Specification.new do |spec| spec.name = "fluent-plugin-throttle" - spec.version = "0.0.5" + spec.version = "0.0.6" spec.authors = ["François-Xavier Bourlet"] spec.email = ["fx.bourlet@rubrik.com"] spec.summary = %q{Fluentd filter for throttling logs based on a configurable key.} @@ -16,8 +16,8 @@ Gem::Specification.new do |spec| spec.test_files = spec.files.grep(%r{^(test|spec|features)/}) spec.require_paths = ["lib"] - spec.add_development_dependency "bundler", "~> 1.16" - spec.add_development_dependency "rake", "~> 12.3" + spec.add_development_dependency "bundler", "~> 2.5" + spec.add_development_dependency "rake", "~> 13.2" spec.add_development_dependency "webmock", "~> 3.3" spec.add_development_dependency "test-unit", "~> 3.2" spec.add_development_dependency "appraisal", "~> 2.2" @@ -25,5 +25,7 @@ Gem::Specification.new do |spec| spec.add_development_dependency "maxitest" spec.add_development_dependency "single_cov" - spec.add_runtime_dependency "fluentd", "~> 1.1" + spec.add_dependency "prometheus-client", '~> 4.2' + spec.add_dependency "fluentd", "~> 1.1" + spec.add_dependency "fluent-plugin-prometheus", "~> 2.1" end diff --git a/lib/fluent/plugin/filter_throttle.rb b/lib/fluent/plugin/filter_throttle.rb index 3defe8d..c58776f 100644 --- a/lib/fluent/plugin/filter_throttle.rb +++ b/lib/fluent/plugin/filter_throttle.rb @@ -1,9 +1,13 @@ # frozen_string_literal: true require 'fluent/plugin/filter' +require 'fluent/plugin/prometheus' module Fluent::Plugin class ThrottleFilter < Filter Fluent::Plugin.register_filter('throttle', self) + include 
Fluent::Plugin::PrometheusLabelParser + include Fluent::Plugin::Prometheus + attr_reader :registry desc "Used to group logs. Groups are rate limited independently" config_param :group_key, :array, :default => ['kubernetes.container_name'] @@ -35,6 +39,11 @@ class ThrottleFilter < Filter DESC config_param :group_warning_delay_s, :integer, :default => 10 + desc <<~DESC + Whether to emit a metric when the rate limit is exceeded. The metric is fluentd_throttle_rate_limit_exceeded + DESC + config_param :enable_metrics, :bool, :default => false + Group = Struct.new( :rate_count, :rate_last_reset, @@ -43,10 +52,17 @@ class ThrottleFilter < Filter :bucket_last_reset, :last_warning) + def initialize + super + @registry = ::Prometheus::Client.registry + end + def configure(conf) super @group_key_paths = group_key.map { |key| key.split(".") } + # Replace invalid characters with underscores and convert to symbols for valid labels + @group_key_symbols = (group_key.map {|str| str.gsub(/[^a-zA-Z0-9_]/,'_')}).map(&:to_sym) raise "group_bucket_period_s must be > 0" \ unless @group_bucket_period_s > 0 @@ -68,12 +84,27 @@ def configure(conf) raise "group_warning_delay_s must be >= 1" \ unless @group_warning_delay_s >= 1 + + if @enable_metrics + hostname = Socket.gethostname + expander_builder = Fluent::Plugin::Prometheus.placeholder_expander(log) + expander = expander_builder.build({}) + @base_labels = parse_labels_elements(conf) + @base_labels.each do |key, value| + unless value.is_a?(String) + raise Fluent::ConfigError, "record accessor syntax is not available in metric labels for throttle plugin" + end + @base_labels[key] = expander.expand(value) + end + end end def start super @counters = {} + # TODO add more relevant metrics to throttling + @metrics = {throttle_rate_limit_exceeded: get_counter(:fluentd_throttle_rate_limit_exceeded, "The exceeded rate of pods in the group")} end def shutdown @@ -85,8 +116,8 @@ def filter(tag, time, record) now = Time.now rate_limit_exceeded = 
@group_drop_logs ? nil : record # return nil on rate_limit_exceeded to drop the record group = extract_group(record) - - # Ruby hashes are ordered by insertion. + + # Ruby hashes are ordered by insertion. # Deleting and inserting moves the item to the end of the hash (most recently used) counter = @counters[group] = @counters.delete(group) || Group.new(0, now, 0, 0, now, nil) @@ -151,6 +182,15 @@ def extract_group(record) end def log_rate_limit_exceeded(now, group, counter) + # Check if metrics are enabled + if @enable_metrics + # We create the new hash of label to label values for the metric + groupped_label = @group_key_symbols.zip(group).to_h + metric = @metrics[:throttle_rate_limit_exceeded] + log.debug("current rate",counter.rate_count,"current metric",metric.get(labels: @base_labels.merge(groupped_label))) + metric.increment(by: 1, labels: @base_labels.merge(groupped_label)) + end + emit = counter.last_warning == nil ? true \ : (now - counter.last_warning) >= @group_warning_delay_s if emit @@ -176,5 +216,15 @@ def log_items(now, group, counter) 'rate_limit_s': @group_rate_limit, 'reset_rate_s': @group_reset_rate_s} end + + def get_counter(name, docstring) + if @enable_metrics + if @registry.exist?(name) + @registry.get(name) + else + @registry.counter(name, docstring: docstring, labels: @base_labels.keys + @group_key_symbols) + end + end + end end end diff --git a/test/fluent/plugin/filter_throttle_test.rb b/test/fluent/plugin/filter_throttle_test.rb index 188051f..d98e601 100644 --- a/test/fluent/plugin/filter_throttle_test.rb +++ b/test/fluent/plugin/filter_throttle_test.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require_relative '../../helper' require 'fluent/plugin/filter_throttle' +require 'fluent/plugin/prometheus' SingleCov.covered! @@ -178,5 +179,33 @@ def create_driver(conf='') assert driver.logs.any? 
{ |log| log.include?('rate back down') } end + + it 'emit metrics when enabled and rate exceeds - multiple keys' do + driver = create_driver <<~CONF + group_key "group1,group2" + group_bucket_period_s 1 + group_bucket_limit 5 + enable_metrics true + CONF + + driver.run(default_tag: "test") do + driver.feed([[event_time, {"msg": "test", "group1": "a", "group2": "b"}]] * 100) + driver.feed([[event_time, {"msg": "test", "group1": "b", "group2": "b"}]] * 50) + driver.feed([[event_time, {"msg": "test", "group1": "c"}]] * 25) + driver.feed([[event_time, {"msg": "test", "group2": "c"}]] * 10) + end + + groups = driver.filtered_records.group_by { |r| r[:group1] } + groups.each { |k, g| groups[k] = g.group_by { |r| r[:group2] } } + + assert_equal(5, groups["a"]["b"].size) + assert_equal(5, groups["b"]["b"].size) + assert_equal(5, groups["c"][nil].size) + assert_equal(5, groups[nil]["c"].size) + assert_equal(95, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: "a", group2: "b"})) + assert_equal(45, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: "b", group2: "b"})) + assert_equal(20, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: "c", group2: nil})) + assert_equal(5, driver.instance.registry.get(:fluentd_throttle_rate_limit_exceeded).get(labels: {group1: nil, group2: "c"})) + end end end