
Commit

Update and complete tests
ag-ramachandran committed Dec 11, 2023
1 parent 946b5dc commit 1dc7edc
Showing 3 changed files with 94 additions and 73 deletions.
build.gradle (10 changes: 5 additions & 5 deletions)
@@ -28,12 +28,12 @@ dependencies {
     implementation 'com.azure:azure-storage-blob:12.23.0'
     implementation 'com.azure:azure-storage-common:12.22.0'
     implementation 'com.azure:azure-storage-internal-avro:12.8.0'
-    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.2'
+    implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.14.3'
     implementation 'com.azure:azure-storage-queue:12.18.0'
-    implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.2'
-    implementation 'com.fasterxml.jackson.core:jackson-core:2.14.2'
-    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.2'
-    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.2'
+    implementation 'com.fasterxml.jackson.core:jackson-annotations:2.14.3'
+    implementation 'com.fasterxml.jackson.core:jackson-core:2.14.3'
+    implementation 'com.fasterxml.jackson.core:jackson-databind:2.14.3'
+    implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.14.3'
     implementation 'com.fasterxml.woodstox:woodstox-core:6.5.0'
     implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
     implementation 'com.microsoft.azure.kusto:kusto-data:5.0.2'
e2e/e2e.rb (150 changes: 85 additions & 65 deletions)
@@ -15,45 +15,66 @@ def initialize
     @engine_url = ENV["ENGINE_URL"]
     @ingest_url = ENV["INGEST_URL"]
     @app_id = ENV["APP_ID"]
-    @app_kay = ENV['APP_KEY']
+    @app_key = ENV['APP_KEY']
     @tenant_id = ENV['TENANT_ID']
     @database = ENV['TEST_DATABASE']
-    @table = "RubyE2E#{Time.now.getutc.to_i}"
+    @table_with_mapping = "RubyE2E#{Time.now.getutc.to_i}"
+    @table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}"
     @mapping_name = "test_mapping"
     @csv_file = "dataset.csv"
 
     @logstash_config = %{
-input {
-  file { path => "#{@input_file}"}
-}
-filter {
-  csv { columns => [#{@csv_columns}]}
-}
-output {
-  file { path => "#{@output_file}"}
-  stdout { codec => rubydebug }
-  kusto {
-    path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
-    ingest_url => "#{@ingest_url}"
-    app_id => "#{@app_id}"
-    app_key => "#{@app_kay}"
-    app_tenant => "#{@tenant_id}"
-    database => "#{@database}"
-    table => "#{@table}"
-    json_mapping => "#{@mapping_name}"
+input {
+  file { path => "#{@input_file}"}
+}
+filter {
+  csv { columns => [#{@csv_columns}]}
+}
+output {
+  file { path => "#{@output_file}"}
+  stdout { codec => rubydebug }
+  kusto {
+    path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
+    ingest_url => "#{@ingest_url}"
+    app_id => "#{@app_id}"
+    app_key => "#{@app_key}"
+    app_tenant => "#{@tenant_id}"
+    database => "#{@database}"
+    table => "#{@table_with_mapping}"
+    json_mapping => "#{@mapping_name}"
+  }
+  kusto {
+    path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt"
+    ingest_url => "#{@ingest_url}"
+    app_id => "#{@app_id}"
+    app_key => "#{@app_key}"
+    app_tenant => "#{@tenant_id}"
+    database => "#{@database}"
+    table => "#{@table_without_mapping}"
   }
 }
 }
   end
 
   def create_table_and_mapping
-    puts "Creating table #{@table}"
-    @query_client.execute(@database, ".drop table #{@table} ifexists")
-    sleep(1)
-    @query_client.execute(@database, ".create table #{@table} #{@columns}")
-    @query_client.execute(@database, ".alter table #{@table} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'
-")
-    @query_client.execute(@database, ".create table #{@table} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
+    Array[@table_with_mapping, @table_without_mapping].each { |tableop|
+      puts "Creating table #{tableop}"
+      @query_client.execute(@database, ".drop table #{tableop} ifexists")
+      sleep(1)
+      @query_client.execute(@database, ".create table #{tableop} #{@columns}")
+      @query_client.execute(@database, ".alter table #{tableop} policy ingestionbatching @'{\"MaximumBatchingTimeSpan\":\"00:00:10\", \"MaximumNumberOfItems\": 1, \"MaximumRawDataSizeMB\": 100}'")
+    }
+    # Mapping only for one table
+    @query_client.execute(@database, ".create table #{@table_with_mapping} ingestion json mapping '#{@mapping_name}' '#{File.read("dataset_mapping.json")}'")
   end
 
+  def drop_and_cleanup
+    Array[@table_with_mapping, @table_without_mapping].each { |tableop|
+      puts "Dropping table #{tableop}"
+      @query_client.execute(@database, ".drop table #{tableop} ifexists")
+      sleep(1)
+    }
+  end
+
   def run_logstash
@@ -76,44 +76,43 @@ def run_logstash
   def assert_data
     max_timeout = 10
     csv_data = CSV.read(@csv_file)
-
-    (0...max_timeout).each do |_|
-      begin
-        sleep(5)
-        query = @query_client.execute(@database, "#{@table} | sort by rownumber asc")
-        result = query.getPrimaryResults()
-        raise "Wrong count - expected #{csv_data.length}, got #{result.count()}" unless result.count() == csv_data.length
-      rescue Exception => e
-        puts "Error: #{e}"
-      end
-      (0...csv_data.length).each do |i|
-        result.next()
-        puts "Item #{i}"
-        (0...@column_count).each do |j|
-          csv_item = csv_data[i][j]
-          result_item = result.getObject(j) == nil ? "null" : result.getString(j)
-
-          #special cases for data that is different in csv vs kusto
-          if j == 4 #kusto boolean field
-            csv_item = csv_item.to_s == "1" ? "true" : "false"
-          elsif j == 12 # date formatting
-            csv_item = csv_item.sub(".0000000", "")
-          elsif j == 15 # numbers as text
-            result_item = i.to_s
-          elsif j == 17 #null
-            next
-          end
-          puts " csv[#{j}] = #{csv_item}"
-          puts " result[#{j}] = #{result_item}"
-          raise "Result Doesn't match csv" unless csv_item == result_item
-        end
-        puts ""
-      end
-      return
-    end
-    raise "Failed after timeouts"
+    Array[@table_with_mapping, @table_without_mapping].each { |tableop|
+      puts "Validating results for table #{tableop}"
+      (0...max_timeout).each do |_|
+        begin
+          sleep(5)
+          query = @query_client.execute(@database, "#{tableop} | sort by rownumber asc")
+          result = query.getPrimaryResults()
+          raise "Wrong count - expected #{csv_data.length}, got #{result.count()} in table #{tableop}" unless result.count() == csv_data.length
+        rescue Exception => e
+          puts "Error: #{e}"
+        end
+        (0...csv_data.length).each do |i|
+          result.next()
+          puts "Item #{i}"
+          (0...@column_count).each do |j|
+            csv_item = csv_data[i][j]
+            result_item = result.getObject(j) == nil ? "null" : result.getString(j)
+            #special cases for data that is different in csv vs kusto
+            if j == 4 #kusto boolean field
+              csv_item = csv_item.to_s == "1" ? "true" : "false"
+            elsif j == 12 # date formatting
+              csv_item = csv_item.sub(".0000000", "")
+            elsif j == 15 # numbers as text
+              result_item = i.to_s
+            elsif j == 17 #null
+              next
+            end
+            puts " csv[#{j}] = #{csv_item}"
+            puts " result[#{j}] = #{result_item}"
+            raise "Result Doesn't match csv in table #{tableop}" unless csv_item == result_item
+          end
+          puts ""
+        end
+        return
+      end
+      raise "Failed after timeouts"
+    }
   end
def start
@@ -122,8 +122,8 @@ def start
     create_table_and_mapping
     run_logstash
     assert_data
-  end
+    drop_and_cleanup
+  end
 end
 
 E2E::new().start
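
A note for anyone running this suite locally: the harness reads every connection setting from the environment, so a standalone run looks roughly like the sketch below (all endpoint and credential values are placeholders, and the require_relative path assumes e2e/ as the working directory). Also worth flagging: the return that ends a successful validation sits inside the Array#each over both tables, and return inside a Ruby block exits the enclosing method, so as written assert_data appears to stop after the first table passes.

# Hypothetical local run of the suite; every value below is a placeholder,
# not a real cluster or credential.
ENV['ENGINE_URL']    = 'https://mycluster.westus.kusto.windows.net'
ENV['INGEST_URL']    = 'https://ingest-mycluster.westus.kusto.windows.net'
ENV['APP_ID']        = '00000000-0000-0000-0000-000000000000'
ENV['APP_KEY']       = '<aad-app-key>'
ENV['TENANT_ID']     = '<aad-tenant-id>'
ENV['TEST_DATABASE'] = 'e2e'

# e2e.rb starts itself on load (its last line is E2E::new().start): it creates
# the tables, runs Logstash, asserts on the ingested rows, then drops the tables.
require_relative 'e2e'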
lib/logstash/outputs/kusto.rb (7 changes: 4 additions & 3 deletions)
@@ -89,7 +89,7 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
   # Mapping name - Used by Kusto to map each attribute from incoming event JSON strings to the appropriate column in the table.
   # Note that this must be in JSON format, as this is the interface between Logstash and Kusto
   # Make this optional as name resolution in the JSON mapping can be done based on attribute names in the incoming event JSON strings
-  config :json_mapping, validate: :string, , default: nil
+  config :json_mapping, validate: :string, default: nil
 
   # Mapping name - deprecated, use json_mapping
   config :mapping, validate: :string, deprecated: true
@@ -127,11 +127,12 @@ def register
     @io_mutex = Mutex.new
 
     final_mapping = json_mapping
-    if final_mapping.empty?
+    if final_mapping.nil? || final_mapping.empty?
       final_mapping = mapping
     end
 
-    # TODO: add id to the tmp path to support multiple outputs of the same type
+    # TODO: add id to the tmp path to support multiple outputs of the same type.
+    # TODO: Fix final_mapping when dynamic routing is supported
     # add fields from the meta that will note the destination of the events in the file
     @path = if dynamic_event_routing
               File.expand_path("#{path}.%{[@metadata][database]}.%{[@metadata][table]}.%{[@metadata][final_mapping]}")
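
Two details worth noting here: the removed config line carried a doubled comma (config :json_mapping, validate: :string, , default: nil), which is a Ruby SyntaxError, and since the option now defaults to nil, the register-time check needs the added nil guard, because nil does not respond to empty?. A minimal standalone sketch of the failure mode and the fix, with illustrative values rather than plugin code:

# Standalone sketch of the nil guard added in register; values are illustrative.
json_mapping = nil          # `default: nil` means the option may be unset
mapping      = 'legacy_map' # stand-in for the deprecated `mapping` option

final_mapping = json_mapping
# The old check, final_mapping.empty?, raises NoMethodError when the value is nil.
# The new check short-circuits on nil before calling empty?:
if final_mapping.nil? || final_mapping.empty?
  final_mapping = mapping
end
puts final_mapping          # => "legacy_map"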
