Skip to content

Commit

Permalink
Updated kusto_spec.rb and ingestor_spec.rb
Browse files Browse the repository at this point in the history
Testing spec files
  • Loading branch information
MonishkaDas committed Sep 26, 2024
1 parent 531046c commit c813a5b
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
4 changes: 3 additions & 1 deletion lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,18 @@ def multi_receive_encoded(events_and_encoded)
end

def close
@logger.info("Closing Kusto output plugin")
@flusher.stop unless @flusher.nil?
@cleaner.stop unless @cleaner.nil?
@buffer.shutdown
@ingestor.stop unless @ingestor.nil?
@logger.info("Kusto output plugin closed")
end

private
def flush_buffer(events)
return if events.empty?

@logger.info("flush_buffer with #{events.size} events")
begin
@ingestor.upload_async(events.join)
rescue => e
Expand Down
18 changes: 15 additions & 3 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require 'logger'

module LogStash
module Outputs
class CustomSizeBasedBuffer
Expand All @@ -11,13 +13,20 @@ def initialize(max_size, max_interval, &flush_callback)
@shutdown = false
@flusher_condition = ConditionVariable.new

# Initialize logger
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG

start_flusher_thread
end

def <<(event)
@mutex.synchronize do
@buffer << event
flush if @buffer.size >= @max_size
if @buffer.size >= @max_size
@logger.debug("Size-based flush triggered")
flush
end
end
end

Expand All @@ -38,6 +47,7 @@ def start_flusher_thread
@mutex.synchronize do
break if @shutdown
if Time.now - @last_flush_time >= @max_interval
@logger.debug("Time-based flush triggered")
flush
end
@flusher_condition.wait(@mutex, @max_interval) # Wait for either the interval or shutdown signal
Expand All @@ -50,6 +60,7 @@ def start_flusher_thread
def flush_if_needed
@mutex.synchronize do
if Time.now - @last_flush_time >= @max_interval
@logger.debug("Time-based flush triggered in flush_if_needed")
flush
end
end
Expand All @@ -59,11 +70,12 @@ def flush
return if @buffer.empty?

begin
@logger.debug("Flushing buffer with #{@buffer.size} events")
@flush_callback.call(@buffer)
rescue => e
# Log the error and continue,
puts "Error during flush: #{e.message}"
puts e.backtrace.join("\n")
@logger.error("Error during flush: #{e.message}")
@logger.error(e.backtrace.join("\n"))
ensure
@buffer.clear
@last_flush_time = Time.now
Expand Down

0 comments on commit c813a5b

Please sign in to comment.