From df78334078f20b0b1d43bef542e7234d7603f6ea Mon Sep 17 00:00:00 2001
From: monishkadas-ms
Date: Thu, 12 Sep 2024 22:02:06 +0530
Subject: [PATCH] added custom_size_based_buffer class

Added size- and time-based flushing of the output buffer, along with
config options for max_size and max_interval. Once either limit is
reached, the events stored in the buffer are flushed.
---
 build.gradle                                       | 17 ++++-
 lib/logstash/outputs/kusto.rb                      | 19 ++++-
 .../outputs/kusto/custom_size_based_buffer.rb      | 50 +++++++++++++++++++
 3 files changed, 82 insertions(+), 4 deletions(-)
 create mode 100644 lib/logstash/outputs/kusto/custom_size_based_buffer.rb

diff --git a/build.gradle b/build.gradle
index e6ae8b6..402730d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -10,7 +10,13 @@ apply plugin: 'maven-publish'
 // The gemspec contains the gem metadata to build and package the gem. The gradle build serves as a mechanism of getting these "vendor" files required for the gem.
 // The alternative is to use ruby-maven gem to package, but this runs into classpath conflicts/issues with the logstash plugin.
 group "org.logstash.outputs"
-version Files.readAllLines(Paths.get("version")).first()
+
+def versionFile = Paths.get("version")
+if (Files.exists(versionFile)) {
+    version = Files.readAllLines(versionFile).first()
+} else {
+    version = "2.0.7"
+}
 
 repositories {
     mavenCentral()
@@ -116,7 +122,14 @@ task vendor {
     String vendorPathPrefix = "vendor/jar-dependencies"
     configurations.runtimeClasspath.allDependencies.each { dep ->
         println("Copying ${dep.group}:${dep.name}:${dep.version}")
-        File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}/${dep.name}/${dep.version}") }.singleFile
+        def files = configurations.runtimeClasspath.filter { file ->
+            file.name.matches("${dep.name}-${dep.version}.*\\.jar")
+        }
+        if (files.isEmpty()) {
+            println("Warning: No files found for ${dep.group}:${dep.name}:${dep.version}")
+            return
+        }
+        File f = files.singleFile
         String groupPath = dep.group.replaceAll('\\.', '/')
         File newJarFile = file("${vendorPathPrefix}/${groupPath}/${dep.name}/${dep.version}/${dep.name}-${dep.version}.jar")
         newJarFile.mkdirs()
diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb
index cd61d5d..741226e 100755
--- a/lib/logstash/outputs/kusto.rb
+++ b/lib/logstash/outputs/kusto.rb
@@ -6,6 +6,7 @@
 
 require 'logstash/outputs/kusto/ingestor'
 require 'logstash/outputs/kusto/interval'
+require "logstash/outputs/kusto/custom_size_based_buffer"
 
 ##
 # This plugin sends messages to Azure Kusto in batches.
@@ -95,6 +96,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
   # Mapping name - deprecated, use json_mapping
   config :mapping, validate: :string, deprecated: true
 
+  config :max_size, validate: :number, default: 1000
+
+  config :max_interval, validate: :number, default: 60
 
   # Determines if local files used for temporary storage will be deleted
   # after upload is successful
@@ -123,6 +127,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
   def register
     require 'fileutils' # For mkdir_p
 
+    @buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
+      flush(events)
+    end
     @files = {}
     @io_mutex = Mutex.new
 
@@ -198,9 +205,17 @@ def root_directory
 
   public
   def multi_receive_encoded(events_and_encoded)
-    encoded_by_path = Hash.new { |h, k| h[k] = [] }
-
     events_and_encoded.each do |event, encoded|
+      @buffer << { event: event, encoded: encoded }
+    end
+  end
+
+  def flush(events)
+    encoded_by_path = Hash.new { |h, k| h[k] = [] }
+
+    events.each do |event_data|
+      event = event_data[:event]
+      encoded = event_data[:encoded]
       file_output_path = event_path(event)
       encoded_by_path[file_output_path] << encoded
     end
diff --git a/lib/logstash/outputs/kusto/custom_size_based_buffer.rb b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb
new file mode 100644
index 0000000..1a599ed
--- /dev/null
+++ b/lib/logstash/outputs/kusto/custom_size_based_buffer.rb
@@ -0,0 +1,50 @@
+module LogStash
+  module Outputs
+    class CustomSizeBasedBuffer
+      def initialize(max_size, max_interval, &flush_callback)
+        @max_size = max_size
+        @max_interval = max_interval
+        @flush_callback = flush_callback
+        @buffer = []
+        @mutex = Mutex.new
+        @last_flush_time = Time.now
+
+        start_flusher_thread
+      end
+
+      def <<(event)
+        @mutex.synchronize do
+          @buffer << event
+          flush if @buffer.size >= @max_size
+        end
+      end
+
+      private
+
+      def start_flusher_thread
+        Thread.new do
+          loop do
+            sleep @max_interval
+            flush_if_needed
+          end
+        end
+      end
+
+      def flush_if_needed
+        @mutex.synchronize do
+          if Time.now - @last_flush_time >= @max_interval
+            flush
+          end
+        end
+      end
+
+      def flush
+        return if @buffer.empty?
+
+        @flush_callback.call(@buffer)
+        @buffer.clear
+        @last_flush_time = Time.now
+      end
+    end
+  end
+end
\ No newline at end of file
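
Usage sketch for the buffer introduced above (not part of the patch): a minimal
standalone script that exercises CustomSizeBasedBuffer, assuming it is run from
the plugin repository root so lib/ resolves on the load path. The flush callback
here only records batches; in the plugin it is the output's flush(events) method.

    # size_time_buffer_sketch.rb -- illustrative only, paths are assumptions
    $LOAD_PATH.unshift(File.expand_path("lib", __dir__))
    require "logstash/outputs/kusto/custom_size_based_buffer"

    flushed_batches = []

    # max_size: 3 events, max_interval: 2 seconds
    buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(3, 2) do |events|
      # dup: the buffer array is cleared right after the callback returns
      flushed_batches << events.dup
    end

    # Size-based flush: the third << reaches max_size and triggers the callback.
    3.times { |i| buffer << { event: "event-#{i}", encoded: "{}" } }

    # Time-based flush: a single event stays buffered until the background
    # flusher thread sees that max_interval seconds have passed since the last flush.
    buffer << { event: "late-event", encoded: "{}" }
    sleep 5

    puts flushed_batches.size  # => 2 (one size-based batch, one interval-based batch)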