Skip to content

Commit

Permalink
added custom_size_based_buffer class
Browse files Browse the repository at this point in the history
Added the feature for size and time based flushing of buffer.
Added config options for max_interval and max_size. Once either one is
reached the events stored in the buffer will be flushed.

Updated kusto.rb
  • Loading branch information
monishkadas-ms authored and MonishkaDas committed Sep 19, 2024
1 parent cbb7444 commit 5b2af15
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 36 deletions.
8 changes: 7 additions & 1 deletion lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,12 @@ def flush(events)
end
end

public
def shutdown
@buffer.shutdown
@ingestor.stop unless @ingestor.nil?
end

def close
@flusher.stop unless @flusher.nil?
@cleaner.stop unless @cleaner.nil?
Expand All @@ -252,7 +258,7 @@ def close
end
end

@ingestor.stop unless @ingestor.nil?
shutdown
end

private
Expand Down
94 changes: 59 additions & 35 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
@@ -1,50 +1,74 @@
module LogStash
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size, max_interval, &flush_callback)
@max_size = max_size
@max_interval = max_interval
@flush_callback = flush_callback
@buffer = []
@mutex = Mutex.new
@last_flush_time = Time.now

start_flusher_thread
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size, max_interval, &flush_callback)
@max_size = max_size
@max_interval = max_interval
@flush_callback = flush_callback
@buffer = []
@mutex = Mutex.new
@last_flush_time = Time.now
@shutdown = false
@flusher_condition = ConditionVariable.new

start_flusher_thread
end

def <<(event)
@mutex.synchronize do
@buffer << event
flush if @buffer.size >= @max_size
end

def <<(event)
@mutex.synchronize do
@buffer << event
flush if @buffer.size >= @max_size
end
end

def shutdown
@mutex.synchronize do
@shutdown = true
@flusher_condition.signal # Wake up the flusher thread
end

private
def start_flusher_thread
Thread.new do
loop do
sleep @max_interval
flush_if_needed
@flusher_thread.join
flush # Ensure final flush after shutdown
end

private

def start_flusher_thread
@flusher_thread = Thread.new do
loop do
@mutex.synchronize do
break if @shutdown
if Time.now - @last_flush_time >= @max_interval
flush
end
@flusher_condition.wait(@mutex, @max_interval) # Wait for either the interval or shutdown signal
end
end
end

def flush_if_needed
@mutex.synchronize do
if Time.now - @last_flush_time >= @max_interval
flush
end
end


def flush_if_needed
@mutex.synchronize do
if Time.now - @last_flush_time >= @max_interval
flush
end
end

def flush
return if @buffer.empty?

end

def flush
return if @buffer.empty?

begin
@flush_callback.call(@buffer)
rescue => e
# Log the error and continue,
puts "Error during flush: #{e.message}"
puts e.backtrace.join("\n")
ensure
@buffer.clear
@last_flush_time = Time.now
end
end
end
end
end

0 comments on commit 5b2af15

Please sign in to comment.