diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d38606..14bc157 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 0.5.2 + - Now checks if logstash is being shutdown + ## 0.5.1 - Support using HTTP proxies, adding the `http_proxy` parameter. diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index 8e5b203..db42640 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -65,7 +65,11 @@ def multi_receive(events) end end rescue => e - @logger.error("Uncaught processing exception in datadog forwarder #{e.message}") + if e.is_a?(InterruptedError) + raise e + else + @logger.error("Uncaught processing exception in datadog forwarder #{e.message}") + end end end @@ -148,7 +152,7 @@ def gzip_compress(payload, compression_level) # Build a new transport client def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy) if use_http - DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy + DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy, -> { defined?(pipeline_shutdown_requested?) ? pipeline_shutdown_requested? : false } else DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port end @@ -157,6 +161,9 @@ def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port class RetryableError < StandardError; end + class InterruptedError < StandardError; + end + class DatadogClient def send_retries(payload, max_retries, max_backoff) backoff = 1 @@ -166,17 +173,36 @@ def send_retries(payload, max_retries, max_backoff) rescue RetryableError => e if retries < max_retries || max_retries < 0 @logger.warn("Retrying send due to: #{e.message}") - sleep backoff + interruptableSleep(backoff) backoff = 2 * backoff unless backoff > max_backoff retries += 1 retry end - @logger.error("Max number of retries reached, dropping message. Last exception: #{ex.message}") + @logger.error("Max number of retries reached, dropping message. Last exception: #{e.message}") rescue => ex - @logger.error("Unmanaged exception while sending log to datadog #{ex.message}") + if ex.is_a?(InterruptedError) + raise ex + else + @logger.error("Unmanaged exception while sending log to datadog #{ex.message}") + end + end + end + + def interruptableSleep(duration) + amountSlept = 0 + while amountSlept < duration + sleep 1 + amountSlept += 1 + if interrupted? + raise InterruptedError.new "Interrupted while backing off" + end end end + def interrupted? + false + end + def send(payload) raise NotImplementedError, "Datadog transport client should implement the send method" end @@ -196,7 +222,8 @@ class DatadogHTTPClient < DatadogClient ::Manticore::ResolutionFailure ] - def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy) + def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy, interruptedLambda = nil) + @interruptedLambda = interruptedLambda @logger = logger protocol = use_ssl ? "https" : "http" @@ -224,6 +251,14 @@ def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, @client = Manticore::Client.new(config) end + def interrupted? + if @interruptedLambda + return @interruptedLambda.call + end + + false + end + def send(payload) begin response = @client.post(@url, :body => payload, :headers => @headers).call diff --git a/lib/logstash/outputs/version.rb b/lib/logstash/outputs/version.rb index 392ce67..dd6de3b 100644 --- a/lib/logstash/outputs/version.rb +++ b/lib/logstash/outputs/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module DatadogLogStashPlugin - VERSION = '0.5.1' -end \ No newline at end of file + VERSION = '0.5.2' +end