diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 77a1c30..9f2ea24 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -109,11 +109,6 @@ def multi_receive_encoded(events_and_encoded) rescue => e @logger.error("Error processing event: #{e.message}") end - begin - @buffer << encoded - rescue => e - @logger.error("Error processing event: #{e.message}") - end end end diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 8ca083e..11de9a9 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -1,95 +1,90 @@ require 'logger' +require 'thread' module LogStash module Outputs class CustomSizeBasedBuffer - def initialize(max_size, max_interval, &flush_callback) - @max_size = max_size * 1024 * 1024 # Convert MB to bytes - @max_interval = max_interval + def initialize(max_size_mb, max_interval, &flush_callback) + @buffer_config = { + max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes + max_interval: max_interval, + logger: Logger.new(STDOUT) + } + @buffer_state = { + pending_items: [], + pending_size: 0, + last_flush: Time.now.to_i, + timer: Thread.new do + loop do + sleep(@buffer_config[:max_interval]) + buffer_flush(force: true) + end + end + } @flush_callback = flush_callback - @buffer = [] - @buffer_size = 0 # Initialize buffer size - @mutex = Mutex.new - @last_flush_time = Time.now @shutdown = false - @flusher_condition = ConditionVariable.new - - # Initialize logger - @logger = Logger.new(STDOUT) - @logger.level = Logger::DEBUG - - start_flusher_thread + @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds") end def <<(event) - @mutex.synchronize do - event_size = event.bytesize - @buffer << event - @buffer_size += event_size - if @buffer_size >= @max_size - @logger.debug("Size-based flush triggered after #{@max_size} bytes was reached") - flush - end + while buffer_full? do + sleep 0.1 end + + @buffer_state[:pending_items] << event + @buffer_state[:pending_size] += event.bytesize + + buffer_flush end def shutdown - @logger.info("Shutting down CustomSizeBasedBuffer") - @mutex.synchronize do - @shutdown = true - @flusher_condition.signal # Wake up the flusher thread - end - @flusher_thread.join - @logger.info("Flusher thread joined") - flush # Ensure final flush after shutdown - @logger.info("CustomSizeBasedBuffer shutdown complete") + @buffer_config[:logger].info("Shutting down buffer") + @shutdown = true + @buffer_state[:timer].kill + buffer_flush(final: true) end private - def start_flusher_thread - @flusher_thread = Thread.new do - loop do - @mutex.synchronize do - if @shutdown - @logger.debug("Flusher thread exiting due to shutdown signal") - break - end - if Time.now - @last_flush_time >= @max_interval - @logger.debug("Time-based flush triggered after #{@max_interval} seconds") - flush - end - @flusher_condition.wait(@mutex, @max_interval) # Wait for either the interval or shutdown signal - end - end - end + def buffer_full? + @buffer_state[:pending_size] >= @buffer_config[:max_size] end + def buffer_flush(options = {}) + force = options[:force] || options[:final] + final = options[:final] - def flush_if_needed - @mutex.synchronize do - if Time.now - @last_flush_time >= @max_interval - @logger.debug("Time-based flush triggered in flush_if_needed") - flush - end + if @buffer_state[:pending_size] == 0 + return 0 end - end - def flush - return if @buffer.empty? - - begin - @logger.debug("Flushing buffer with #{@buffer.size} events and #{@buffer_size} bytes") - @flush_callback.call(@buffer) - rescue => e - # Log the error and continue, - @logger.error("Error during flush: #{e.message}") - @logger.error(e.backtrace.join("\n")) - ensure - @buffer.clear - @buffer_size = 0 # Reset buffer size - @last_flush_time = Time.now + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + + if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] + return 0 end + + if force + @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") + elsif @buffer_state[:pending_size] >= @buffer_config[:max_size] + @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") + end + + outgoing_items = @buffer_state[:pending_items] + outgoing_size = @buffer_state[:pending_size] + buffer_initialize + + @flush_callback.call(outgoing_items) # Pass the list of events to the callback + + @buffer_state[:last_flush] = Time.now.to_i + @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events") + + outgoing_items.size + end + + def buffer_initialize + @buffer_state[:pending_items] = [] + @buffer_state[:pending_size] = 0 end end end