From 7da8798da4064cbea9ab66fa69c9a8e2ca5baa78 Mon Sep 17 00:00:00 2001 From: MonishkaDas Date: Tue, 5 Nov 2024 15:59:55 +0530 Subject: [PATCH] Updated README.md and configs Updated failed_items_path to be a required config parameter. If set to "nil", the failed items will not be persisted to local storage, else the items are stored in the file path provided after max_retries. --- README.md | 68 ++++++++++--------- lib/logstash/outputs/kusto.rb | 12 ++-- .../outputs/kusto/custom_size_based_buffer.rb | 10 +-- 3 files changed, 48 insertions(+), 42 deletions(-) diff --git a/README.md b/README.md index b256a26..bd113fb 100755 --- a/README.md +++ b/README.md @@ -35,19 +35,22 @@ Perform configuration before sending events from Logstash to Azure Data Explorer ```ruby output { - kusto { - path => "/tmp/kusto/%{+YYYY-MM-dd-HH-mm}.txt" - ingest_url => "https://ingest-.kusto.windows.net/" - app_id => "" - app_key => "" - app_tenant => "" - database => "" - table => "" - json_mapping => "" - proxy_host => "" - proxy_port => - proxy_protocol => <"http"|"https"> - } + kusto { + ingest_url => "https://ingest-.kusto.windows.net/" + app_id => "" + app_key => "" + app_tenant => "" + database => "" + table => "" + json_mapping => "" + proxy_host => "" + proxy_port => + proxy_protocol => <"http"|"https"> + max_size => 10 + max_interval => 10 + max_retries => 3 + failed_items_path => "" + } } ``` More information about configuring Logstash can be found in the [logstash configuration guide](https://www.elastic.co/guide/en/logstash/current/configuration.html) @@ -56,22 +59,24 @@ More information about configuring Logstash can be found in the [logstash config | Parameter Name | Description | Notes | | --- | --- | --- | -| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. 
The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required -| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required| -| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional| -| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional| -| **database**| Database name to place events | Required | -| **table** | Target table name to place events | Required +| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal. | Required | +| **app_id, app_key, app_tenant** | Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional | +| **managed_identity** | Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional | +| **database** | Database name to place events | Required | +| **table** | Target table name to place events | Required | +| **failed_items_path** | Path to store failed items when max_retries is reached. Set to the string "nil" to disable persistence to file (may cause data loss). | Required | | **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. 
Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional | -| **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | | -| **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| | -| **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| | -| **proxy_host** | The proxy hostname for redirecting traffic to Kusto.| | -| **proxy_port** | The proxy port for the proxy. Defaults to 80.| | -| **proxy_protocol** | The proxy server protocol , is one of http or https.| | +| **proxy_host** | The proxy hostname for redirecting traffic to Kusto. | Optional | +| **proxy_port** | The proxy port for the proxy. Defaults to 80. | Optional | +| **proxy_protocol** | The proxy server protocol, is one of http or https. | Optional | +| **max_size** | Maximum size of the buffer before it gets flushed, defaults to 10MB. | Optional | +| **max_interval** | Maximum interval (in seconds) before the buffer gets flushed, defaults to 10. | Optional | +| **max_retries** | Maximum number of retries before the flush fails. Defaults to 3. 
| Optional | > Note : LS_JAVA_OPTS can be used to set proxy parameters as well (using export or SET options) +> Note: The **path** config parameter is no longer used as of release 3.0.0 and will be deprecated in a future release + ```bash export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.proxyHost=1.2.3.4 -Dhttps.proxyPort=8989" ``` @@ -81,12 +86,13 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox | Version | Release Date | Notes | | --- | --- | --- | -| 2.0.8 | 2024-10-23 | - Fix library deprecations, fix issues in the Azure Identity library | -| 2.0.7 | 2024-01-01 | - Update Kusto JAVA SDK | -| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided logstash output JSON attribute names will be used for column resolution | -| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,it can completely be skipped | -| 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries | -| 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.| +| 3.0.0 | 2024-11-01 | Updated configuration options | +| 2.0.8 | 2024-10-23 | Fix library deprecations, fix issues in the Azure Identity library | +| 2.0.7 | 2024-01-01 | Update Kusto JAVA SDK | +| 2.0.3 | 2023-12-12 | Make JSON mapping field optional. 
If not provided logstash output JSON attribute names will be used for column resolution | +| 2.0.2 | 2023-11-28 | Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config, it can completely be skipped | +| 2.0.0 | 2023-09-19 | Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries | +| 1.0.6 | 2022-11-29 | Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.| ## Development Requirements diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 246980d..b53b648 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -39,6 +39,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :database, validate: :string, required: true # Target table name config :table, validate: :string, required: true + # Path to store failed items when max_retries is reached, set to "nil" to disable persistence to file + config :failed_items_path, validate: :string, required: true + # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table. 
# Note that this must be in JSON format, as this is the interface between Logstash and Kusto # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings @@ -70,16 +73,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :proxy_protocol, validate: :string, required: false , default: 'http' # Maximum size of the buffer before it gets flushed, defaults to 10MB - config :max_size, validate: :number, default: 10 + config :max_size, validate: :number, required: false , default: 10 # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10 - config :max_interval, validate: :number, default: 10 + config :max_interval, validate: :number, required: false , default: 10 # Maximum number of retries before the flush fails, defaults to 3 - config :max_retries, validate: :number, default: 3 - - # Path to store failed items, defaults to nil - config :failed_items_path, validate: :string, default: nil + config :max_retries, validate: :number, required: false , default: 3 default :codec, 'json_lines' diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 646d61a..e04478a 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -109,7 +109,7 @@ def buffer_flush(options = {}) retry else @buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes") - handle_failed_flush(outgoing_items, e.message) + handle_failed_flush(outgoing_items) end end @@ -118,8 +118,10 @@ def buffer_flush(options = {}) end end - def handle_failed_flush(items, error_message) - if @buffer_config[:failed_items_path] + def handle_failed_flush(items) + if @buffer_config[:failed_items_path].nil? 
|| @buffer_config[:failed_items_path] == "nil" + @buffer_config[:logger].warn("No failed_items_path configured. The failed items are not persisted. Data loss may occur.") + else begin ::File.open(@buffer_config[:failed_items_path], 'a') do |file| items.each do |item| @@ -130,8 +132,6 @@ def handle_failed_flush(items, error_message) rescue => e @buffer_config[:logger].error("Failed to store items: #{e.message}") end - else - @buffer_config[:logger].warn("No failed_items_path configured. Data loss may occur.") end end