diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 11de9a9..c371408 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -23,6 +23,8 @@ def initialize(max_size_mb, max_interval, &flush_callback) } @flush_callback = flush_callback @shutdown = false + @pending_mutex = Mutex.new + @flush_mutex = Mutex.new @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds") end @@ -31,8 +33,10 @@ def <<(event) sleep 0.1 end - @buffer_state[:pending_items] << event - @buffer_state[:pending_size] += event.bytesize + @pending_mutex.synchronize do + @buffer_state[:pending_items] << event + @buffer_state[:pending_size] += event.bytesize + end buffer_flush end @@ -47,39 +51,53 @@ def shutdown private def buffer_full? - @buffer_state[:pending_size] >= @buffer_config[:max_size] + @pending_mutex.synchronize do + @buffer_state[:pending_size] >= @buffer_config[:max_size] + end end def buffer_flush(options = {}) force = options[:force] || options[:final] final = options[:final] - if @buffer_state[:pending_size] == 0 + if final + @flush_mutex.lock + elsif !@flush_mutex.try_lock return 0 end - time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + items_flushed = 0 - if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] - return 0 - end + begin + @pending_mutex.synchronize do + return 0 if @buffer_state[:pending_size] == 0 - if force - @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") - elsif @buffer_state[:pending_size] >= @buffer_config[:max_size] - @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") - end + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] - outgoing_items = @buffer_state[:pending_items] - outgoing_size = @buffer_state[:pending_size] - buffer_initialize + return 0 if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] - @flush_callback.call(outgoing_items) # Pass the list of events to the callback + if force + @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") + elsif @buffer_state[:pending_size] >= @buffer_config[:max_size] + @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") + end - @buffer_state[:last_flush] = Time.now.to_i - @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events") + outgoing_items = @buffer_state[:pending_items].dup + outgoing_size = @buffer_state[:pending_size] + buffer_initialize + + @flush_callback.call(outgoing_items) # Pass the list of events to the callback + + @buffer_state[:last_flush] = Time.now.to_i + @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes") + + items_flushed = outgoing_items.size + end + ensure + @flush_mutex.unlock + end - outgoing_items.size + items_flushed end def buffer_initialize