From ffc4f88f0438e465565152f841a8f65e608a3f3b Mon Sep 17 00:00:00 2001 From: MonishkaDas Date: Mon, 11 Nov 2024 18:05:25 +0530 Subject: [PATCH] Updated with new implementation Persists the incoming data during network downtime. Checks for network availability before retrying. --- lib/logstash/outputs/kusto.rb | 10 +- .../outputs/kusto/custom_size_based_buffer.rb | 148 +++++++++++------- lib/logstash/outputs/kusto/ingestor.rb | 12 +- 3 files changed, 95 insertions(+), 75 deletions(-) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 6fa160b..5f36356 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -77,11 +77,6 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base default :codec, 'json_lines' def register - # Initialize the custom buffer with size and interval - @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events| - flush_buffer(events) - end - @io_mutex = Mutex.new final_mapping = json_mapping @@ -98,6 +93,11 @@ def register if @path @logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.") end + sleep(30) + # Initialize the custom buffer with size and interval + @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events| + flush_buffer(events) + end end diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb index 7c68026..3c882cb 100644 --- a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb +++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb @@ -2,6 +2,8 @@ require 'thread' require 'fileutils' require 'securerandom' +require 'net/http' +require 'uri' module LogStash module Outputs @@ -27,23 +29,21 @@ def initialize(max_size_mb, max_interval, &flush_callback) load_buffer_from_files @buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: 
#{max_interval} seconds") - # Start the timer thread after a delay to ensure initializations are completed - Thread.new do - sleep(10) - @buffer_state[:timer] = Thread.new do - loop do - sleep(@buffer_config[:max_interval]) - buffer_flush(force: true) - end + # Start the timer thread + @buffer_state[:timer] = Thread.new do + loop do + sleep(@buffer_config[:max_interval]) + prepare_flush(force: true) end end end def <<(event) while buffer_full? do + prepare_flush(force: true) # Flush when buffer is full sleep 0.1 end - + @pending_mutex.synchronize do @buffer_state[:pending_items] << event @buffer_state[:pending_size] += event.bytesize @@ -54,7 +54,7 @@ def shutdown @buffer_config[:logger].info("Shutting down buffer") @shutdown = true @buffer_state[:timer].kill - buffer_flush(final: true) + prepare_flush(final: true) clear_buffer_files end @@ -66,65 +66,102 @@ def buffer_full? end end - def buffer_flush(options = {}) + def prepare_flush(options = {}) force = options[:force] || options[:final] final = options[:final] - if final - @flush_mutex.lock - elsif !@flush_mutex.try_lock - return 0 - end - - items_flushed = 0 - - begin - outgoing_items = [] - outgoing_size = 0 + outgoing_items = [] + outgoing_size = 0 - @pending_mutex.synchronize do - return 0 if @buffer_state[:pending_size] == 0 + @pending_mutex.synchronize do + return 0 if @buffer_state[:pending_size] == 0 - time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] + time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush] - if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] - return 0 - end + if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval] + return 0 + end - if force - @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") - elsif @buffer_state[:pending_size] >= @buffer_config[:max_size] 
- @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") - else - @buffer_config[:logger].info("Flush triggered without specific condition") - end + if time_since_last_flush >= @buffer_config[:max_interval] + @buffer_config[:logger].info("Time-based flush triggered after #{@buffer_config[:max_interval]} seconds") + else + @buffer_config[:logger].info("Size-based flush triggered at #{@buffer_state[:pending_size]} bytes was reached") + end - outgoing_items = @buffer_state[:pending_items].dup - outgoing_size = @buffer_state[:pending_size] + if @buffer_state[:network_down] + save_buffer_to_file(@buffer_state[:pending_items]) @buffer_state[:pending_items] = [] @buffer_state[:pending_size] = 0 + return 0 end + outgoing_items = @buffer_state[:pending_items].dup + outgoing_size = @buffer_state[:pending_size] + @buffer_state[:pending_items] = [] + @buffer_state[:pending_size] = 0 + end + + if Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).any? 
+ flush_buffer_files + end + + Thread.new { perform_flush(outgoing_items) } + end + + def perform_flush(events, file_path = nil) + if file_path + unless ::File.exist?(file_path) + return 0 + end begin - @flush_callback.call(outgoing_items) # Pass the list of events to the callback - @buffer_state[:network_down] = false # Reset network status after successful flush - flush_buffer_files # Flush buffer files if any exist + buffer_state = Marshal.load(::File.read(file_path)) + events = buffer_state[:pending_items] rescue => e - @buffer_config[:logger].error("Flush failed: #{e.message}") - @buffer_state[:network_down] = true - save_buffer_to_file(outgoing_items) + @buffer_config[:logger].error("Failed to load buffer from file: #{e.message}") + return 0 end - + end + + @flush_mutex.lock + + begin + @buffer_config[:logger].info("Flushing #{events.size} events, #{events.sum(&:bytesize)} bytes") + @flush_callback.call(events) # Pass the list of events to the callback + @buffer_state[:network_down] = false # Reset network status after successful flush @buffer_state[:last_flush] = Time.now.to_i - @buffer_config[:logger].info("Flush completed. Flushed #{outgoing_items.size} events, #{outgoing_size} bytes") - - items_flushed = outgoing_items.size + @buffer_config[:logger].info("Flush completed. Flushed #{events.size} events, #{events.sum(&:bytesize)} bytes") + + if file_path + ::File.delete(file_path) + @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}") + end + + rescue => e + @buffer_config[:logger].error("Flush failed: #{e.message}") + @buffer_state[:network_down] = true + + while true + sleep(2) # Wait before checking network availability again + if network_available? + @buffer_config[:logger].info("Network is back up. Retrying flush.") + retry + end + end ensure @flush_mutex.unlock end - - items_flushed end + + def network_available? 
+ begin + uri = URI('http://www.google.com') + response = Net::HTTP.get_response(uri) + response.is_a?(Net::HTTPSuccess) + rescue + false + end + end + def save_buffer_to_file(events) buffer_state_copy = { @@ -137,7 +174,7 @@ def save_buffer_to_file(events) ::File.open(file_path, 'w') do |file| file.write(Marshal.dump(buffer_state_copy)) end - @buffer_config[:logger].info("Saved buffer state to file: #{file_path}") + @buffer_config[:logger].info("Saved #{events.size} events to file: #{file_path}") end def load_buffer_from_files @@ -156,16 +193,7 @@ def load_buffer_from_files def flush_buffer_files Dir.glob(::File.join(@buffer_config[:buffer_dir], 'buffer_state_*.log')).each do |file_path| - begin - buffer_state = Marshal.load(::File.read(file_path)) - @buffer_config[:logger].info("Flushed from file: #{file_path}") - @flush_callback.call(buffer_state[:pending_items]) - ::File.delete(file_path) - @buffer_config[:logger].info("Flushed and deleted buffer state file: #{file_path}") - rescue => e - @buffer_config[:logger].error("Failed to flush buffer state file: #{e.message}") - break - end + Thread.new { perform_flush([], file_path) } end end diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index e991d5a..d68edc1 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -156,18 +156,10 @@ def upload(data) begin data_source_info = Java::com.microsoft.azure.kusto.ingest.source.StreamSourceInfo.new(java.io.ByteArrayInputStream.new(data.to_java_bytes)) ingestion_result = @kusto_client.ingestFromStream(data_source_info, @ingestion_properties) - - # Check the ingestion status - status = ingestion_result.getIngestionStatusCollection.get(0) - if status.status != Java::com.microsoft.azure.kusto.ingest.result.OperationStatus::Queued - raise "Failed upload: #{status.errorCodeString}" - end - @logger.info("Final ingestion status: #{status.status}") + rescue => e @logger.error('Error during 
ingestFromStream.', exception: e.class, message: e.message, backtrace: e.backtrace) - if e.message.include?("network") - raise e - end + raise e ensure ingestionLatch.countDown() end