Added max_retries and failed_items_path() configs
Updated buffer_flush and upload()
MonishkaDas committed Oct 28, 2024
1 parent 5941adf commit 0be738e
Showing 3 changed files with 88 additions and 142 deletions.
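For orientation before the per-file diffs, the sketch below shows how the two new settings are wired through to the buffer, mirroring the register change in kusto.rb. The values, the require path, and the standalone callback are illustrative only, not taken from a real pipeline.

    require 'logstash/outputs/kusto/custom_size_based_buffer'  # assumes the plugin gem is on the load path

    # Illustrative values only; in the plugin, register passes @max_size, @max_interval,
    # @max_retries and @failed_items_path and uses flush_buffer(events) as the callback.
    buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(
      10,                             # max_size_mb: flush once ~10 MB are pending
      10,                             # max_interval: flush at least every 10 seconds
      3,                              # max_retries: flush attempts before items are persisted
      '/tmp/kusto_failed_items.log'   # failed_items_path: nil disables the fallback file
    ) do |events|
      puts "flushing #{events.size} events"  # stand-in for flush_buffer(events)
    end

    buffer << '{"message":"hello"}'
    buffer.shutdown   # flushes whatever is still pending, per the shutdown method in the diff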
8 changes: 7 additions & 1 deletion lib/logstash/outputs/kusto.rb
@@ -75,11 +75,17 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
config :max_interval, validate: :number, default: 10

# Maximum number of retries before the flush fails, defaults to 3
config :max_retries, validate: :number, default: 3

# Path to store failed items, defaults to nil
config :failed_items_path, validate: :string, default: nil

default :codec, 'json_lines'

def register
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, @max_retries, @failed_items_path) do |events|
flush_buffer(events)
end

170 changes: 67 additions & 103 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
@@ -1,179 +1,143 @@
require 'logger'
require 'thread'
require 'fileutils'
require 'securerandom'
require 'csv'

module LogStash
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size_mb, max_interval, &flush_callback)
def initialize(max_size_mb = 10, max_interval = 10, max_retries = 3, failed_items_path = nil, &flush_callback)
@buffer_config = {
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
max_interval: max_interval,
buffer_dir: './tmp/buffer_storage/',
max_retries: max_retries,
failed_items_path: failed_items_path,
logger: Logger.new(STDOUT)
}
@buffer_state = {
pending_items: [],
pending_size: 0,
last_flush: Time.now.to_i,
timer: nil,
network_down: false
}
@flush_callback = flush_callback
@shutdown = false
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
load_buffer_from_files
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")

# Start the timer thread after a delay to ensure initializations are completed
Thread.new do
sleep(10)
@buffer_state[:timer] = Thread.new do
timer: Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(force: true)
end
end
end
}
@flush_callback = flush_callback
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds, max_retries: #{max_retries}, failed_items_path: #{failed_items_path}")
end

def <<(event)
while buffer_full? do
sleep 0.1
end
def <<(event)
while buffer_full? do
sleep 0.1
end

@pending_mutex.synchronize do
@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
end

# Trigger a flush if the buffer size exceeds the maximum size
if buffer_full?
buffer_flush(force: true)
end
end

def shutdown
@buffer_config[:logger].info("Shutting down buffer")
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
clear_buffer_files
end

private
private

def buffer_full?
@pending_mutex.synchronize do
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end
end

def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]
def buffer_full?
@pending_mutex.synchronize do
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end
end

if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end
def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

items_flushed = 0
if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end

begin
outgoing_items = []
outgoing_size = 0

@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0

time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
end

if force
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
@buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
else
@buffer_config[:logger].info("Flush triggered without specific condition")
if time_since_last_flush >= @buffer_config[:max_interval]
@buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
else
@buffer_config[:logger].info("Size-based flush triggered when #{@buffer_state[:pending_size]} bytes was reached")
end
end

outgoing_items = @buffer_state[:pending_items].dup
outgoing_size = @buffer_state[:pending_size]
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
buffer_initialize
end

retries = 0
begin
@buffer_config[:logger].info("Flushing: #{outgoing_items.size} items and #{outgoing_size} bytes to the network")
@flush_callback.call(outgoing_items) # Pass the list of events to the callback
@buffer_state[:network_down] = false # Reset network status after successful flush
flush_buffer_files # Flush buffer files if any exist
@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
rescue => e
@buffer_config[:logger].error("Flush failed: #{e.message}")
@buffer_state[:network_down] = true
save_buffer_to_file(outgoing_items)
retries += 1
if retries <= @buffer_config[:max_retries]
@buffer_config[:logger].error("Flush failed: #{e.message}. \nRetrying (#{retries}/#{@buffer_config[:max_retries]})...")
sleep 1
retry
else
@buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes")
handle_failed_flush(outgoing_items, e.message)
end
end

@buffer_state[:last_flush] = Time.now.to_i
@buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")

items_flushed = outgoing_items.size
ensure
@flush_mutex.unlock
end

items_flushed
end

def save_buffer_to_file(events)
buffer_state_copy = {
pending_items: events,
pending_size: events.sum(&:bytesize)
}

::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists
file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log")
::File.open(file_path, 'w') do |file|
file.write(Marshal.dump(buffer_state_copy))
end
@buffer_config[:logger].info("Saved buffer state to file: #{file_path}")
end

def load_buffer_from_files
Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
def handle_failed_flush(items, error_message)
if @buffer_config[:failed_items_path]
begin
buffer_state = Marshal.load(::File.read(file_path))
@buffer_state[:pending_items].concat(buffer_state[:pending_items])
@buffer_state[:pending_size] += buffer_state[:pending_size]
::File.delete(file_path)
rescue => e
@buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
end
end
@buffer_config[:logger].info("Loaded buffer state from files")
end

def flush_buffer_files
Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
begin
buffer_state = Marshal.load(::File.read(file_path))
@buffer_config[:logger].info("Flushed from file: #{file_path}")
@flush_callback.call(buffer_state[:pending_items])
::File.delete(file_path)
@buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}")
::File.open(@buffer_config[:failed_items_path], 'a') do |file|
items.each do |item|
file.puts(item)
end
end
@buffer_config[:logger].info("Failed items stored in #{@buffer_config[:failed_items_path]}")
rescue => e
@buffer_config[:logger].error("Failed to flush buffer state file: #{e.message}")
break
@buffer_config[:logger].error("Failed to store items: #{e.message}")
end
else
@buffer_config[:logger].warn("No failed_items_path configured. Data loss may occur.")
end
end

def clear_buffer_files
Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
::File.delete(file_path)
end
@buffer_config[:logger].info("Cleared all buffer state files")
def buffer_initialize
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
end
end
end
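Condensed, the retry behaviour that buffer_flush and handle_failed_flush implement above amounts to the standalone sketch below. The helper name, file path, and always-failing callback are hypothetical and exist only to illustrate how max_retries and failed_items_path interact.

    require 'logger'

    # Retry the flush callback up to max_retries times; after that, append the
    # unsent items (one per line) to failed_items_path, or warn if no path is set.
    def flush_with_retries(items, max_retries, failed_items_path, logger, &callback)
      retries = 0
      begin
        callback.call(items)
      rescue => e
        retries += 1
        if retries <= max_retries
          logger.error("Flush failed: #{e.message}. Retrying (#{retries}/#{max_retries})...")
          sleep 1
          retry
        elsif failed_items_path
          File.open(failed_items_path, 'a') { |f| items.each { |item| f.puts(item) } }
          logger.error("Max retries reached. Stored #{items.size} failed items in #{failed_items_path}")
        else
          logger.warn("Max retries reached and no failed_items_path configured. Data loss may occur.")
        end
      end
    end

    # Every attempt fails here, so the two items end up in the fallback file.
    flush_with_retries(['{"a":1}', '{"b":2}'], 3, '/tmp/failed_items.log', Logger.new(STDOUT)) do |_items|
      raise 'network down'
    end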
52 changes: 14 additions & 38 deletions lib/logstash/outputs/kusto/ingestor.rb
@@ -75,7 +75,8 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD
end

@ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table)

@ingestion_properties.setReportLevel(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportLevel::FAILURES_AND_SUCCESSES)
@ingestion_properties.setReportMethod(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportMethod::TABLE)
if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided
@logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping)
@ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@@ -101,59 +102,34 @@ def upload_async(data)
exception = e
end
end

# Wait for the task to complete and check for exceptions
@workers_pool.shutdown
@workers_pool.wait_for_termination

if exception
@logger.error('StandardError in upload_async.', exception: exception.class, message: exception.message, backtrace: exception.backtrace)
raise exception
end

raise exception if exception
rescue Exception => e
@logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace)
raise e
end

def upload(data)
@logger.info("Sending data to Kusto")

@logger.debug("Sending data to Kusto")
if data.size > 0
ingestionLatch = java.util.concurrent.CountDownLatch.new(1)

Thread.new do
begin
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)

# Check the ingestion status
status = ingestion_result.getIngestionStatusCollection.get(0)
if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued
raise "Failed upload: #{status.errorCodeString}"
end
@logger.info("Final ingestion status: #{status.status}")
rescue => e
@logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace)
if e.message.include?("network")
raise e
end
ensure
ingestionLatch.countDown()
end
end

# Wait for the ingestion to complete with a timeout
if !ingestionLatch.await(30, java.util.concurrent.TimeUnit::SECONDS)
@logger.error('Ingestion timed out, possible network issue.')
raise 'Ingestion timed out, possible network issue.'
begin
data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
rescue => e
@logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
raise e
end
else
@logger.warn("Data is empty and is not ingested.")
end
@logger.info("Data sent to Kusto.")

@logger.debug("Data sent to Kusto.")
rescue => e
@logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
raise e # Raise the original error if ingestion fails
raise e
end

def stop