From 482c22eb117004841db441f6d96fea04c2b63ee6 Mon Sep 17 00:00:00 2001 From: Wakee Ho Date: Fri, 8 Dec 2023 15:29:38 -0500 Subject: [PATCH 1/5] DatadogClient class now supports interruption in send_retries method --- lib/logstash/outputs/datadog_logs.rb | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index 8e5b203..a0a0d10 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -157,6 +157,9 @@ def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port class RetryableError < StandardError; end + class InterruptedError < StandardError; + end + class DatadogClient def send_retries(payload, max_retries, max_backoff) backoff = 1 @@ -166,17 +169,36 @@ def send_retries(payload, max_retries, max_backoff) rescue RetryableError => e if retries < max_retries || max_retries < 0 @logger.warn("Retrying send due to: #{e.message}") - sleep backoff + interruptableSleep(backoff) backoff = 2 * backoff unless backoff > max_backoff retries += 1 retry end @logger.error("Max number of retries reached, dropping message. Last exception: #{ex.message}") rescue => ex - @logger.error("Unmanaged exception while sending log to datadog #{ex.message}") + if ex.is_a?(InterruptedError) + raise ex + else + @logger.error("Unmanaged exception while sending log to datadog #{ex.message}") + end + end + end + + def interruptableSleep(duration) + amountSlept = 0 + while amountSlept < duration + sleep 1 + amountSlept += 1 + if interrupted? + raise InterruptedError.new "Interrupted while backing off" + end end end + def interrupted? + false + end + def send(payload) raise NotImplementedError, "Datadog transport client should implement the send method" end From e1e9feb3a6e51ed0afa9f6141c9629838b503806 Mon Sep 17 00:00:00 2001 From: Wakee Ho Date: Fri, 8 Dec 2023 15:31:51 -0500 Subject: [PATCH 2/5] DatadogHTTPClient class now overrides interrupted? method --- lib/logstash/outputs/datadog_logs.rb | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index a0a0d10..e8b0ac4 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -218,7 +218,8 @@ class DatadogHTTPClient < DatadogClient ::Manticore::ResolutionFailure ] - def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy) + def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy, interruptedLambda = nil) + @interruptedLambda = interruptedLambda @logger = logger protocol = use_ssl ? "https" : "http" @@ -246,6 +247,14 @@ def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, @client = Manticore::Client.new(config) end + def interrupted? + if @interruptedLambda + return @interruptedLambda.call + end + + false + end + def send(payload) begin response = @client.post(@url, :body => payload, :headers => @headers).call From 8e292a76ceca93401b64224cd9e88aca0b3a467c Mon Sep 17 00:00:00 2001 From: Wakee Ho Date: Fri, 8 Dec 2023 15:33:42 -0500 Subject: [PATCH 3/5] DatadogLogs class now checks if logstash is being shutdown --- lib/logstash/outputs/datadog_logs.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index e8b0ac4..fbfac16 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -65,7 +65,11 @@ def multi_receive(events) end end rescue => e - @logger.error("Uncaught processing exception in datadog forwarder #{e.message}") + if e.is_a?(InterruptedError) + raise e + else + @logger.error("Uncaught processing exception in datadog forwarder #{e.message}") + end end end @@ -148,7 +152,7 @@ def gzip_compress(payload, compression_level) # Build a new transport client def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression, force_v1_routes, http_proxy) if use_http - DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy + DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key, force_v1_routes, http_proxy, -> { defined?(pipeline_shutdown_requested?) ? pipeline_shutdown_requested? : false } else DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port end From df9042b6d34c6c061a96ba83bd17db7382b0a2ac Mon Sep 17 00:00:00 2001 From: Wakee Ho Date: Fri, 8 Dec 2023 15:38:41 -0500 Subject: [PATCH 4/5] Bumping version and adding list of changes to CHANGELOG.md --- CHANGELOG.md | 3 +++ lib/logstash/outputs/version.rb | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d38606..14bc157 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 0.5.2 + - Now checks if logstash is being shutdown + ## 0.5.1 - Support using HTTP proxies, adding the `http_proxy` parameter. diff --git a/lib/logstash/outputs/version.rb b/lib/logstash/outputs/version.rb index 392ce67..dd6de3b 100644 --- a/lib/logstash/outputs/version.rb +++ b/lib/logstash/outputs/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module DatadogLogStashPlugin - VERSION = '0.5.1' -end \ No newline at end of file + VERSION = '0.5.2' +end From b4d2759fac4d374a3a7451716cc015b52e4c539c Mon Sep 17 00:00:00 2001 From: Wakee Ho Date: Tue, 4 Jun 2024 17:21:26 -0400 Subject: [PATCH 5/5] Fix @logger.error using wrong variable name --- lib/logstash/outputs/datadog_logs.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index fbfac16..db42640 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -178,7 +178,7 @@ def send_retries(payload, max_retries, max_backoff) retries += 1 retry end - @logger.error("Max number of retries reached, dropping message. Last exception: #{ex.message}") + @logger.error("Max number of retries reached, dropping message. Last exception: #{e.message}") rescue => ex if ex.is_a?(InterruptedError) raise ex