diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index c07144d..bfea63e 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -109,16 +109,18 @@ def multi_receive_encoded(events_and_encoded) end def close + @logger.info("Closing Kusto output plugin") @flusher.stop unless @flusher.nil? @cleaner.stop unless @cleaner.nil? @buffer.shutdown @ingestor.stop unless @ingestor.nil? + @logger.info("Kusto output plugin closed") end private def flush_buffer(events) return if events.empty? - + @logger.info("flush_buffer with #{events.size} events") begin @ingestor.upload_async(events.join) rescue => e diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 264131a..3b893c0 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -1,3 +1,5 @@ +require 'logger' + module LogStash module Outputs class CustomSizeBasedBuffer @@ -11,13 +13,20 @@ def initialize(max_size, max_interval, &flush_callback) @shutdown = false @flusher_condition = ConditionVariable.new + # Initialize logger + @logger = Logger.new(STDOUT) + @logger.level = Logger::DEBUG + start_flusher_thread end def <<(event) @mutex.synchronize do @buffer << event - flush if @buffer.size >= @max_size + if @buffer.size >= @max_size + @logger.debug("Size-based flush triggered") + flush + end end end @@ -38,6 +47,7 @@ def start_flusher_thread @mutex.synchronize do break if @shutdown if Time.now - @last_flush_time >= @max_interval + @logger.debug("Time-based flush triggered") flush end @flusher_condition.wait(@mutex, @max_interval) # Wait for either the interval or shutdown signal @@ -50,6 +60,7 @@ def start_flusher_thread def flush_if_needed @mutex.synchronize do if Time.now - @last_flush_time >= @max_interval + @logger.debug("Time-based flush triggered in flush_if_needed") flush end end @@ -59,11 +70,12 @@ def flush return if @buffer.empty? begin + @logger.debug("Flushing buffer with #{@buffer.size} events") @flush_callback.call(@buffer) rescue => e # Log the error and continue, - puts "Error during flush: #{e.message}" - puts e.backtrace.join("\n") + @logger.error("Error during flush: #{e.message}") + @logger.error(e.backtrace.join("\n")) ensure @buffer.clear @last_flush_time = Time.now