Skip to content

Commit

Permalink
Updated with new implementation
Browse files Browse the repository at this point in the history
Persists the incoming data during network down time. Checks for network
availabilty before retrying.
  • Loading branch information
MonishkaDas committed Nov 11, 2024
1 parent 64b4d34 commit ffc4f88
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 75 deletions.
10 changes: 5 additions & 5 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
default :codec, 'json_lines'

def register
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end

@io_mutex = Mutex.new

final_mapping = json_mapping
Expand All @@ -98,6 +93,11 @@ def register
if @path
@logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.")
end
sleep(30)
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end
end


Expand Down
148 changes: 88 additions & 60 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
require 'thread'
require 'fileutils'
require 'securerandom'
require 'net/http'
require 'uri'

module LogStash
module Outputs
Expand All @@ -27,23 +29,21 @@ def initialize(max_size_mb, max_interval, &flush_callback)
load_buffer_from_files
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")

# Start the timer thread after a delay to ensure initializations are completed
Thread.new do
sleep(10)
@buffer_state[:timer] = Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(force: true)
end
# Start the timer thread
@buffer_state[:timer] = Thread.new do
loop do
sleep(@buffer_config[:max_interval])
prepare_flush(force: true)
end
end
end

def <<(event)
while buffer_full? do
prepare_flush(force: true) # Flush when buffer is full
sleep 0.1
end

@pending_mutex.synchronize do
@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
Expand All @@ -54,7 +54,7 @@ def shutdown
@buffer_config[:logger].info("Shutting down buffer")
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
prepare_flush(final: true)
clear_buffer_files
end

Expand All @@ -66,65 +66,102 @@ def buffer_full?
end
end

def buffer_flush(options = {})
def prepare_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end

items_flushed = 0

begin
outgoing_items = []
outgoing_size = 0
outgoing_items = []
outgoing_size = 0

@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0
@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0

time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
end
if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
end

if force
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
else
@buffer_config[:logger].info("Flush triggered without specific condition")
end
if time_since_last_flush >= @buffer_config[:max_interval]
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
else
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
end

outgoing_items = @buffer_state[:pending_items].dup
outgoing_size = @buffer_state[:pending_size]
if @buffer_state[:network_down]
save_buffer_to_file(@buffer_state[:pending_items])
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
return 0
end

outgoing_items = @buffer_state[:pending_items].dup
outgoing_size = @buffer_state[:pending_size]
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
end

if Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).any?
flush_buffer_files
end

Thread.new { perform_flush(outgoing_items) }
end

def perform_flush(events, file_path = nil)
if file_path
unless ::File.exist?(file_path)
return 0
end
begin
@flush_callback.call(outgoing_items) # Pass the list of events to the callback
@buffer_state[:network_down] = false # Reset network status after successful flush
flush_buffer_files # Flush buffer files if any exist
buffer_state = Marshal.load(::File.read(file_path))
events = buffer_state[:pending_items]
rescue => e
@buffer_config[:logger].error("Flush failed: #{e.message}")
@buffer_state[:network_down] = true
save_buffer_to_file(outgoing_items)
@buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
return 0
end

end

@flush_mutex.lock

begin
@buffer_config[:logger].info("Flushing #{events.size} events, #{events.sum(&:bytesize)} bytes")
@flush_callback.call(events) # Pass the list of events to the callback
@buffer_state[:network_down] = false # Reset network status after successful flush
@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")

items_flushed = outgoing_items.size
@buffer_config[:logger].info("Flush completed. Flushed #{events.size} events, #{events.sum(&:bytesize)} bytes")

if file_path
::File.delete(file_path)
@buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}")
end

rescue => e
@buffer_config[:logger].error("Flush failed: #{e.message}")
@buffer_state[:network_down] = true

while true
sleep(2) # Wait before checking network availability again
if network_available?
@buffer_config[:logger].info("Network is back up. Retrying flush.")
retry
end
end
ensure
@flush_mutex.unlock
end

items_flushed
end

def network_available?
begin
uri = URI('http://www.google.com')
response = Net::HTTP.get_response(uri)
response.is_a?(Net::HTTPSuccess)
rescue
false
end
end


def save_buffer_to_file(events)
buffer_state_copy = {
Expand All @@ -137,7 +174,7 @@ def save_buffer_to_file(events)
::File.open(file_path, 'w') do |file|
file.write(Marshal.dump(buffer_state_copy))
end
@buffer_config[:logger].info("Saved buffer state to file: #{file_path}")
@buffer_config[:logger].info("Saved #{events.size} events to file: #{file_path}")
end

def load_buffer_from_files
Expand All @@ -156,16 +193,7 @@ def load_buffer_from_files

def flush_buffer_files
Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
begin
buffer_state = Marshal.load(::File.read(file_path))
@buffer_config[:logger].info("Flushed from file: #{file_path}")
@flush_callback.call(buffer_state[:pending_items])
::File.delete(file_path)
@buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}")
rescue => e
@buffer_config[:logger].error("Failed to flush buffer state file: #{e.message}")
break
end
Thread.new { perform_flush([], file_path) }
end
end

Expand Down
12 changes: 2 additions & 10 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,10 @@ def upload(data)
begin
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)

# Check the ingestion status
status = ingestion_result.getIngestionStatusCollection.get(0)
if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued
raise "Failed upload: #{status.errorCodeString}"
end
@logger.info("Final ingestion status: #{status.status}")

rescue => e
@logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace)
if e.message.include?("network")
raise e
end
raise e
ensure
ingestionLatch.countDown()
end
Expand Down

0 comments on commit ffc4f88

Please sign in to comment.