Skip to content

Commit

Permalink
Added tests in kusto_spec.rb
Browse files Browse the repository at this point in the history
  • Loading branch information
MonishkaDas committed Oct 2, 2024
1 parent ed85986 commit 43f9bea
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 47 deletions.
10 changes: 7 additions & 3 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ def register
public
def multi_receive_encoded(events_and_encoded)
events_and_encoded.each do |event, encoded|
@buffer << encoded
begin
@buffer << encoded
rescue => e
@logger.error("Error processing event: #{e.message}")
end
end
end

Expand All @@ -114,10 +118,10 @@ def close
@cleaner.stop unless @cleaner.nil?
@buffer.shutdown
@ingestor.stop unless @ingestor.nil?
@logger.info("Kusto output plugin closed")
@logger.info("Kusto output plugin Closed")
end

private
public
def flush_buffer(events)
return if events.empty?
@logger.info("flush_buffer with #{events.size} events")
Expand Down
44 changes: 0 additions & 44 deletions spec/outputs/kusto/ingestor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -92,50 +92,6 @@
RSpec.configuration.reporter.message("Completed test: one of appid or managedid has to be provided with empty managed identity and appid")
end
end

end

# describe 'receiving events' do

# context 'with non-zero flush interval' do
# let(:temporary_output_file) { Stud::Temporary.pathname }

# let(:event_count) { 100 }
# let(:flush_interval) { 5 }

# let(:events) do
# event_count.times.map do |idx|
# LogStash::Event.new('subject' => idx)
# end
# end

# let(:output) { described_class.new(options.merge( {'path' => temporary_output_file, 'flush_interval' => flush_interval, 'delete_temp_files' => false } )) }

# before(:each) { output.register }

# after(:each) do
# output.close
# File.exist?(temporary_output_file) && File.unlink(temporary_output_file)
# File.exist?(temporary_output_file + '.kusto') && File.unlink(temporary_output_file + '.kusto')
# end

# it 'eventually flushes without receiving additional events' do
# output.multi_receive_encoded(events)

# # events should not all be flushed just yet...
# expect(File.read(temporary_output_file)).to satisfy("have less than #{event_count} lines") do |contents|
# contents && contents.lines.count < event_count
# end

# # wait for the flusher to run...
# sleep(flush_interval + 1)

# # events should all be flushed
# expect(File.read(temporary_output_file)).to satisfy("have exactly #{event_count} lines") do |contents|
# contents && contents.lines.count == event_count
# end
# end
# end

# end
end
120 changes: 120 additions & 0 deletions spec/outputs/kusto_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# encoding: utf-8
require_relative "../spec_helpers.rb"
require 'logstash/outputs/kusto'
require 'logstash/codecs/plain'
require 'logstash/event'
Expand All @@ -20,6 +21,125 @@
"max_interval" => 10
} }

describe '#initialize' do
it 'initializes with the correct options' do
RSpec.configuration.reporter.message("Running test: initializes with the correct options")
kusto = described_class.new(options.merge("app_key" => LogStash::Util::Password.new("mykey")))
expect(kusto.instance_variable_get(:@path)).to eq("./kusto_tst/%{+YYYY-MM-dd-HH-mm}")
expect(kusto.instance_variable_get(:@ingest_url)).to eq("https://ingest-sdkse2etest.eastus.kusto.windows.net/")
expect(kusto.instance_variable_get(:@app_id)).to eq("myid")
expect(kusto.instance_variable_get(:@app_key).value).to eq("mykey")
expect(kusto.instance_variable_get(:@app_tenant)).to eq("mytenant")
expect(kusto.instance_variable_get(:@database)).to eq("mydatabase")
expect(kusto.instance_variable_get(:@table)).to eq("mytable")
expect(kusto.instance_variable_get(:@json_mapping)).to eq("mymapping")
expect(kusto.instance_variable_get(:@proxy_host)).to eq("localhost")
expect(kusto.instance_variable_get(:@proxy_port)).to eq(3128)
expect(kusto.instance_variable_get(:@proxy_protocol)).to eq("https")
expect(kusto.instance_variable_get(:@max_size)).to eq(2000)
expect(kusto.instance_variable_get(:@max_interval)).to eq(10)
RSpec.configuration.reporter.message("Completed test: initializes with the correct options")
end
end

describe '#multi_receive_encoded' do
it 'processes events and adds them to the buffer' do
RSpec.configuration.reporter.message("Running test: processes events and adds them to the buffer")
kusto = described_class.new(options)
kusto.register

events = [LogStash::Event.new("message" => "test1"), LogStash::Event.new("message" => "test2")]
encoded_events = events.map { |e| [e, e.to_json] }
kusto.multi_receive_encoded(encoded_events)

buffer = kusto.instance_variable_get(:@buffer)
expect(buffer.instance_variable_get(:@buffer).size).to eq(2)
RSpec.configuration.reporter.message("Completed test: processes events and adds them to the buffer")
end

it 'handles errors during event processing' do
RSpec.configuration.reporter.message("Running test: handles errors during event processing")
kusto = described_class.new(options)
kusto.register

allow(kusto.instance_variable_get(:@buffer)).to receive(:<<).and_raise(StandardError.new("Test error"))
events = [LogStash::Event.new("message" => "test1")]
encoded_events = events.map { |e| [e, e.to_json] }

expect { kusto.multi_receive_encoded(encoded_events) }.not_to raise_error
RSpec.configuration.reporter.message("Completed test: handles errors during event processing")
end
end

describe '#register' do
it 'raises an error for invalid configurations' do
RSpec.configuration.reporter.message("Running test: raises an error for invalid configurations")
invalid_options = options.merge("ingest_url" => nil)
expect { described_class.new(invalid_options).register }.to raise_error(LogStash::ConfigurationError)
RSpec.configuration.reporter.message("Completed test: raises an error for invalid configurations")
end
end

describe '#flush_buffer' do
it 'handles errors during buffer flushing' do
RSpec.configuration.reporter.message("Running test: handles errors during buffer flushing")
kusto = described_class.new(options)
kusto.register

allow(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).and_raise(StandardError.new("Test error"))
events = [LogStash::Event.new("message" => "test1")]
encoded_events = events.map { |e| [e, e.to_json] }
kusto.multi_receive_encoded(encoded_events)

expect { kusto.flush_buffer(encoded_events) }.not_to raise_error
RSpec.configuration.reporter.message("Completed test: handles errors during buffer flushing")
end

it 'flushes the buffer when max_size is reached' do
RSpec.configuration.reporter.message("Running test: flushes the buffer when max_size is reached")
kusto = described_class.new(options.merge("max_size" => 1)) # Set max_size to 1MB for testing
kusto.register

events = [LogStash::Event.new("message" => "test1")]
encoded_events = events.map { |e| [e, e.to_json] }
expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything)
kusto.multi_receive_encoded(encoded_events)
kusto.flush_buffer(encoded_events) # Pass the encoded events
RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_size is reached")
end

it 'flushes the buffer when max_interval is reached' do
RSpec.configuration.reporter.message("Running test: flushes the buffer when max_interval is reached")
kusto = described_class.new(options.merge("max_interval" => 1)) # Set max_interval to 1 second for testing
kusto.register

events = [LogStash::Event.new("message" => "test1")]
encoded_events = events.map { |e| [e, e.to_json] }
kusto.multi_receive_encoded(encoded_events)
sleep(2) # Wait for the interval to pass

expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything)
kusto.flush_buffer(encoded_events) # Pass the encoded events
RSpec.configuration.reporter.message("Completed test: flushes the buffer when max_interval is reached")
end

it 'eventually flushes without receiving additional events based on max_interval' do
RSpec.configuration.reporter.message("Running test: eventually flushes without receiving additional events based on max_interval")
kusto = described_class.new(options.merge("max_interval" => 1)) # Set max_interval to 1 second for testing
kusto.register

events = [LogStash::Event.new("message" => "test1")]
encoded_events = events.map { |e| [e, e.to_json] }
kusto.multi_receive_encoded(encoded_events)

# Wait for the interval to pass
sleep(2)

expect(kusto.instance_variable_get(:@ingestor)).to receive(:upload_async).with(anything)
kusto.flush_buffer(encoded_events) # Pass the encoded events
RSpec.configuration.reporter.message("Completed test: eventually flushes without receiving additional events based on max_interval")
end
end

describe '#close' do
it 'shuts down the buffer and ingestor' do
Expand Down

0 comments on commit 43f9bea

Please sign in to comment.