
Merge pull request #19 from DataDog/gaetan.deputier/logstash-http-client
[Logstash]: Http client - v0.4.0
gaetan-deputier authored Feb 21, 2020
2 parents 9e7c929 + e5f7690 commit eb7b202
Showing 5 changed files with 356 additions and 44 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
## 0.4.0
- Enable HTTP forwarding for logs
- Provide an option to disable SSL hostname verification for HTTPS

## 0.3.1
- Make sure that we can disable retries

37 changes: 31 additions & 6 deletions README.md
@@ -3,6 +3,10 @@

DatadogLogs lets you send logs to Datadog based on LogStash events.

## Requirements

The plugin relies upon the `zlib` library for compressing data.

## How to install it?

```bash
logstash-plugin install logstash-output-datadog_logs
```

@@ -12,7 +16,9 @@

## How to use it?

The `datadog_logs` plugin is configured by default to send logs to a US endpoint over an SSL-encrypted TCP connection.
The `datadog_logs` plugin is configured by default to send logs to a US endpoint over an SSL-encrypted HTTP connection.
Logs are batched and compressed by default.

Configure the plugin with your Datadog API key:

```
output {
  datadog_logs {
    api_key => "<DATADOG_API_KEY>"
  }
}
```

To send logs to Datadog's EU endpoint, override the default `host` and `port` options:
To enable TCP forwarding, configure your forwarder with:

```
output {
datadog_logs {
api_key => "<DATADOG_API_KEY>"
host => "tcp-intake.logs.datadoghq.eu"
port => "443"
host => "tcp-intake.logs.datadoghq.com"
port => 10516
use_http => false
}
}
```

To send logs to Datadog's EU HTTP endpoint, override the default `host`:

```
output {
datadog_logs {
api_key => "<DATADOG_API_KEY>"
host => "http-intake.logs.datadoghq.eu"
}
}
```
@@ -41,9 +59,16 @@

| Property | Description | Default value |
|-------------|--------------------------------------------------------------------------|----------------|
| **api_key** | The API key of your Datadog platform | nil |
| **host** | Proxy endpoint when logs are not directly forwarded to Datadog | http-intake.logs.datadoghq.com |
| **port** | Proxy port when logs are not directly forwarded to Datadog | 10516 |
| **use_ssl** | If true, the agent initializes a secure connection to Datadog. In clear TCP otherwise. | true |
| **port** | Proxy port when logs are not directly forwarded to Datadog | 443 |
| **use_ssl** | If true, the plugin initializes a secure connection to Datadog. Be sure to update the port if you disable it. | true |
| **max_retries** | The number of retries before the output plugin stops | 5 |
| **max_backoff** | The maximum time waited between each retry in seconds | 30 |
| **use_http** | Enable HTTP forwarding. If you disable it, make sure to update the port to 10516 if use_ssl is enabled or 10514 otherwise. | true |
| **use_compression** | Enable log compression for HTTP | true |
| **compression_level** | Set the log compression level for HTTP (1 to 9, 9 being the best ratio) | 6 |
| **no_ssl_validation** | Disable SSL validation (useful for proxy forwarding) | false |
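
As a sketch of how these options combine, the configuration below forwards logs through a proxy with SSL validation disabled and a higher compression level. The proxy host `my-proxy.example.com` is a placeholder, and the option values are illustrative rather than recommended settings:

```
output {
  datadog_logs {
    api_key => "<DATADOG_API_KEY>"
    host => "my-proxy.example.com"
    port => 443
    no_ssl_validation => true
    use_compression => true
    compression_level => 9
  }
}
```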



For additional options, see the [Datadog endpoint documentation](https://docs.datadoghq.com/logs/?tab=eusite#datadog-logs-endpoints)

250 changes: 216 additions & 34 deletions lib/logstash/outputs/datadog_logs.rb
@@ -6,67 +6,249 @@
# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "zlib"


# DatadogLogs lets you send logs to Datadog
# based on LogStash events.
class LogStash::Outputs::DatadogLogs < LogStash::Outputs::Base

# Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs
DD_MAX_BATCH_LENGTH = 500
DD_MAX_BATCH_SIZE = 5000000
DD_TRUNCATION_SUFFIX = "...TRUNCATED..."
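# Taken together, these limits mean each HTTP request carries at most 500 events
# and roughly 5,000,000 bytes of uncompressed payload; single events above the
# size limit are truncated (see `truncate` below).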

config_name "datadog_logs"

default :codec, "json"

# Datadog configuration parameters
config :api_key, :validate => :string, :required => true
config :host, :validate => :string, :required => true, :default => 'intake.logs.datadoghq.com'
config :port, :validate => :number, :required => true, :default => 10516
config :use_ssl, :validate => :boolean, :required => true, :default => true
config :max_backoff, :validate => :number, :required => true, :default => 30
config :max_retries, :validate => :number, :required => true, :default => 5
config :api_key, :validate => :string, :required => true
config :host, :validate => :string, :required => true, :default => "http-intake.logs.datadoghq.com"
config :port, :validate => :number, :required => true, :default => 443
config :use_ssl, :validate => :boolean, :required => true, :default => true
config :max_backoff, :validate => :number, :required => true, :default => 30
config :max_retries, :validate => :number, :required => true, :default => 5
config :use_http, :validate => :boolean, :required => false, :default => true
config :use_compression, :validate => :boolean, :required => false, :default => true
config :compression_level, :validate => :number, :required => false, :default => 6
config :no_ssl_validation, :validate => :boolean, :required => false, :default => false

# Register the plugin to logstash
public
def register
require "socket"
client = nil
@codec.on_event do |event, payload|
message = "#{@api_key} #{payload}\n"
retries = 0
@client = new_client(@logger, @api_key, @use_http, @use_ssl, @no_ssl_validation, @host, @port, @use_compression)
end

# Logstash shutdown hook
def close
@client.close
end

# Entry point of the plugin, receiving a set of Logstash events
public
def multi_receive(events)
return if events.empty?
encoded_events = @codec.multi_encode(events)
if @use_http
batches = batch_http_events(encoded_events, DD_MAX_BATCH_LENGTH, DD_MAX_BATCH_SIZE)
batches.each do |batched_event|
process_encoded_payload(format_http_event_batch(batched_event))
end
else
encoded_events.each do |encoded_event|
process_encoded_payload(format_tcp_event(encoded_event.last, @api_key, DD_MAX_BATCH_SIZE))
end
end
end

# Process and send each encoded payload
def process_encoded_payload(payload)
if @use_compression and @use_http
payload = gzip_compress(payload, @compression_level)
end
@client.send_retries(payload, @max_retries, @max_backoff)
end
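# Note: compression is only applied on the HTTP path; TCP payloads are sent
# uncompressed, prefixed with the API key by format_tcp_event.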

# Format TCP event
def format_tcp_event(payload, api_key, max_request_size)
formatted_payload = "#{api_key} #{payload}"
if (formatted_payload.bytesize > max_request_size)
return truncate(formatted_payload, max_request_size)
end
formatted_payload
end

# Format HTTP events
def format_http_event_batch(batched_events)
"[#{batched_events.join(',')}]"
end

# Group HTTP events in batches
def batch_http_events(encoded_events, max_batch_length, max_request_size)
batches = []
current_batch = []
current_batch_size = 0
encoded_events.each_with_index do |event, i|
encoded_event = event.last
current_event_size = encoded_event.bytesize
# If this unique log size is bigger than the request size, truncate it
if current_event_size > max_request_size
encoded_event = truncate(encoded_event, max_request_size)
current_event_size = encoded_event.bytesize
end

if (i > 0 and i % max_batch_length == 0) or (current_batch_size + current_event_size > max_request_size)
batches << current_batch
current_batch = []
current_batch_size = 0
end

current_batch_size += encoded_event.bytesize
current_batch << encoded_event
end
batches << current_batch
batches
end
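# Illustrative example (not part of the original code): calling batch_http_events
# with max_batch_length = 2 and a large max_request_size on events e1, e2, e3
# yields [[e1, e2], [e3]]; each inner batch is then serialized by
# format_http_event_batch into a single JSON array payload.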

# Truncate events over the provided max length, appending a marker when truncated
def truncate(event, max_length)
if event.length > max_length
event = event[0..max_length - 1]
event[max(0, max_length - DD_TRUNCATION_SUFFIX.length)..max_length - 1] = DD_TRUNCATION_SUFFIX
return event
end
event
end
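# For example, with the constants above an encoded event larger than
# DD_MAX_BATCH_SIZE is cut to 5,000,000 characters and the last 15 of them are
# overwritten with "...TRUNCATED...", so the truncation remains visible downstream.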

def max(a, b)
a > b ? a : b
end

# Compress logs with GZIP
def gzip_compress(payload, compression_level)
gz = StringIO.new
gz.set_encoding("BINARY")
z = Zlib::GzipWriter.new(gz, compression_level)
begin
z.write(payload)
ensure
z.close
end
gz.string
end
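# The returned string is a complete gzip stream; when compression is enabled the
# HTTP client below advertises it with a "Content-Encoding: gzip" header.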

# Build a new transport client
def new_client(logger, api_key, use_http, use_ssl, no_ssl_validation, host, port, use_compression)
if use_http
DatadogHTTPClient.new logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key
else
DatadogTCPClient.new logger, use_ssl, no_ssl_validation, host, port
end
end

class RetryableError < StandardError;
end

class DatadogClient
def send_retries(payload, max_retries, max_backoff)
backoff = 1
retries = 0
begin
client ||= new_client
client.write(message)
rescue => e
@logger.warn("Could not send payload", :exception => e, :backtrace => e.backtrace)
client.close rescue nil
client = nil
send(payload)
rescue RetryableError => e
if retries < max_retries || max_retries < 0
@logger.warn("Retrying ", :exception => e, :backtrace => e.backtrace)
sleep backoff
backoff = 2 * backoff unless backoff > max_backoff
retries += 1
retry
end
@logger.warn("Max number of retries reached, dropping the payload", :payload => payload, :max_retries => max_retries)
end
end
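# For reference: a RetryableError triggers exponential backoff, so with the
# defaults (max_retries = 5, max_backoff = 30) the sleeps are 1, 2, 4, 8 and 16
# seconds before the payload is dropped; a negative max_retries retries forever.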

def send(payload)
raise NotImplementedError, "Datadog transport client should implement the send method"
end

def close
raise NotImplementedError, "Datadog transport client should implement the close method"
end
end

public
def receive(event)
# handle new event
@codec.encode(event)
class DatadogHTTPClient < DatadogClient
require "manticore"

def initialize(logger, use_ssl, no_ssl_validation, host, port, use_compression, api_key)
@logger = logger
protocol = use_ssl ? "https" : "http"
@url = "#{protocol}://#{host}:#{port.to_s}/v1/input/#{api_key}"
@headers = {"Content-Type" => "application/json"}
if use_compression
@headers["Content-Encoding"] = "gzip"
end
logger.info("Starting HTTP connection to #{protocol}://#{host}:#{port.to_s} with compression " + (use_compression ? "enabled" : "disabled"))
config = {}
config[:ssl] = {:verify => :disable} if no_ssl_validation
@client = Manticore::Client.new(config)
end

def send(payload)
response = @client.post(@url, :body => payload, :headers => @headers).call
if response.code >= 500
raise RetryableError.new "Unable to send payload: #{response.code} #{response.body}"
end
if response.code >= 400
@logger.error("Unable to send payload due to client error: #{response.code} #{response.body}")
end
end

def close
@client.close
end
end

private
def new_client
# open a secure connection with Datadog
if @use_ssl
@logger.info("Starting SSL connection", :host => @host, :port => @port)
socket = TCPSocket.new @host, @port
sslSocket = OpenSSL::SSL::SSLSocket.new socket
sslSocket.connect
return sslSocket
else
@logger.info("Starting plaintext connection", :host => @host, :port => @port)
return TCPSocket.new @host, @port
class DatadogTCPClient < DatadogClient
require "socket"

def initialize(logger, use_ssl, no_ssl_validation, host, port)
@logger = logger
@use_ssl = use_ssl
@no_ssl_validation = no_ssl_validation
@host = host
@port = port
end

def connect
if @use_ssl
@logger.info("Starting SSL connection #{@host} #{@port}")
socket = TCPSocket.new @host, @port
ssl_context = OpenSSL::SSL::SSLContext.new
if @no_ssl_validation
ssl_context.set_params({:verify_mode => OpenSSL::SSL::VERIFY_NONE})
end
ssl_socket = OpenSSL::SSL::SSLSocket.new socket, ssl_context
ssl_socket.connect
ssl_socket
else
@logger.info("Starting plaintext connection #{@host} #{@port}")
TCPSocket.new @host, @port
end
end

def send(payload)
begin
@socket ||= connect
@socket.puts(payload)
rescue => e
@socket.close rescue nil
@socket = nil
raise RetryableError.new "Unable to send payload: #{e.message}."
end
end

def close
@socket.close rescue nil
end
end

11 changes: 7 additions & 4 deletions logstash-output-datadog_logs.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-datadog_logs'
s.version = '0.3.1'
s.version = '0.4.0'
s.licenses = ['Apache-2.0']
s.summary = 'DatadogLogs lets you send logs to Datadog based on LogStash events.'
s.homepage = 'https://www.datadoghq.com/'
@@ -9,14 +9,17 @@ Gem::Specification.new do |s|
s.require_paths = ['lib']

# Files
s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile','LICENSE','NOTICE.TXT']
# Tests
s.files = Dir['lib/**/*', 'spec/**/*', 'vendor/**/*', '*.gemspec', '*.md', 'CONTRIBUTORS', 'Gemfile', 'LICENSE', 'NOTICE.TXT']
# Tests
s.test_files = s.files.grep(%r{^(test|spec|features)/})

# Special flag to let us know this is actually a logstash plugin
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }
s.metadata = {"logstash_plugin" => "true", "logstash_group" => "output"}

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0"
s.add_runtime_dependency 'manticore', '>= 0.5.2', '< 1.0.0'
s.add_runtime_dependency 'logstash-codec-json'

s.add_development_dependency 'logstash-devutils'
end
