Updated README.md and configs
Updated failed_items_path to be a required config parameter. If set to
"nil", failed items are not persisted to local storage; otherwise, once
max_retries is reached, the failed items are written to the file path provided.
MonishkaDas committed Nov 5, 2024
1 parent 0be738e commit 7da8798
Showing 3 changed files with 48 additions and 42 deletions.
68 changes: 37 additions & 31 deletions README.md
@@ -35,19 +35,22 @@ Perform configuration before sending events from Logstash to Azure Data Explorer

```ruby
output {
-  kusto {
-    path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt"
-    ingest_url => "https://ingest-<cluster-name>.kusto.windows.net/"
-    app_id => "<application id>"
-    app_key => "<application key/secret>"
-    app_tenant => "<tenant id>"
-    database => "<database name>"
-    table => "<target table>"
-    json_mapping => "<mapping name>"
-    proxy_host => "<proxy host>"
-    proxy_port => <proxy port>
-    proxy_protocol => <"http"|"https">
-  }
+  kusto {
+    ingest_url => "https://ingest-<cluster-name>.kusto.windows.net/"
+    app_id => "<application id>"
+    app_key => "<application key/secret>"
+    app_tenant => "<tenant id>"
+    database => "<database name>"
+    table => "<target table>"
+    json_mapping => "<mapping name>"
+    proxy_host => "<proxy host>"
+    proxy_port => <proxy port>
+    proxy_protocol => <"http"|"https">
+    max_size => 10
+    max_interval => 10
+    max_retries => 3
+    failed_items_path => "<path to store failed items>"
+  }
}
```
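If persisting failed items is not desired (accepting possible data loss), the commit description indicates that `failed_items_path` can be set to the literal string `"nil"`. A sketch, abbreviated to the parameters relevant to this change (all other values are placeholders):

```ruby
output {
  kusto {
    ingest_url => "https://ingest-<cluster-name>.kusto.windows.net/"
    database => "<database name>"
    table => "<target table>"
    max_retries => 3
    failed_items_path => "nil"  # failed batches are dropped after max_retries
  }
}
```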
More information about configuring Logstash can be found in the [logstash configuration guide](https://www.elastic.co/guide/en/logstash/current/configuration.html)
@@ -56,22 +59,24 @@

| Parameter Name | Description | Notes |
| --- | --- | --- |
-| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute; check the Logstash docs for more information on time expressions. | Required |
-| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required |
-| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional |
-| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional |
-| **database** | Database name to place events | Required |
-| **table** | Target table name to place events | Required |
+| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required |
+| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional |
+| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional |
+| **database** | Database name to place events | Required |
+| **table** | Target table name to place events | Required |
+| **failed_items_path** | Path to store failed items when max_retries is reached. Set to "nil" to disable persistence to file (may cause data loss). | Required |
| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional |
| **recovery** | If set to true (default), the plugin will attempt to resend pre-existing temp files found in the path upon startup | |
| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only) | |
| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds; 0 will flush on every event. Increase this value to reduce IO calls, but keep in mind that events in the buffer will be lost in case of abrupt failure. | |
-| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | |
-| **proxy_port** | The proxy port for the proxy. Defaults to 80. | |
-| **proxy_protocol** | The proxy server protocol; one of http or https. | |
+| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional |
+| **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional |
+| **proxy_protocol** | The proxy server protocol; one of http or https. | Optional |
+| **max_size** | Maximum size of the buffer (in MB) before it gets flushed; defaults to 10. | Optional |
+| **max_interval** | Maximum interval (in seconds) before the buffer gets flushed; defaults to 10. | Optional |
+| **max_retries** | Maximum number of retries before the flush fails; defaults to 3. | Optional |

> Note: LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options)
+> Note: the **path** config parameter is no longer used in the latest release (3.0.0) and will be deprecated in future releases
```bash
export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989"
```
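Since the new `failed_items_path` file is meant to survive a failed flush, operators will likely want to inspect or replay it. The diff does not fully show the on-disk format, so the following replay helper is a sketch under two assumptions: the file holds one JSON-serialized event per line, and the literal string `"nil"` is the sentinel that disables persistence. The helper name `each_failed_item` is hypothetical, not part of the plugin.

```ruby
require 'json'

# Hypothetical helper: iterate over events persisted by the plugin after
# max_retries was exhausted. Assumes a JSON-lines file format.
def each_failed_item(path)
  # Mirror the plugin's "nil" sentinel: no path means nothing was persisted.
  return if path.nil? || path == "nil"
  ::File.foreach(path) do |line|
    line = line.strip
    next if line.empty?      # tolerate blank lines from interrupted writes
    yield JSON.parse(line)
  end
end

# Example usage: print each failed event for manual re-ingestion.
# each_failed_item("/tmp/kusto_failed_items.log") { |event| puts event }
```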
@@ -81,12 +86,13 @@

| Version | Release Date | Notes |
| --- | --- | --- |
-| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library |
-| 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK |
-| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided, Logstash output JSON attribute names will be used for column resolution |
-| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config, it can completely be skipped |
-| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **Logstash 8.5** and up (does not work with 6.x or 7.x versions of Logstash; for these versions use 1.x.x versions of the logstash-output-kusto gem). Fixes CVEs in commons-text & outdated Jackson libraries |
-| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK version [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1). Tests have been performed on Logstash 6.x and up. |
+| 3.0.0 | 2024-11-01 | Updated configuration options |
+| 2.0.8 | 2024-10-23 | Fix library deprecations, fix issues in the Azure Identity library |
+| 2.0.7 | 2024-01-01 | Update Kusto JAVA SDK |
+| 2.0.3 | 2023-12-12 | Make JSON mapping field optional. If not provided, Logstash output JSON attribute names will be used for column resolution |
+| 2.0.2 | 2023-11-28 | Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config, it can completely be skipped |
+| 2.0.0 | 2023-09-19 | Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **Logstash 8.5** and up (does not work with 6.x or 7.x versions of Logstash; for these versions use 1.x.x versions of the logstash-output-kusto gem). Fixes CVEs in commons-text & outdated Jackson libraries |
+| 1.0.6 | 2022-11-29 | Upgrade to the latest Java SDK version [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1). Tests have been performed on Logstash 6.x and up. |


## Development Requirements
12 changes: 6 additions & 6 deletions lib/logstash/outputs/kusto.rb
@@ -39,6 +39,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
  config :database, validate: :string, required: true
  # Target table name
  config :table, validate: :string, required: true
+  # Path to store failed items when max_retries is reached; set to "nil" to disable persistence to file
+  config :failed_items_path, validate: :string, required: true

# Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table.
# Note that this must be in JSON format, as this is the interface between Logstash and Kusto
# Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings
@@ -70,16 +73,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
  config :proxy_protocol, validate: :string, required: false, default: 'http'

  # Maximum size of the buffer before it gets flushed, defaults to 10MB
-  config :max_size, validate: :number, default: 10
+  config :max_size, validate: :number, required: false, default: 10

  # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
-  config :max_interval, validate: :number, default: 10
+  config :max_interval, validate: :number, required: false, default: 10

  # Maximum number of retries before the flush fails, defaults to 3
-  config :max_retries, validate: :number, default: 3
-
-  # Path to store failed items, defaults to nil
-  config :failed_items_path, validate: :string, default: nil
+  config :max_retries, validate: :number, required: false, default: 3

  default :codec, 'json_lines'
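Because `failed_items_path` is now required but accepts the literal string `"nil"`, a plugin might want to validate the setting once at startup rather than on every failed flush. The helper below is a hypothetical sketch of such a check; the plugin's actual `register` method is not shown in this diff, and the directory-existence rule is an assumption, not documented behavior.

```ruby
# Hypothetical startup validation for the failed_items_path setting.
# Returns the usable path, or nil when persistence is deliberately disabled.
def validate_failed_items_path(failed_items_path, logger)
  if failed_items_path.nil? || failed_items_path == "nil"
    logger.warn("failed_items_path is \"nil\": failed items will not be persisted (possible data loss)")
    return nil
  end
  dir = ::File.dirname(failed_items_path)
  # Fail fast at startup instead of losing data at the first failed flush.
  raise ArgumentError, "failed_items_path directory #{dir} does not exist" unless ::File.directory?(dir)
  failed_items_path
end
```

Validating eagerly keeps the "nil" sentinel an explicit opt-in to data loss rather than a silent misconfiguration.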

10 changes: 5 additions & 5 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
@@ -109,7 +109,7 @@ def buffer_flush(options = {})
        retry
      else
        @buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes")
-        handle_failed_flush(outgoing_items, e.message)
+        handle_failed_flush(outgoing_items)
      end
    end

@@ -118,8 +118,10 @@ def buffer_flush(options = {})
      end
    end

-  def handle_failed_flush(items, error_message)
-    if @buffer_config[:failed_items_path]
+  def handle_failed_flush(items)
+    if @buffer_config[:failed_items_path].nil? || @buffer_config[:failed_items_path] == "nil"
+      @buffer_config[:logger].warn("No failed_items_path configured. The failed items are not persisted. Data loss may occur.")
+    else
      begin
        ::File.open(@buffer_config[:failed_items_path], 'a') do |file|
          items.each do |item|
@@ -130,8 +132,6 @@ def handle_failed_flush(items, error_message)
      rescue => e
        @buffer_config[:logger].error("Failed to store items: #{e.message}")
      end
-    else
-      @buffer_config[:logger].warn("No failed_items_path configured. Data loss may occur.")
    end
  end
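Taken together, the two changed methods implement a retry-then-persist policy: flush up to max_retries times, then either append the batch to failed_items_path or log a data-loss warning when the "nil" sentinel is set. A minimal, self-contained sketch of that behavior follows. The class name, logger-as-lambda, and JSON-lines file format are simplifications for illustration, not the plugin's actual buffer implementation.

```ruby
require 'json'

# Simplified model of the buffer's retry-then-persist flush path.
class MiniBuffer
  def initialize(max_retries:, failed_items_path:, logger:, &flush)
    @max_retries = max_retries
    @failed_items_path = failed_items_path
    @logger = logger        # a callable, e.g. ->(msg) { puts msg }
    @flush = flush          # the block that actually ships a batch
    @items = []
  end

  # Queue an event for the next flush.
  def <<(item)
    @items << item
  end

  # Try to flush; after max_retries failures, hand the batch to
  # handle_failed_flush instead of raising.
  def flush!
    retries = 0
    begin
      @flush.call(@items)
      @items.clear
    rescue => e
      retries += 1
      retry if retries < @max_retries
      @logger.call("Max retries reached. Failed to flush #{@items.size} items: #{e.message}")
      handle_failed_flush(@items)
      @items.clear
    end
  end

  # Mirrors the diff: a missing path or the literal string "nil"
  # disables persistence and only warns about possible data loss.
  def handle_failed_flush(items)
    if @failed_items_path.nil? || @failed_items_path == "nil"
      @logger.call("No failed_items_path configured. Data loss may occur.")
    else
      ::File.open(@failed_items_path, 'a') do |file|
        items.each { |item| file.puts(item.to_json) }
      end
    end
  end
end
```

One consequence of this design worth noting: the batch is cleared after persistence, so replaying failed items is the operator's responsibility, not the buffer's.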

