From 0be738e368a3c92620c94f2615b9efc5a51ebeb3 Mon Sep 17 00:00:00 2001
From: MonishkaDas
Date: Sun, 27 Oct 2024 15:42:21 +0530
Subject: [PATCH] Added max_retries and failed_items_path configs

Updated buffer_flush and upload()
---
 lib/logstash/outputs/kusto.rb                 |   8 +-
 .../outputs/kusto/custom_size_based_buffer.rb | 170 +++++++-----------
 lib/logstash/outputs/kusto/ingestor.rb        |  52 ++----
 3 files changed, 88 insertions(+), 142 deletions(-)

diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb
index 18a1d50..246980d 100755
--- a/lib/logstash/outputs/kusto.rb
+++ b/lib/logstash/outputs/kusto.rb
@@ -75,11 +75,17 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
   # Maximum interval (in seconds) before the buffer gets flushed, defaults to 10
   config :max_interval, validate: :number, default: 10
 
+  # Maximum number of retries before the flush fails, defaults to 3
+  config :max_retries, validate: :number, default: 3
+
+  # Path to store failed items, defaults to nil
+  config :failed_items_path, validate: :string, default: nil
+
   default :codec, 'json_lines'
 
   def register
     # Initialize the custom buffer with size and interval
-    @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
+    @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval, @max_retries, @failed_items_path) do |events|
       flush_buffer(events)
     end
 
diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb
index 6a01227..646d61a 100644
--- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb
+++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb
@@ -1,82 +1,74 @@
 require 'logger'
 require 'thread'
-require 'fileutils'
-require 'securerandom'
+require 'csv'
 
 module LogStash
   module Outputs
     class CustomSizeBasedBuffer
-      def initialize(max_size_mb, max_interval, &flush_callback)
+      def initialize(max_size_mb = 10, max_interval = 10, max_retries = 3, failed_items_path = nil, &flush_callback)
         @buffer_config = {
           max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
           max_interval: max_interval,
-          buffer_dir: './tmp/buffer_storage/',
+          max_retries: max_retries,
+          failed_items_path: failed_items_path,
           logger: Logger.new(STDOUT)
         }
         @buffer_state = {
           pending_items: [],
           pending_size: 0,
           last_flush: Time.now.to_i,
-          timer: nil,
-          network_down: false
-        }
-        @flush_callback = flush_callback
-        @shutdown = false
-        @pending_mutex = Mutex.new
-        @flush_mutex = Mutex.new
-        load_buffer_from_files
-        @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
-
-        # Start the timer thread after a delay to ensure initializations are completed
-        Thread.new do
-          sleep(10)
-          @buffer_state[:timer] = Thread.new do
+          timer: Thread.new do
             loop do
               sleep(@buffer_config[:max_interval])
              buffer_flush(force: true)
            end
          end
-        end
+        }
+        @flush_callback = flush_callback
+        @pending_mutex = Mutex.new
+        @flush_mutex = Mutex.new
+        @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds, max_retries: #{max_retries}, failed_items_path: #{failed_items_path}")
       end
 
-      def <<(event)
-        while buffer_full? do
-          sleep 0.1
-        end
+      def <<(event)
+        while buffer_full? do
+          sleep 0.1
+        end
 
         @pending_mutex.synchronize do
           @buffer_state[:pending_items] << event
           @buffer_state[:pending_size] += event.bytesize
         end
+
+        # Trigger a flush if the buffer size exceeds the maximum size
+        if buffer_full?
+          buffer_flush(force: true)
+        end
       end
 
       def shutdown
         @buffer_config[:logger].info("Shutting down buffer")
-        @shutdown = true
         @buffer_state[:timer].kill
         buffer_flush(final: true)
-        clear_buffer_files
       end
 
-      private
+      private
 
-      def buffer_full?
-        @pending_mutex.synchronize do
-          @buffer_state[:pending_size] >= @buffer_config[:max_size]
-        end
-      end
-
-      def buffer_flush(options = {})
-        force = options[:force] || options[:final]
-        final = options[:final]
+      def buffer_full?
+        @pending_mutex.synchronize do
+          @buffer_state[:pending_size] >= @buffer_config[:max_size]
+        end
+      end
 
-        if final
-          @flush_mutex.lock
-        elsif !@flush_mutex.try_lock
-          return 0
-        end
+      def buffer_flush(options = {})
+        force = options[:force] || options[:final]
+        final = options[:final]
 
-        items_flushed = 0
+        if final
+          @flush_mutex.lock
+        elsif !@flush_mutex.try_lock
+          return 0
+        end
 
         begin
           outgoing_items = []
 
           @pending_mutex.synchronize do
             return 0 if @buffer_state[:pending_size] == 0
-
-            time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
+            time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
 
             if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
               return 0
             end
 
             if force
-              @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
-            elsif @buffer_state[:pending_size] >= @buffer_config[:max_size]
-              @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached")
-            else
-              @buffer_config[:logger].info("Flush triggered without specific condition")
+              if time_since_last_flush >= @buffer_config[:max_interval]
+                @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds")
+              else
+                @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes")
+              end
             end
 
             outgoing_items = @buffer_state[:pending_items].dup
             outgoing_size = @buffer_state[:pending_size]
-            @buffer_state[:pending_items] = []
-            @buffer_state[:pending_size] = 0
+            buffer_initialize
           end
 
+          retries = 0
           begin
+            @buffer_config[:logger].info("Flushing: #{outgoing_items.size} items and #{outgoing_size} bytes to the network")
             @flush_callback.call(outgoing_items) # Pass the list of events to the callback
-            @buffer_state[:network_down] = false # Reset network status after successful flush
-            flush_buffer_files # Flush buffer files if any exist
+            @buffer_state[:last_flush] = Time.now.to_i
+            @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
           rescue => e
-            @buffer_config[:logger].error("Flush failed: #{e.message}")
-            @buffer_state[:network_down] = true
-            save_buffer_to_file(outgoing_items)
+            retries += 1
+            if retries <= @buffer_config[:max_retries]
+              @buffer_config[:logger].error("Flush failed: #{e.message}. Retrying (#{retries}/#{@buffer_config[:max_retries]})...")
+              sleep 1
+              retry
+            else
+              @buffer_config[:logger].error("Max retries reached. Failed to flush #{outgoing_items.size} items and #{outgoing_size} bytes")
+              handle_failed_flush(outgoing_items, e.message)
+            end
           end
-
-          @buffer_state[:last_flush] = Time.now.to_i
-          @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes")
-
-          items_flushed = outgoing_items.size
         ensure
           @flush_mutex.unlock
         end
-
-        items_flushed
-      end
-
-      def save_buffer_to_file(events)
-        buffer_state_copy = {
-          pending_items: events,
-          pending_size: events.sum(&:bytesize)
-        }
-
-        ::FileUtils.mkdir_p(@buffer_config[:buffer_dir]) # Ensure directory exists
-        file_path = ::File.join(@buffer_config[:buffer_dir], "buffer_state_#{Time.now.to_i}_#{SecureRandom.uuid}.log")
-        ::File.open(file_path, 'w') do |file|
-          file.write(Marshal.dump(buffer_state_copy))
-        end
-        @buffer_config[:logger].info("Saved buffer state to file: #{file_path}")
       end
 
-      def load_buffer_from_files
-        Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
+      def handle_failed_flush(items, error_message)
+        if @buffer_config[:failed_items_path]
           begin
-            buffer_state = Marshal.load(::File.read(file_path))
-            @buffer_state[:pending_items].concat(buffer_state[:pending_items])
-            @buffer_state[:pending_size] += buffer_state[:pending_size]
-            ::File.delete(file_path)
-          rescue => e
-            @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}")
-          end
-        end
-        @buffer_config[:logger].info("Loaded buffer state from files")
-      end
-
-      def flush_buffer_files
-        Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path|
-          begin
-            buffer_state = Marshal.load(::File.read(file_path))
-            @buffer_config[:logger].info("Flushed from file: #{file_path}")
-            @flush_callback.call(buffer_state[:pending_items])
-            ::File.delete(file_path)
-            @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}")
+            ::File.open(@buffer_config[:failed_items_path], 'a') do |file|
+              items.each do |item|
+                file.puts(item)
+              end
+            end
+            @buffer_config[:logger].info("Failed items stored in #{@buffer_config[:failed_items_path]}")
           rescue => e
-            @buffer_config[:logger].error("Failed to flush buffer state file: #{e.message}")
-            break
+            @buffer_config[:logger].error("Failed to store items: #{e.message}")
           end
+        else
Data loss may occur.") end end - def clear_buffer_files - Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| - ::File.delete(file_path) - end - @buffer_config[:logger].info("Cleared all buffer state files") + def buffer_initialize + @buffer_state[:pending_items] = [] + @buffer_state[:pending_size] = 0 end end end diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 392cb0a..f06a282 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -75,7 +75,8 @@ def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREAD end @ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table) - + @ingestion_properties.setReportLevel(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportLevel::FAILURES_AND_SUCCESSES) + @ingestion_properties.setReportMethod(Java::ComMicrosoftAzureKustoIngest::IngestionProperties::IngestionReportMethod::TABLE) if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided @logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping) @ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) @@ -101,59 +102,34 @@ def upload_async(data) exception = e end end - # Wait for the task to complete and check for exceptions @workers_pool.shutdown @workers_pool.wait_for_termination - - if exception - @logger.error('StandardError in upload_async.', exception: exception.class, message: exception.message, backtrace: exception.backtrace) - raise exception - end + + raise exception if exception rescue Exception => e @logger.error('StandardError in upload_async.', exception: e.class, message: e.message, backtrace: e.backtrace) raise e end def upload(data) - @logger.info("Sending data to Kusto") - + @logger.debug("Sending data to Kusto") if data.size > 0 - ingestionLatch = java.util.concurrent.CountDownLatch.new(1) - - Thread.new do - begin - data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) - ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) - - # Check the ingestion status - status = ingestion_result.getIngestionStatusCollection.get(0) - if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued - raise "Failed upload: #{status.errorCodeString}" - end - @logger.info("Final ingestion status: #{status.status}") - rescue => e - @logger.error('Error during ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace) - if e.message.include?("network") - raise e - end - ensure - ingestionLatch.countDown() - end - end - - # Wait for the ingestion to complete with a timeout - if !ingestionLatch.await(30, java.util.concurrent.TimeUnit::SECONDS) - @logger.error('Ingestion timed out, possible network issue.') - raise 'Ingestion timed out, possible network issue.' 
+        begin
+          data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes))
+          result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties)
+        rescue => e
+          @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
+          raise e
         end
       else
         @logger.warn("Data is empty and is not ingested.")
       end
-      @logger.info("Data sent to Kusto.")
+
+      @logger.debug("Data sent to Kusto.")
     rescue => e
       @logger.error('Uploading failed.', exception: e.class, message: e.message, backtrace: e.backtrace)
-      raise e # Raise the original error if ingestion fails
+      raise e
     end
 
     def stop
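
-- 
Note (not part of the patch): a minimal sketch of how the new retry and
spill-to-disk behaviour can be exercised outside Logstash, assuming the
patched custom_size_based_buffer.rb is on the load path. The spill path
/tmp/failed_items.log, the buffer sizing, and the event payload below are
placeholders, not values the plugin prescribes.

  require_relative 'lib/logstash/outputs/kusto/custom_size_based_buffer'

  # Flush callback that always raises, to drive the retry-then-spill path.
  buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(1, 2, 3, '/tmp/failed_items.log') do |events|
    raise "simulated ingestion failure for #{events.size} events"
  end

  buffer << '{"message":"hello"}' # plain strings work: the buffer only needs #bytesize
  sleep 10                        # timer flush at ~2s, then 3 retries ~1s apart, then spill
  buffer.shutdown                 # kills the timer thread and flushes any remainder

  # Expected log sequence: three "Flush failed ... Retrying (n/3)" errors,
  # then "Max retries reached ...", then the event is appended line-by-line
  # to /tmp/failed_items.log by handle_failed_flush.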