From b1084b2cb65a727d095d3849fb25143ddac75098 Mon Sep 17 00:00:00 2001 From: Gaetan deputier Date: Tue, 21 Jan 2020 18:21:20 +0100 Subject: [PATCH 1/7] [HTTP]: Update the forwarder config --- lib/logstash/outputs/datadog_logs.rb | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index 1910800..c2d42a2 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -16,12 +16,15 @@ class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base default :codec, "json" # Datadog configuration parameters - config :api_key, :validate => :string, :required => true - config :host, :validate => :string, :required => true, :default => 'intake.logs.datadoghq.com' - config :port, :validate => :number, :required => true, :default => 10516 - config :use_ssl, :validate => :boolean, :required => true, :default => true - config :max_backoff, :validate => :number, :required => true, :default => 30 - config :max_retries, :validate => :number, :required => true, :default => 5 + config :api_key, :validate => :string, :required => true + config :host, :validate => :string, :required => true, :default => 'http-intake.logs.datadoghq.com' + config :port, :validate => :number, :required => true, :default => 443 + config :use_ssl, :validate => :boolean, :required => true, :default => true + config :max_backoff, :validate => :number, :required => true, :default => 30 + config :max_retries, :validate => :number, :required => true, :default => 5 + config :use_http, :validate => :boolean, :required => false, :default => true + config :use_compression, :validate => :boolean, :required => false, :default => true + config :compression_level, :validate => :number, :required => false, :default => 6 public def register From 5d2f2dbe4eb4d3ff6262d6741a033c8706b667af Mon Sep 17 00:00:00 2001 From: Gaetan deputier Date: Mon, 3 Feb 2020 17:20:27 +0100 Subject: [PATCH 2/7] [Logstash]: Add HTTP client supporting compression and related tests --- CHANGELOG.md | 4 + README.md | 29 ++++- lib/logstash/outputs/datadog_logs.rb | 181 ++++++++++++++++++++++----- logstash-output-datadog_logs.gemspec | 11 +- spec/outputs/datadog_logs_spec.rb | 49 ++++++++ 5 files changed, 237 insertions(+), 37 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f62d946..c180147 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +## 0.4.0 + - Enable HTTP forwarding for logs + - Provide an option to disable SSL hostname verification for HTTPS + ## 0.3.1 - Make sure that we can disable retries diff --git a/README.md b/README.md index 689af9d..2717c89 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,9 @@ logstash-plugin install logstash-output-datadog_logs ## How to use it? -The `datadog_logs` plugin is configured by default to send logs to a US endpoint over an SSL-encrypted TCP connection. +The `datadog_logs` plugin is configured by default to send logs to a US endpoint over an SSL-encrypted HTTP connection. +The logs are by default batched and compressed. + Configure the plugin with your Datadog API key: ``` @@ -23,14 +25,26 @@ output { } ``` -To send logs to the Datadog's EU endpoint, override default `host` and `port` options: +To enable TCP forwarding, configure your forwarder with: ``` output { datadog_logs { api_key => "" - host => "tcp-intake.logs.datadoghq.eu" - port => "443" + host => "tcp-intake.logs.datadoghq.com" + port => 10516 + use_http => false + } +} +``` + +To send logs to the Datadog's EU HTTP endpoint, override the default `host` + +``` +output { + datadog_logs { + api_key => "" + host => "http-intake.logs.datadoghq.eu" } } ``` @@ -44,6 +58,13 @@ output { | **port** | Proxy port when logs are not directly forwarded to Datadog | 10516 | | **use_ssl** | If true, the agent initializes a secure connection to Datadog. In clear TCP otherwise. | true | | **max_retries** | The number of retries before the output plugin stops | 5 | +| **max_backoff** | The maximum time waited between each retry in seconds | 30 | +| **use_http** | Enable HTTP forwarding | true | +| **use_compression** | Enable log compression for HTTP | true | +| **compression_level** | Set the log compression level for HTTP (1 to 9, 9 being the best ratio) | 6 | +| **no_ssl_validation** | Disable SSL validation (useful for proxy forwarding) | false | + + For additional options, see the [Datadog endpoint documentation](https://docs.datadoghq.com/logs/?tab=eusite#datadog-logs-endpoints) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index c2d42a2..ce07d75 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -6,18 +6,23 @@ # encoding: utf-8 require "logstash/outputs/base" require "logstash/namespace" +require "zlib" + # DatadogLogs lets you send logs to Datadog # based on LogStash events. class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base + # Respect limit documented at https://docs.datadoghq.com/agent/logs/?tab=tailexistingfiles#send-logs-over-https + DD_MAX_BATCH_SIZE = 200 + config_name "datadog_logs" default :codec, "json" # Datadog configuration parameters config :api_key, :validate => :string, :required => true - config :host, :validate => :string, :required => true, :default => 'http-intake.logs.datadoghq.com' + config :host, :validate => :string, :required => true, :default => "http-intake.logs.datadoghq.com" config :port, :validate => :number, :required => true, :default => 443 config :use_ssl, :validate => :boolean, :required => true, :default => true config :max_backoff, :validate => :number, :required => true, :default => 30 @@ -25,51 +30,169 @@ class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base config :use_http, :validate => :boolean, :required => false, :default => true config :use_compression, :validate => :boolean, :required => false, :default => true config :compression_level, :validate => :number, :required => false, :default => 6 + config :no_ssl_validation, :validate => :boolean, :required => false, :default => false + # Register the plugin to logstash public def register - require "socket" - client = nil - @codec.on_event do |event, payload| - message = "#{@api_key} #{payload}\n" - retries = 0 + client ||= new_client(@logger, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @port, @use_compression) + @codec.on_event do |_, payload| + payload = encode(payload, @use_http, @api_key) + if @use_compression and @use_http + payload = gzip_compress(payload, @compression_level) + end + client.send_retries(payload, @max_retries, @max_backoff) + end + end + + # Process a set of log events + public + def multi_receive(events) + return if events.empty? + if @use_http + batches = batch_events(events, DD_MAX_BATCH_SIZE) + batches.each do |batched_event| + @codec.encode(batched_event) + end + else + events.each do |event| + @codec.encode(event) + end + end + end + + # Encode payload for Datadog to the right format (no-op for HTTP) + def encode(payload, use_http, api_key) + if not use_http + return "#{api_key} #{payload}" + else + return payload + end + end + + # Group events in batches + def batch_events(events, max_batch_size) + batches = [] + current_batch = [] + events.each_with_index do |event, i| + if i > 0 and i % max_batch_size == 0 + batches << current_batch + current_batch = [] + end + current_batch << event + end + batches << current_batch + return batches + end + + # Compress logs with GZIP + def gzip_compress(payload, compression_level) + gz = StringIO.new + gz.set_encoding("BINARY") + z = Zlib::GzipWriter.new(gz, compression_level) + z.write(payload) + z.close + gz.string + end + + # Build a new transport client + def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression) + if use_http + return DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key + else + return DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port + end + end + + class RetryableError < StandardError; + end + + class DatadogClient + def send_retries(payload, max_retries, max_backoff) backoff = 1 + retries = 0 begin - client ||= new_client - client.write(message) - rescue => e - @logger.warn("Could not send payload", :exception => e, :backtrace => e.backtrace) - client.close rescue nil - client = nil + send(payload) + rescue RetryableError => e if retries < max_retries || max_retries < 0 + @logger.warn("Retrying ", :exception => e, :backtrace => e.backtrace) sleep backoff backoff = 2 * backoff unless backoff > max_backoff retries += 1 retry end - @logger.warn("Max number of retries reached, dropping the payload", :payload => payload, :max_retries => max_retries) end end + + def send(payload) + end end - public - def receive(event) - # handle new event - @codec.encode(event) + class DatadogHTTPClient < DatadogClient + require "manticore" + + def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key) + @logger = logger + protocol = use_ssl ? "https" : "http" + @url = "#{protocol}://#{host}:#{port.to_s}/v1/input/#{api_key}" + @headers = {"Content-Type" => "application/json"} + if use_compression + @headers["Content-Encoding"] = "gzip" + end + logger.info("Starting HTTP connection to #{protocol}://#{@host}:#{port.to_s}") + config = {} + config[:ssl][:verify] = :disable if no_ssl_validation + @client = Manticore::Client.new(config) + end + + def send(payload) + response = @client.post(@url, :body => payload, :headers => @headers).call + if response.code >= 500 + raise RetryableError.new "Unable to send payload: #{response.code} #{response.body}" + end + if response.code >= 400 + @logger.error("Unable to send payload due to client error: #{response.code} #{response.body}") + end + end end - private - def new_client - # open a secure connection with Datadog - if @use_ssl - @logger.info("Starting SSL connection", :host => @host, :port => @port) - socket = TCPSocket.new @host, @port - sslSocket = OpenSSL::SSL::SSLSocket.new socket - sslSocket.connect - return sslSocket - else - @logger.info("Starting plaintext connection", :host => @host, :port => @port) - return TCPSocket.new @host, @port + class DatadogTCPClient < DatadogClient + require "socket" + + def initialize(logger, use_ssl, no_ssl_validation, host, port) + @logger = logger + @use_ssl = use_ssl + @no_ssl_validation = no_ssl_validation + @host = host + @port = port + end + + def connect + if @use_ssl + @logger.info("Starting SSL connection #{@host} #{@port}") + socket = TCPSocket.new @host, @port + sslContext = OpenSSL::SSL::SSLContext.new + if @no_ssl_validation + sslContext.set_params({:verify_mode => OpenSSL::SSL::VERIFY_NONE}) + end + sslSocket = OpenSSL::SSL::SSLSocket.new socket, sslContext + sslSocket.connect + return sslSocket + else + @logger.info("Starting plaintext connection #{@host} #{@port}") + return TCPSocket.new @host, @port + end + end + + def send(payload) + begin + @socket ||= connect + @socket.puts(payload) + rescue => e + @socket.close rescue nil + @socket = nil + raise RetryableError.new "Unable to send payload: #{e.message}." + end end end diff --git a/logstash-output-datadog_logs.gemspec b/logstash-output-datadog_logs.gemspec index dfbfeb1..bcc1754 100644 --- a/logstash-output-datadog_logs.gemspec +++ b/logstash-output-datadog_logs.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-datadog_logs' - s.version = '0.3.1' + s.version = '0.4.0' s.licenses = ['Apache-2.0'] s.summary = 'DatadogLogs lets you send logs to Datadog based on LogStash events.' s.homepage = 'https://www.datadoghq.com/' @@ -9,14 +9,17 @@ Gem::Specification.new do |s| s.require_paths = ['lib'] # Files - s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT'] - # Tests + s.files = Dir['lib/**/*', 'spec/**/*', 'vendor/**/*', '*.gemspec', '*.md', 'CONTRIBUTORS', 'Gemfile', 'LICENSE', 'NOTICE.TXT'] + # Tests s.test_files = s.files.grep(%r{^(test|spec|features)/}) # Special flag to let us know this is actually a logstash plugin - s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } + s.metadata = {"logstash_plugin" => "true", "logstash_group" => "output"} # Gem dependencies s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0" + s.add_runtime_dependency 'manticore', '>= 0.5.2', '< 1.0.0' + s.add_runtime_dependency 'logstash-codec-json' + s.add_development_dependency 'logstash-devutils' end diff --git a/spec/outputs/datadog_logs_spec.rb b/spec/outputs/datadog_logs_spec.rb index f8954ef..a69f6b2 100644 --- a/spec/outputs/datadog_logs_spec.rb +++ b/spec/outputs/datadog_logs_spec.rb @@ -4,3 +4,52 @@ # Copyright 2017 Datadog, Inc. require "logstash/devutils/rspec/spec_helper" +require "logstash/outputs/datadog_logs" + +describe LogStash::Outputs::DatadogLogs do + context "should register" do + it "with an api key" do + plugin = LogStash::Plugin.lookup("output", "datadog_logs").new({"api_key" => "xxx"}) + expect { plugin.register }.to_not raise_error + end + + it "without an api key" do + expect { LogStash::Plugin.lookup("output", "datadog_logs").new() }.to raise_error(LogStash::ConfigurationError) + end + end + + subject do + plugin = LogStash::Plugin.lookup("output", "datadog_logs").new({"api_key" => "xxx"}) + plugin.register + plugin + end + + context "when using HTTP" do + it "should create one batch of one event" do + input_events = [LogStash::Event.new({"message" => "dd"})] + expect(subject.batch_events(input_events, 1).length).to eq(1) + end + + it "should create two batches of one event each" do + input_events = [LogStash::Event.new({"message" => "dd1"}), LogStash::Event.new({"message" => "dd2"})] + actual_events = subject.batch_events(input_events, 1) + expect(actual_events.length).to eq(2) + expect(actual_events[0][0].get("message")).to eq("dd1") + expect(actual_events[1][0].get("message")).to eq("dd2") + end + + it "should not re-encode events" do + input_event = "{message=dd}" + encoded_event = subject.encode(input_event, true, "xxx") + expect(encoded_event).to eq(input_event) + end + end + + context "when using TCP" do + it "should re-encode events" do + input_event = "{message=dd}" + encoded_event = subject.encode(input_event, false, "xxx") + expect(encoded_event).to eq("xxx " + input_event) + end + end +end \ No newline at end of file From f1400306739b86119168f334f28a20fdca8e7f06 Mon Sep 17 00:00:00 2001 From: Gaetan deputier Date: Tue, 4 Feb 2020 18:09:29 +0100 Subject: [PATCH 3/7] [Logstash]: Support max batch size in bytes, log if compression is enabled and clean-up --- lib/logstash/outputs/datadog_logs.rb | 43 ++++++++++++++++------------ 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index ce07d75..7af4fee 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -14,7 +14,8 @@ class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base # Respect limit documented at https://docs.datadoghq.com/agent/logs/?tab=tailexistingfiles#send-logs-over-https - DD_MAX_BATCH_SIZE = 200 + DD_MAX_BATCH_LENGTH = 200 + DD_MAX_BATCH_SIZE = 1000000 config_name "datadog_logs" @@ -50,7 +51,7 @@ def register def multi_receive(events) return if events.empty? if @use_http - batches = batch_events(events, DD_MAX_BATCH_SIZE) + batches = batch_events(events, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE) batches.each do |batched_event| @codec.encode(batched_event) end @@ -64,25 +65,28 @@ def multi_receive(events) # Encode payload for Datadog to the right format (no-op for HTTP) def encode(payload, use_http, api_key) if not use_http - return "#{api_key} #{payload}" + "#{api_key} #{payload}" else - return payload + payload end end # Group events in batches - def batch_events(events, max_batch_size) + def batch_events(events, max_batch_length, max_request_size) batches = [] current_batch = [] + current_batch_size = 0 events.each_with_index do |event, i| - if i > 0 and i % max_batch_size == 0 + if (i > 0 and i % max_batch_length == 0) or (current_batch_size > max_request_size) batches << current_batch current_batch = [] + current_batch_size = 0 end + current_batch_size += event.get('message').bytesize current_batch << event end batches << current_batch - return batches + batches end # Compress logs with GZIP @@ -90,17 +94,20 @@ def gzip_compress(payload, compression_level) gz = StringIO.new gz.set_encoding("BINARY") z = Zlib::GzipWriter.new(gz, compression_level) - z.write(payload) - z.close + begin + z.write(payload) + ensure + z.close + end gz.string end # Build a new transport client def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression) if use_http - return DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key + DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key else - return DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port + DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port end end @@ -139,7 +146,7 @@ def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, if use_compression @headers["Content-Encoding"] = "gzip" end - logger.info("Starting HTTP connection to #{protocol}://#{@host}:#{port.to_s}") + logger.info("Starting HTTP connection to #{protocol}://#{host}:#{port.to_s} with compression " + (use_compression ? "enabled" : "disabled")) config = {} config[:ssl][:verify] = :disable if no_ssl_validation @client = Manticore::Client.new(config) @@ -171,16 +178,16 @@ def connect if @use_ssl @logger.info("Starting SSL connection #{@host} #{@port}") socket = TCPSocket.new @host, @port - sslContext = OpenSSL::SSL::SSLContext.new + ssl_context = OpenSSL::SSL::SSLContext.new if @no_ssl_validation - sslContext.set_params({:verify_mode => OpenSSL::SSL::VERIFY_NONE}) + ssl_context.set_params({:verify_mode => OpenSSL::SSL::VERIFY_NONE}) end - sslSocket = OpenSSL::SSL::SSLSocket.new socket, sslContext - sslSocket.connect - return sslSocket + ssl_context = OpenSSL::SSL::SSLSocket.new socket, sslContext + ssl_context.connect + ssl_context else @logger.info("Starting plaintext connection #{@host} #{@port}") - return TCPSocket.new @host, @port + TCPSocket.new @host, @port end end From 13d0aa441d9cedee20d29b7d2865c5d7580eb286 Mon Sep 17 00:00:00 2001 From: Gaetan deputier Date: Wed, 5 Feb 2020 18:06:39 +0100 Subject: [PATCH 4/7] [Logstash]: Update plugin to manage batch per request size, clean-up --- lib/logstash/outputs/datadog_logs.rb | 83 +++++++++++++++++++--------- spec/outputs/datadog_logs_spec.rb | 75 ++++++++++++++++++++----- 2 files changed, 119 insertions(+), 39 deletions(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index 7af4fee..39cc987 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -16,6 +16,7 @@ class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base # Respect limit documented at https://docs.datadoghq.com/agent/logs/?tab=tailexistingfiles#send-logs-over-https DD_MAX_BATCH_LENGTH = 200 DD_MAX_BATCH_SIZE = 1000000 + DD_TRUNCATION_SUFFIX = "...TRUNCATED..." config_name "datadog_logs" @@ -36,59 +37,89 @@ class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base # Register the plugin to logstash public def register - client ||= new_client(@logger, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @port, @use_compression) - @codec.on_event do |_, payload| - payload = encode(payload, @use_http, @api_key) - if @use_compression and @use_http - payload = gzip_compress(payload, @compression_level) - end - client.send_retries(payload, @max_retries, @max_backoff) - end + @client = new_client(@logger, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @port, @use_compression) end - # Process a set of log events + # Entry point of the plugin, receiving a set of Logstash events public def multi_receive(events) return if events.empty? + encoded_events = @codec.multi_encode(events) if @use_http - batches = batch_events(events, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE) + batches = batch_http_events(encoded_events, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE) batches.each do |batched_event| - @codec.encode(batched_event) + process_encoded_payload(format_http_event_batch(batched_event)) end else - events.each do |event| - @codec.encode(event) + encoded_events.each do |encoded_event| + process_encoded_payload(format_tcp_event(encoded_event.last, @api_key, DD_MAX_BATCH_SIZE)) end end end - # Encode payload for Datadog to the right format (no-op for HTTP) - def encode(payload, use_http, api_key) - if not use_http - "#{api_key} #{payload}" - else - payload + # Process and send each encoded payload + def process_encoded_payload(payload) + if @use_compression and @use_http + payload = gzip_compress(payload, @compression_level) + end + @client.send_retries(payload, @max_retries, @max_backoff) + end + + # Format TCP event + def format_tcp_event(payload, api_key, max_request_size) + formatted_payload = "#{api_key} #{payload}" + if (formatted_payload.bytesize > max_request_size) + return truncate(formatted_payload, max_request_size) end + formatted_payload + end + + # Format HTTP events + def format_http_event_batch(batched_events) + "[#{batched_events.join(',')}]" end - # Group events in batches - def batch_events(events, max_batch_length, max_request_size) + # Group HTTP events in batches + def batch_http_events(encoded_events, max_batch_length, max_request_size) batches = [] current_batch = [] current_batch_size = 0 - events.each_with_index do |event, i| - if (i > 0 and i % max_batch_length == 0) or (current_batch_size > max_request_size) + encoded_events.each_with_index do |event, i| + encoded_event = event.last + current_event_size = encoded_event.bytesize + # If this unique log size is bigger than the request size, truncate it + if current_event_size > max_request_size + encoded_event = truncate(encoded_event, max_request_size) + current_event_size = encoded_event.bytesize + end + + if (i > 0 and i % max_batch_length == 0) or (current_batch_size + current_event_size > max_request_size) batches << current_batch current_batch = [] current_batch_size = 0 end - current_batch_size += event.get('message').bytesize - current_batch << event + + current_batch_size += encoded_event.bytesize + current_batch << encoded_event end batches << current_batch batches end + # Truncate events over the provided max length, appending a marker when truncated + def truncate(event, max_length) + if event.length > max_length + event = event[0..max_length - 1] + event[max(0, max_length - DD_TRUNCATION_SUFFIX.length)..max_length-1] = DD_TRUNCATION_SUFFIX + return event + end + event + end + + def max(a, b) + a > b ? a : b + end + # Compress logs with GZIP def gzip_compress(payload, compression_level) gz = StringIO.new @@ -182,7 +213,7 @@ def connect if @no_ssl_validation ssl_context.set_params({:verify_mode => OpenSSL::SSL::VERIFY_NONE}) end - ssl_context = OpenSSL::SSL::SSLSocket.new socket, sslContext + ssl_context = OpenSSL::SSL::SSLSocket.new socket, ssl_context ssl_context.connect ssl_context else diff --git a/spec/outputs/datadog_logs_spec.rb b/spec/outputs/datadog_logs_spec.rb index a69f6b2..ab52ccd 100644 --- a/spec/outputs/datadog_logs_spec.rb +++ b/spec/outputs/datadog_logs_spec.rb @@ -24,32 +24,81 @@ plugin end + context "when truncating" do + it "should truncate messages of the given length" do + input = "foobarfoobarfoobarfoobar" + expect(subject.truncate(input, 15).length).to eq(15) + end + + it "should replace the end of the message with a marker when truncated" do + input = "foobarfoobarfoobarfoobar" + expect(subject.truncate(input, 15)).to end_with("...TRUNCATED...") + end + + it "should return the marker if the message length is smaller than the marker length" do + input = "foobar" + expect(subject.truncate(input, 1)).to eq("...TRUNCATED...") + end + + it "should do nothing if the input length is smaller than the given length" do + input = "foobar" + expect(subject.truncate(input, 15)).to eq("foobar") + end + end + context "when using HTTP" do - it "should create one batch of one event" do - input_events = [LogStash::Event.new({"message" => "dd"})] - expect(subject.batch_events(input_events, 1).length).to eq(1) + it "should respect the batch length and create one batch of one event" do + input_events = [[LogStash::Event.new({"message" => "dd"}), "dd"]] + expect(subject.batch_http_events(input_events, 1, 1000).length).to eq(1) end - it "should create two batches of one event each" do - input_events = [LogStash::Event.new({"message" => "dd1"}), LogStash::Event.new({"message" => "dd2"})] - actual_events = subject.batch_events(input_events, 1) + it "should respect the batch length and create two batches of one event" do + input_events = [[LogStash::Event.new({"message" => "dd1"}), "dd1"], [LogStash::Event.new({"message" => "dd2"}), "dd2"]] + actual_events = subject.batch_http_events(input_events, 1, 1000) expect(actual_events.length).to eq(2) - expect(actual_events[0][0].get("message")).to eq("dd1") - expect(actual_events[1][0].get("message")).to eq("dd2") + expect(actual_events[0][0]).to eq("dd1") + expect(actual_events[0][1]).to eq("dd2") end - it "should not re-encode events" do - input_event = "{message=dd}" - encoded_event = subject.encode(input_event, true, "xxx") - expect(encoded_event).to eq(input_event) + it "should respect the request size and create two batches of one event" do + input_events = [[LogStash::Event.new({"message" => "dd1"}), "dd1"], [LogStash::Event.new({"message" => "dd2"}), "dd2"]] + actual_events = subject.batch_http_events(input_events, 10, 3) + expect(actual_events.length).to eq(2) + expect(actual_events[0][0]).to eq("dd1") + expect(actual_events[1][0]).to eq("dd2") + end + + it "should respect the request size and create two batches of two events" do + input_events = [[LogStash::Event.new({"message" => "dd1"}), "dd1"], [LogStash::Event.new({"message" => "dd2"}), "dd2"], [LogStash::Event.new({"message" => "dd3"}), "dd3"], [LogStash::Event.new({"message" => "dd4"}), "dd4"]] + actual_events = subject.batch_http_events(input_events, 6, 6) + expect(actual_events.length).to eq(2) + expect(actual_events[0][0]).to eq("dd1") + expect(actual_events[0][1]).to eq("dd2") + expect(actual_events[1][0]).to eq("dd3") + expect(actual_events[1][1]).to eq("dd4") + end + + it "should truncate events whose length is bigger than the max request size" do + input_events = [[LogStash::Event.new({"message" => "dd1"}), "dd1"], [LogStash::Event.new({"message" => "foobarfoobarfoobar"}),"foobarfoobarfoobar"], [LogStash::Event.new({"message" => "dd2"}), "dd2"]] + actual_events = subject.batch_http_events(input_events, 10, 3) + expect(actual_events.length).to eq(3) + expect(actual_events[0][0]).to eq("dd1") + expect(actual_events[1][0]).to eq("...TRUNCATED...") + expect(actual_events[2][0]).to eq("dd2") end end context "when using TCP" do it "should re-encode events" do input_event = "{message=dd}" - encoded_event = subject.encode(input_event, false, "xxx") + encoded_event = subject.format_tcp_event(input_event, "xxx", 1000) expect(encoded_event).to eq("xxx " + input_event) end + + it "should truncate too long messages" do + input_event = "{message=foobarfoobarfoobar}" + encoded_event = subject.format_tcp_event(input_event, "xxx", 20) + expect(encoded_event).to eq("xxx {...TRUNCATED...") + end end end \ No newline at end of file From 4513bc1571f9ae060d4f01556a93838a1f7eaf4a Mon Sep 17 00:00:00 2001 From: Gaetan deputier Date: Thu, 6 Feb 2020 15:25:40 +0100 Subject: [PATCH 5/7] [Logstash]: Mention zlib as a requirement --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 2717c89..fdce2b4 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,10 @@ DatadogLogs lets you send logs to Datadog based on LogStash events. +## Requirements + +The plugin relies upon the `zlib` library for compressing data. + ## How to install it? ```bash From c56705a3675d0b7bd339739353c3a7fa7d181348 Mon Sep 17 00:00:00 2001 From: Gaetan deputier Date: Tue, 18 Feb 2020 17:54:25 +0100 Subject: [PATCH 6/7] [Logstash]: Update forwarder limits --- lib/logstash/outputs/datadog_logs.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index 39cc987..61e2195 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -13,9 +13,9 @@ # based on LogStash events. class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base - # Respect limit documented at https://docs.datadoghq.com/agent/logs/?tab=tailexistingfiles#send-logs-over-https - DD_MAX_BATCH_LENGTH = 200 - DD_MAX_BATCH_SIZE = 1000000 + # Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs + DD_MAX_BATCH_LENGTH = 500 + DD_MAX_BATCH_SIZE = 5000000 DD_TRUNCATION_SUFFIX = "...TRUNCATED..." config_name "datadog_logs" From e5f769048e4d793a3aedb299ab156570c2185dcc Mon Sep 17 00:00:00 2001 From: Gaetan deputier Date: Fri, 21 Feb 2020 12:48:51 +0100 Subject: [PATCH 7/7] [Logstash]: Add shutdown hook, fix test, update doc --- README.md | 6 +++--- lib/logstash/outputs/datadog_logs.rb | 20 +++++++++++++++++++- spec/outputs/datadog_logs_spec.rb | 2 +- 3 files changed, 23 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index fdce2b4..4811807 100644 --- a/README.md +++ b/README.md @@ -59,11 +59,11 @@ output { |-------------|--------------------------------------------------------------------------|----------------| | **api_key** | The API key of your Datadog platform | nil | | **host** | Proxy endpoint when logs are not directly forwarded to Datadog | intake.logs.datadoghq.com | -| **port** | Proxy port when logs are not directly forwarded to Datadog | 10516 | -| **use_ssl** | If true, the agent initializes a secure connection to Datadog. In clear TCP otherwise. | true | +| **port** | Proxy port when logs are not directly forwarded to Datadog | 443 | +| **use_ssl** | If true, the agent initializes a secure connection to Datadog. Ensure to update the port if you disable it. | true | | **max_retries** | The number of retries before the output plugin stops | 5 | | **max_backoff** | The maximum time waited between each retry in seconds | 30 | -| **use_http** | Enable HTTP forwarding | true | +| **use_http** | Enable HTTP forwarding. If you disable it, make sure to update the port to 10516 if use_ssl is enabled or 10514 otherwise. | true | | **use_compression** | Enable log compression for HTTP | true | | **compression_level** | Set the log compression level for HTTP (1 to 9, 9 being the best ratio) | 6 | | **no_ssl_validation** | Disable SSL validation (useful for proxy forwarding) | false | diff --git a/lib/logstash/outputs/datadog_logs.rb b/lib/logstash/outputs/datadog_logs.rb index 61e2195..e5edbcd 100644 --- a/lib/logstash/outputs/datadog_logs.rb +++ b/lib/logstash/outputs/datadog_logs.rb @@ -40,6 +40,11 @@ def register @client = new_client(@logger, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @port, @use_compression) end + # Logstash shutdown hook + def close + @client.close + end + # Entry point of the plugin, receiving a set of Logstash events public def multi_receive(events) @@ -110,7 +115,7 @@ def batch_http_events(encoded_events, max_batch_length, max_request_size) def truncate(event, max_length) if event.length > max_length event = event[0..max_length - 1] - event[max(0, max_length - DD_TRUNCATION_SUFFIX.length)..max_length-1] = DD_TRUNCATION_SUFFIX + event[max(0, max_length - DD_TRUNCATION_SUFFIX.length)..max_length - 1] = DD_TRUNCATION_SUFFIX return event end event @@ -163,6 +168,11 @@ def send_retries(payload, max_retries, max_backoff) end def send(payload) + raise NotImplementedError, "Datadog transport client should implement the send method" + end + + def close + raise NotImplementedError, "Datadog transport client should implement the close method" end end @@ -192,6 +202,10 @@ def send(payload) @logger.error("Unable to send payload due to client error: #{response.code} #{response.body}") end end + + def close + @client.close + end end class DatadogTCPClient < DatadogClient @@ -232,6 +246,10 @@ def send(payload) raise RetryableError.new "Unable to send payload: #{e.message}." end end + + def close + @socket.close rescue nil + end end end diff --git a/spec/outputs/datadog_logs_spec.rb b/spec/outputs/datadog_logs_spec.rb index ab52ccd..97f30d2 100644 --- a/spec/outputs/datadog_logs_spec.rb +++ b/spec/outputs/datadog_logs_spec.rb @@ -57,7 +57,7 @@ actual_events = subject.batch_http_events(input_events, 1, 1000) expect(actual_events.length).to eq(2) expect(actual_events[0][0]).to eq("dd1") - expect(actual_events[0][1]).to eq("dd2") + expect(actual_events[1][0]).to eq("dd2") end it "should respect the request size and create two batches of one event" do