From 1dc7edc0dfa57fe3f274d57ce23b268022fe58a8 Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Mon, 11 Dec 2023 12:53:50 +0530 Subject: [PATCH] *Update and complete tests --- build.gradle | 10 +-- e2e/e2e.rb | 150 +++++++++++++++++++--------------- lib/logstash/outputs/kusto.rb | 7 +- 3 files changed, 94 insertions(+), 73 deletions(-) diff --git a/build.gradle b/build.gradle index 4777082..286185e 100644 --- a/build.gradle +++ b/build.gradle @@ -28,12 +28,12 @@ dependencies { implementation 'com.azure:azure-storage-blob:12.23.0' implementation 'com.azure:azure-storage-common:12.22.0' implementation 'com.azure:azure-storage-internal-avro:12.8.0' - implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.2' + implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.3' implementation 'com.azure:azure-storage-queue:12.18.0' - implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.2' - implementation 'com.fasterxml.jackson.core:jackson-core:2.14.2' - implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2' - implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2' + implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.3' + implementation 'com.fasterxml.jackson.core:jackson-core:2.14.3' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.3' + implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.3' implementation 'com.fasterxml.woodstox:woodstox-core:6.5.0' implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1' implementation 'com.microsoft.azure.kusto:kusto-data:5.0.2' diff --git a/e2e/e2e.rb b/e2e/e2e.rb index cf81130..1d9a115 100755 --- a/e2e/e2e.rb +++ b/e2e/e2e.rb @@ -15,45 +15,66 @@ def initialize @engine_url = ENV["ENGINE_URL"] @ingest_url = ENV["INGEST_URL"] @app_id = ENV["APP_ID"] - @app_kay = ENV['APP_KEY'] + @app_key = ENV['APP_KEY'] @tenant_id = ENV['TENANT_ID'] @database = ENV['TEST_DATABASE'] - @table = "RubyE2E#{Time.now.getutc.to_i}" + @table_with_mapping = "RubyE2E#{Time.now.getutc.to_i}" + @table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}" @mapping_name = "test_mapping" @csv_file = "dataset.csv" @logstash_config = %{ -input { - file { path => "#{@input_file}"} -} -filter { - csv { columns => [#{@csv_columns}]} -} -output { - file { path => "#{@output_file}"} - stdout { codec => rubydebug } - kusto { - path => "tmp%{+YYYY-MM-dd-HH-mm}.txt" - ingest_url => "#{@ingest_url}" - app_id => "#{@app_id}" - app_key => "#{@app_kay}" - app_tenant => "#{@tenant_id}" - database => "#{@database}" - table => "#{@table}" - json_mapping => "#{@mapping_name}" + input { + file { path => "#{@input_file}"} + } + filter { + csv { columns => [#{@csv_columns}]} + } + output { + file { path => "#{@output_file}"} + stdout { codec => rubydebug } + kusto { + path => "tmp%{+YYYY-MM-dd-HH-mm}.txt" + ingest_url => "#{@ingest_url}" + app_id => "#{@app_id}" + app_key => "#{@app_key}" + app_tenant => "#{@tenant_id}" + database => "#{@database}" + table => "#{@table_with_mapping}" + json_mapping => "#{@mapping_name}" + } + kusto { + path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt" + ingest_url => "#{@ingest_url}" + app_id => "#{@app_id}" + app_key => "#{@app_key}" + app_tenant => "#{@tenant_id}" + database => "#{@database}" + table => "#{@table_without_mapping}" + } } -} } end def create_table_and_mapping - puts "Creating table #{@table}" - @query_client.execute(@database, ".drop table #{@table} ifexists") - sleep(1) - @query_client.execute(@database, ".create table #{@table} #{@columns}") - @query_client.execute(@database, ".alter table #{@table} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}' -") - @query_client.execute(@database, ".create table #{@table} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'") + Array[@table_with_mapping, @table_without_mapping].each { |tableop| + puts "Creating table #{tableop}" + @query_client.execute(@database, ".drop table #{tableop} ifexists") + sleep(1) + @query_client.execute(@database, ".create table #{tableop} #{@columns}") + @query_client.execute(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'") + } + # Mapping only for one table + @query_client.execute(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'") + end + + + def drop_and_cleanup + Array[@table_with_mapping, @table_without_mapping].each { |tableop| + puts "Dropping table #{tableop}" + @query_client.execute(@database, ".drop table #{tableop} ifexists") + sleep(1) + } end def run_logstash @@ -76,44 +97,43 @@ def run_logstash def assert_data max_timeout = 10 csv_data = CSV.read(@csv_file) - - (0...max_timeout).each do |_| - begin - sleep(5) - query = @query_client.execute(@database, "#{@table} | sort by rownumber asc") - result = query.getPrimaryResults() - raise "Wrong count - expected #{csv_data.length}, got #{result.count()}" unless result.count() == csv_data.length - rescue Exception => e - puts "Error: #{e}" - end - (0...csv_data.length).each do |i| - result.next() - puts "Item #{i}" - (0...@column_count).each do |j| - csv_item = csv_data[i][j] - result_item = result.getObject(j) == nil ? "null" : result.getString(j) - - #special cases for data that is different in csv vs kusto - if j == 4 #kusto boolean field - csv_item = csv_item.to_s == "1" ? "true" : "false" - elsif j == 12 # date formatting - csv_item = csv_item.sub(".0000000", "") - elsif j == 15 # numbers as text - result_item = i.to_s - elsif j == 17 #null - next + Array[@table_with_mapping, @table_without_mapping].each { |tableop| + puts "Validating results for table #{tableop}" + (0...max_timeout).each do |_| + begin + sleep(5) + query = @query_client.execute(@database, "#{tableop} | sort by rownumber asc") + result = query.getPrimaryResults() + raise "Wrong count - expected #{csv_data.length}, got #{result.count()} in table #{tableop}" unless result.count() == csv_data.length + rescue Exception => e + puts "Error: #{e}" + end + (0...csv_data.length).each do |i| + result.next() + puts "Item #{i}" + (0...@column_count).each do |j| + csv_item = csv_data[i][j] + result_item = result.getObject(j) == nil ? "null" : result.getString(j) + #special cases for data that is different in csv vs kusto + if j == 4 #kusto boolean field + csv_item = csv_item.to_s == "1" ? "true" : "false" + elsif j == 12 # date formatting + csv_item = csv_item.sub(".0000000", "") + elsif j == 15 # numbers as text + result_item = i.to_s + elsif j == 17 #null + next + end + puts " csv[#{j}] = #{csv_item}" + puts " result[#{j}] = #{result_item}" + raise "Result Doesn't match csv in table #{tableop}" unless csv_item == result_item end - puts " csv[#{j}] = #{csv_item}" - puts " result[#{j}] = #{result_item}" - raise "Result Doesn't match csv" unless csv_item == result_item + puts "" end - puts "" + return end - return - - end - raise "Failed after timeouts" - + raise "Failed after timeouts" + } end def start @@ -122,8 +142,8 @@ def start create_table_and_mapping run_logstash assert_data - end - + drop_and_cleanup + end end E2E::new().start \ No newline at end of file diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 8f0b88d..dacb442 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -89,7 +89,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table. # Note that this must be in JSON format, as this is the interface between Logstash and Kusto # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings - config :json_mapping, validate: :string, , default: nil + config :json_mapping, validate: :string, default: nil # Mapping name - deprecated, use json_mapping config :mapping, validate: :string, deprecated: true @@ -127,11 +127,12 @@ def register @io_mutex = Mutex.new final_mapping = json_mapping - if final_mapping.empty? + if final_mapping.nil? || final_mapping.empty? final_mapping = mapping end - # TODO: add id to the tmp path to support multiple outputs of the same type + # TODO: add id to the tmp path to support multiple outputs of the same type. + # TODO: Fix final_mapping when dynamic routing is supported # add fields from the meta that will note the destination of the events in the file @path = if dynamic_event_routing File.expand_path("#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}")