Skip to content

Commit

Permalink
Updated custom_size_based_buffer.rb
Browse files Browse the repository at this point in the history
  • Loading branch information
MonishkaDas committed Oct 15, 2024
1 parent 4a85b94 commit 9c84304
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 73 deletions.
5 changes: 0 additions & 5 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,6 @@ def multi_receive_encoded(events_and_encoded)
rescue => e
@logger.error("Error processing event: #{e.message}")
end
begin
@buffer << encoded
rescue => e
@logger.error("Error processing event: #{e.message}")
end
end
end

Expand Down
131 changes: 63 additions & 68 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
@@ -1,95 +1,90 @@
require 'logger'
require 'thread'

module LogStash
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size, max_interval, &flush_callback)
@max_size = max_size * 1024 * 1024 # Convert MB to bytes
@max_interval = max_interval
def initialize(max_size_mb, max_interval, &flush_callback)
@buffer_config = {
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
max_interval: max_interval,
logger: Logger.new(STDOUT)
}
@buffer_state = {
pending_items: [],
pending_size: 0,
last_flush: Time.now.to_i,
timer: Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(force: true)
end
end
}
@flush_callback = flush_callback
@buffer = []
@buffer_size = 0 # Initialize buffer size
@mutex = Mutex.new
@last_flush_time = Time.now
@shutdown = false
@flusher_condition = ConditionVariable.new

# Initialize logger
@logger = Logger.new(STDOUT)
@logger.level = Logger::DEBUG

start_flusher_thread
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
end

def <<(event)
@mutex.synchronize do
event_size = event.bytesize
@buffer << event
@buffer_size += event_size
if @buffer_size >= @max_size
@logger.debug("Size-based flush triggered after #{@max_size} bytes was reached")
flush
end
while buffer_full? do
sleep 0.1
end

@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize

buffer_flush
end

def shutdown
@logger.info("Shutting down CustomSizeBasedBuffer")
@mutex.synchronize do
@shutdown = true
@flusher_condition.signal # Wake up the flusher thread
end
@flusher_thread.join
@logger.info("Flusher thread joined")
flush # Ensure final flush after shutdown
@logger.info("CustomSizeBasedBuffer shutdown complete")
@buffer_config[:logger].info("Shutting down buffer")
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
end

private

def start_flusher_thread
@flusher_thread = Thread.new do
loop do
@mutex.synchronize do
if @shutdown
@logger.debug("Flusher thread exiting due to shutdown signal")
break
end
if Time.now - @last_flush_time >= @max_interval
@logger.debug("Time-based flush triggered after #{@max_interval} seconds")
flush
end
@flusher_condition.wait(@mutex, @max_interval) # Wait for either the interval or shutdown signal
end
end
end
def buffer_full?
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end

def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

def flush_if_needed
@mutex.synchronize do
if Time.now - @last_flush_time >= @max_interval
@logger.debug("Time-based flush triggered in flush_if_needed")
flush
end
if @buffer_state[:pending_size] == 0
return 0
end
end

def flush
return if @buffer.empty?

begin
@logger.debug("Flushing buffer with #{@buffer.size} events and #{@buffer_size} bytes")
@flush_callback.call(@buffer)
rescue => e
# Log the error and continue,
@logger.error("Error during flush: #{e.message}")
@logger.error(e.backtrace.join("\n"))
ensure
@buffer.clear
@buffer_size = 0 # Reset buffer size
@last_flush_time = Time.now
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
end

if force
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
end

outgoing_items = @buffer_state[:pending_items]
outgoing_size = @buffer_state[:pending_size]
buffer_initialize

@flush_callback.call(outgoing_items) # Pass the list of events to the callback

@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events")

outgoing_items.size
end

def buffer_initialize
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
end
end
end
Expand Down

0 comments on commit 9c84304

Please sign in to comment.