diff --git a/.gitignore b/.gitignore
index c2087af..cd3e886 100644
--- a/.gitignore
+++ b/.gitignore
@@ -59,3 +59,4 @@ gradle/wrapper/gradle-wrapper.properties
 .vscode/settings.json
 rspec.xml
 e2e/output_file.txt
+logs.txt
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 5f3e28e..299e42e 100755
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,17 @@
 # Changelog
+
+# 2.0.3
+
+- Make JSON mapping optional
+
+
+# 2.0.2
+
+- Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config,
+it can be skipped entirely
+
+
 # 2.0.0
 
 - Use (5.0.2) version of the java sdk, and retrieve it from maven with bundler. Supports logstash 8.6 versions and up
diff --git a/README.md b/README.md
index 565c2cc..876f683 100755
--- a/README.md
+++ b/README.md
@@ -62,7 +62,7 @@ More information about configuring Logstash can be found in the [logstash config
 | **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional|
 | **database**| Database name to place events | Required |
 | **table** | Target table name to place events | Required
-| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Required |
+| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Optional |
 | **recovery** | If set to true (default), plugin will attempt to resend pre-existing temp files found in the path upon startup | |
 | **delete_temp_files** | Determines if temp files will be deleted after a successful upload (true is default; set false for debug purposes only)| |
 | **flush_interval** | The time (in seconds) for flushing writes to temporary files. Default is 2 seconds, 0 will flush on every event. Increase this value to reduce IO calls but keep in mind that events in the buffer will be lost in case of abrupt failure.| |
@@ -81,6 +81,8 @@ export LS_JAVA_OPTS="-Dhttp.proxyHost=1.2.34 -Dhttp.proxyPort=8989 -Dhttps.prox
 | Version | Release Date | Notes |
 | --- | --- | --- |
+| 2.0.3 | 2023-12-12 | - Make JSON mapping field optional. If not provided, the attribute names in the Logstash output JSON will be used for column resolution |
+| 2.0.2 | 2023-11-28 | - Bugfix for the scenario where the plugin uses managed identity. Instead of providing the managed identity name as empty in the config, it can be skipped entirely |
 | 2.0.0 | 2023-09-19 | - Upgrade to the latest Java SDK version [5.0.2](https://github.com/Azure/azure-kusto-java/releases/tag/v5.0.2). Tests have been performed on **__Logstash 8.5__** and up (Does not work with 6.x or 7.x versions of Logstash - For these versions use 1.x.x versions of logstash-output-kusto gem) - Fixes CVE's in common-text & outdated Jackson libraries |
 | 1.0.6 | 2022-11-29 | - Upgrade to the latest Java SDK [3.2.1](https://github.com/Azure/azure-kusto-java/releases/tag/v3.2.1) version. Tests have been performed on Logstash 6.x and up.|
diff --git a/build.gradle b/build.gradle
index 4777082..286185e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,12 +28,12 @@ dependencies {
     implementation 'com.azure:azure-storage-blob:12.23.0'
     implementation 'com.azure:azure-storage-common:12.22.0'
     implementation 'com.azure:azure-storage-internal-avro:12.8.0'
-    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.2'
+    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.3'
     implementation 'com.azure:azure-storage-queue:12.18.0'
-    implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.2'
-    implementation 'com.fasterxml.jackson.core:jackson-core:2.14.2'
-    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
-    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
+    implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.3'
+    implementation 'com.fasterxml.jackson.core:jackson-core:2.14.3'
+    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.3'
+    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.3'
     implementation 'com.fasterxml.woodstox:woodstox-core:6.5.0'
     implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
     implementation 'com.microsoft.azure.kusto:kusto-data:5.0.2'
diff --git a/e2e/e2e.rb b/e2e/e2e.rb
index cf81130..618f673 100755
--- a/e2e/e2e.rb
+++ b/e2e/e2e.rb
@@ -15,45 +15,66 @@ def initialize
     @engine_url = ENV["ENGINE_URL"]
     @ingest_url = ENV["INGEST_URL"]
     @app_id = ENV["APP_ID"]
-    @app_kay = ENV['APP_KEY']
+    @app_key = ENV['APP_KEY']
     @tenant_id = ENV['TENANT_ID']
     @database = ENV['TEST_DATABASE']
-    @table = "RubyE2E#{Time.now.getutc.to_i}"
+    @table_with_mapping = "RubyE2E#{Time.now.getutc.to_i}"
+    @table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}"
     @mapping_name = "test_mapping"
     @csv_file = "dataset.csv"
 
     @logstash_config = %{
-input {
-  file { path => "#{@input_file}"}
-}
-filter {
-  csv { columns => [#{@csv_columns}]}
-}
-output {
-  file { path => "#{@output_file}"}
-  stdout { codec => rubydebug }
-  kusto {
-    path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
-    ingest_url => "#{@ingest_url}"
-    app_id => "#{@app_id}"
-    app_key => "#{@app_kay}"
-    app_tenant => "#{@tenant_id}"
-    database => "#{@database}"
-    table => "#{@table}"
-    json_mapping => "#{@mapping_name}"
+  input {
+    file { path => "#{@input_file}"}
+  }
+  filter {
+    csv { columns => [#{@csv_columns}]}
+  }
+  output {
+    file { path => "#{@output_file}"}
+    stdout { codec => rubydebug }
+    kusto {
+      path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
+      ingest_url => "#{@ingest_url}"
+      app_id => "#{@app_id}"
+      app_key => "#{@app_key}"
+      app_tenant => "#{@tenant_id}"
+      database => "#{@database}"
+      table => "#{@table_with_mapping}"
+      json_mapping => "#{@mapping_name}"
+    }
+    kusto {
+      path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt"
+      ingest_url => "#{@ingest_url}"
+      app_id => "#{@app_id}"
+      app_key => "#{@app_key}"
+      app_tenant => "#{@tenant_id}"
+      database => "#{@database}"
+      table => "#{@table_without_mapping}"
+    }
+  }
 }
-}
 }
   end
 
   def create_table_and_mapping
-    puts "Creating table #{@table}"
-    @query_client.execute(@database, ".drop table #{@table} ifexists")
-    sleep(1)
-    @query_client.execute(@database, ".create table #{@table} #{@columns}")
-    @query_client.execute(@database, ".alter table #{@table} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'
-")
-    @query_client.execute(@database, ".create table #{@table} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
+    Array[@table_with_mapping, @table_without_mapping].each { |tableop|
+      puts "Creating table #{tableop}"
+      @query_client.execute(@database, ".drop table #{tableop} ifexists")
+      sleep(1)
+      @query_client.execute(@database, ".create table #{tableop} #{@columns}")
+      @query_client.execute(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'")
+    }
+    # Mapping only for one table
+    @query_client.execute(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
+  end
+
+
+  def drop_and_cleanup
+    Array[@table_with_mapping, @table_without_mapping].each { |tableop|
+      puts "Dropping table #{tableop}"
+      @query_client.execute(@database, ".drop table #{tableop} ifexists")
+      sleep(1)
+    }
   end
 
   def run_logstash
@@ -76,54 +97,53 @@ def run_logstash
   def assert_data
     max_timeout = 10
     csv_data = CSV.read(@csv_file)
-
-    (0...max_timeout).each do |_|
-      begin
-        sleep(5)
-        query = @query_client.execute(@database, "#{@table} | sort by rownumber asc")
-        result = query.getPrimaryResults()
-        raise "Wrong count - expected #{csv_data.length}, got #{result.count()}" unless result.count() == csv_data.length
-      rescue Exception => e
-        puts "Error: #{e}"
-      end
-      (0...csv_data.length).each do |i|
-        result.next()
-        puts "Item #{i}"
-        (0...@column_count).each do |j|
-          csv_item = csv_data[i][j]
-          result_item = result.getObject(j) == nil ? "null" : result.getString(j)
-
-          #special cases for data that is different in csv vs kusto
-          if j == 4 #kusto boolean field
-            csv_item = csv_item.to_s == "1" ? "true" : "false"
-          elsif j == 12 # date formatting
-            csv_item = csv_item.sub(".0000000", "")
-          elsif j == 15 # numbers as text
-            result_item = i.to_s
-          elsif j == 17 #null
-            next
+    Array[@table_with_mapping, @table_without_mapping].each { |tableop|
+      puts "Validating results for table #{tableop}"
+      (0...max_timeout).each do |_|
+        begin
+          sleep(5)
+          query = @query_client.execute(@database, "#{tableop} | sort by rownumber asc")
+          result = query.getPrimaryResults()
+          raise "Wrong count - expected #{csv_data.length}, got #{result.count()} in table #{tableop}" unless result.count() == csv_data.length
+        rescue Exception => e
+          puts "Error: #{e}"
+        end
+        (0...csv_data.length).each do |i|
+          result.next()
+          puts "Item #{i}"
+          (0...@column_count).each do |j|
+            csv_item = csv_data[i][j]
+            result_item = result.getObject(j) == nil ? "null" : result.getString(j)
+            #special cases for data that is different in csv vs kusto
+            if j == 4 #kusto boolean field
+              csv_item = csv_item.to_s == "1" ? "true" : "false"
"true" : "false" + elsif j == 12 # date formatting + csv_item = csv_item.sub(".0000000", "") + elsif j == 15 # numbers as text + result_item = i.to_s + elsif j == 17 #null + next + end + puts " csv[#{j}] = #{csv_item}" + puts " result[#{j}] = #{result_item}" + raise "Result Doesn't match csv in table #{tableop}" unless csv_item == result_item end - puts " csv[#{j}] = #{csv_item}" - puts " result[#{j}] = #{result_item}" - raise "Result Doesn't match csv" unless csv_item == result_item + puts "" end - puts "" + return end - return - - end - raise "Failed after timeouts" - + raise "Failed after timeouts" + } end def start @query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAadApplicationCredentials(@engine_url, @app_id, - @app_kay, @tenant_id)) + @app_key, @tenant_id)) create_table_and_mapping run_logstash assert_data - end - + drop_and_cleanup + end end E2E::new().start \ No newline at end of file diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 298a71f..dacb442 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -88,7 +88,8 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base config :table, validate: :string, required: true # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table. # Note that this must be in JSON format, as this is the interface between Logstash and Kusto - config :json_mapping, validate: :string, required: true + # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings + config :json_mapping, validate: :string, default: nil # Mapping name - deprecated, use json_mapping config :mapping, validate: :string, deprecated: true @@ -126,11 +127,12 @@ def register @io_mutex = Mutex.new final_mapping = json_mapping - if final_mapping.empty? + if final_mapping.nil? || final_mapping.empty? final_mapping = mapping end - # TODO: add id to the tmp path to support multiple outputs of the same type + # TODO: add id to the tmp path to support multiple outputs of the same type. + # TODO: Fix final_mapping when dynamic routing is supported # add fields from the meta that will note the destination of the events in the file @path = if dynamic_event_routing File.expand_path("#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}") diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 07201e1..3a0b4a5 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -47,7 +47,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat else kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) end - # @logger.debug(Gem.loaded_specs.to_s) # Unfortunately there's no way to avoid using the gem/plugin name directly... @@ -69,10 +68,15 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat end @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) - @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) - @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) + is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) 
+      if is_mapping_ref_provided
+        @logger.debug('Using mapping reference.', json_mapping)
+        @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
+        @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
+      else
+        @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
+      end
      @delete_local = delete_local
-
      @logger.debug('Kusto resources are ready.')
     end
diff --git a/version b/version
index f93ea0c..6acdb44 100644
--- a/version
+++ b/version
@@ -1 +1 @@
-2.0.2
\ No newline at end of file
+2.0.3
\ No newline at end of file
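
Note on the json_mapping change above: with json_mapping now optional, the ingestor only calls setIngestionMapping/setDataFormat when a mapping reference is supplied; otherwise columns are resolved by the attribute names in the Logstash output JSON. A minimal output configuration without a mapping could look like the sketch below; all values are placeholders for illustration, not taken from this diff:

    output {
      kusto {
        path => "/tmp/kusto%{+YYYY-MM-dd-HH-mm}.txt"
        ingest_url => "https://ingest-<your-cluster>.kusto.windows.net/"
        app_id => "<application client id>"
        app_key => "<application key>"
        app_tenant => "<tenant id>"
        database => "<database name>"
        table => "<table name>"
        # json_mapping omitted: with 2.0.3, columns are matched by the JSON attribute names
      }
    }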