Skip to content

Commit

Permalink
Updated custom_size_based_buffer.rb
Browse files Browse the repository at this point in the history
  • Loading branch information
MonishkaDas committed Oct 16, 2024
1 parent fce671f commit 379da3b
Showing 1 changed file with 38 additions and 20 deletions.
58 changes: 38 additions & 20 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ def initialize(max_size_mb, max_interval, &flush_callback)
}
@flush_callback = flush_callback
@shutdown = false
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
end

Expand All @@ -31,8 +33,10 @@ def <<(event)
sleep 0.1
end

@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
@pending_mutex.synchronize do
@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
end

buffer_flush
end
Expand All @@ -47,39 +51,53 @@ def shutdown
private

def buffer_full?
@buffer_state[:pending_size] >= @buffer_config[:max_size]
@pending_mutex.synchronize do
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end
end

def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

if @buffer_state[:pending_size] == 0
if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end

time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
items_flushed = 0

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
end
begin
@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0

if force
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
end
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]

outgoing_items = @buffer_state[:pending_items]
outgoing_size = @buffer_state[:pending_size]
buffer_initialize
return 0 if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]

@flush_callback.call(outgoing_items) # Pass the list of events to the callback
if force
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
end

@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events")
outgoing_items = @buffer_state[:pending_items].dup
outgoing_size = @buffer_state[:pending_size]
buffer_initialize

@flush_callback.call(outgoing_items) # Pass the list of events to the callback

@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")

items_flushed = outgoing_items.size
end
ensure
@flush_mutex.unlock
end

outgoing_items.size
items_flushed
end

def buffer_initialize
Expand Down

0 comments on commit 379da3b

Please sign in to comment.