Skip to content

Commit

Permalink
added custom_size_based_buffer class
Browse files Browse the repository at this point in the history
Added size- and time-based flushing of the event buffer.
Added the max_interval and max_size config options. Once either limit is
reached, the events stored in the buffer are flushed.
  • Loading branch information
monishkadas-ms committed Sep 12, 2024
1 parent 7e4d38a commit df78334
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 4 deletions.
17 changes: 15 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@ apply plugin: 'maven-publish'
// The gemspec contains the gem metadata to build and package the gem. The gradle build serves as a mechanism of getting these "vendor" files required for the gem.
// The alternative is to use ruby-maven gem to package, but this runs into classpath conflicts/issues with the logstash plugin.
group "org.logstash.outputs"
version Files.readAllLines(Paths.get("version")).first()

// Resolve the project version: use the first line of the "version" file when that file exists.
def versionFile = Paths.get("version")
if (Files.exists(versionFile)) {
version = Files.readAllLines(versionFile).first()
} else {
// No "version" file is present, so fall back to a hard-coded version string.
version = "2.0.7"
}

repositories {
mavenCentral()
Expand Down Expand Up @@ -116,7 +122,14 @@ task vendor {
String vendorPathPrefix = "vendor/jar-dependencies"
configurations.runtimeClasspath.allDependencies.each { dep ->
println("Copying ${dep.group}:${dep.name}:${dep.version}")
File f = configurations.runtimeClasspath.filter { it.absolutePath.contains("${dep.group}/${dep.name}/${dep.version}") }.singleFile
def files = configurations.runtimeClasspath.filter { file ->
file.name.matches("${dep.name}-${dep.version}.*\\.jar")
}
if (files.isEmpty()) {
println("Warning: No files found for ${dep.group}:${dep.name}:${dep.version}")
return
}
File f = files.singleFile
String groupPath = dep.group.replaceAll('\\.', '/')
File newJarFile = file("${vendorPathPrefix}/${groupPath}/${dep.name}/${dep.version}/${dep.name}-${dep.version}.jar")
newJarFile.mkdirs()
Expand Down
19 changes: 17 additions & 2 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

require 'logstash/outputs/kusto/ingestor'
require 'logstash/outputs/kusto/interval'
require "logstash/outputs/kusto/custom_size_based_buffer"

##
# This plugin sends messages to Azure Kusto in batches.
Expand Down Expand Up @@ -95,6 +96,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# Mapping name - deprecated, use json_mapping
config :mapping, validate: :string, deprecated: true

# Number of buffered events that triggers an immediate flush of the in-memory buffer
config :max_size, validate: :number, default: 1000

# Interval, in seconds, after which buffered events are flushed even if max_size has not been reached
config :max_interval, validate: :number, default: 60

# Determines if local files used for temporary storage will be deleted
# after upload is successful
Expand Down Expand Up @@ -123,6 +127,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base

def register
require 'fileutils' # For mkdir_p
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush(events)
end

@files = {}
@io_mutex = Mutex.new
Expand Down Expand Up @@ -198,9 +205,17 @@ def root_directory

public
# Queues every incoming event, paired with its encoded payload, on @buffer.
#
# Each entry is stored as a Hash of the form { event: ..., encoded: ... }.
# Nothing is grouped or written here; this method only appends to @buffer.
#
# @param events_and_encoded [Enumerable] pairs of [event, encoded]
# @return [Enumerable] the events_and_encoded collection that was passed in
def multi_receive_encoded(events_and_encoded)
  events_and_encoded.each do |event, encoded|
    @buffer << { event: event, encoded: encoded }
  end
end

def flush(events)
encoded_by_path = Hash.new { |h, k| h[k] = [] }

events.each do |event_data|
event = event_data[:event]
encoded = event_data[:encoded]
file_output_path = event_path(event)
encoded_by_path[file_output_path] << encoded
end
Expand Down
50 changes: 50 additions & 0 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
module LogStash
  module Outputs
    # Mutex-guarded in-memory buffer that hands its contents to a callback
    # when either of two thresholds is reached:
    #
    # * size: the buffer holds at least +max_size+ entries (checked on every +<<+)
    # * time: at least +max_interval+ seconds have passed since the last flush
    #   (checked by a background thread that wakes every +max_interval+ seconds)
    #
    # The callback runs while the internal mutex is held, so producers calling
    # +<<+ block until it returns.
    class CustomSizeBasedBuffer
      # @param max_size [Integer] entry count that triggers an immediate flush
      # @param max_interval [Numeric] seconds between timer checks, and the
      #   minimum time since the last flush before a timed flush happens
      # @yieldparam events [Array] a copy of the buffered entries, in insertion order
      def initialize(max_size, max_interval, &flush_callback)
        @max_size = max_size
        @max_interval = max_interval
        @flush_callback = flush_callback
        @buffer = []
        @mutex = Mutex.new
        @last_flush_time = monotonic_now

        # Keep the handle so the timer can be stopped by #shutdown.
        @flusher_thread = start_flusher_thread
      end

      # Appends an entry and flushes right away once the size threshold is hit.
      # An exception raised by the flush callback propagates to the caller and
      # the buffered entries are kept for the next flush attempt.
      #
      # @param event [Object] entry to buffer
      # @return [self] so that appends can be chained
      def <<(event)
        @mutex.synchronize do
          @buffer << event
          flush if @buffer.size >= @max_size
        end
        self
      end

      # Stops the background timer and flushes whatever is still buffered.
      # Safe to call more than once.
      #
      # @return [void]
      def shutdown
        @mutex.synchronize do
          # While we hold the mutex the timer thread can only be sleeping or
          # waiting for the lock, so it is never killed in the middle of a flush.
          @flusher_thread.kill
          flush
        end
      end

      private

      # Starts the background thread that performs the time-based check.
      #
      # @return [Thread]
      def start_flusher_thread
        Thread.new do
          loop do
            sleep @max_interval
            begin
              flush_if_needed
            rescue StandardError => e
              # A failing callback must not end the loop, otherwise time-based
              # flushing would stop for good. Entries stay buffered and are
              # retried on the next tick.
              warn "CustomSizeBasedBuffer: timed flush failed, will retry: #{e.class}: #{e.message}"
            end
          end
        end
      end

      # Flushes when at least +max_interval+ seconds have passed since the last flush.
      def flush_if_needed
        @mutex.synchronize do
          flush if monotonic_now - @last_flush_time >= @max_interval
        end
      end

      # Hands the buffered entries to the callback. Must be called with @mutex held.
      # The buffer is emptied only after the callback returns normally.
      def flush
        return if @buffer.empty?

        # Pass a copy: clearing @buffer afterwards must not empty the array the
        # callback received, in case the callback keeps a reference to it.
        @flush_callback.call(@buffer.dup)
        @buffer.clear
        @last_flush_time = monotonic_now
      end

      # Monotonic timestamp in seconds; unaffected by wall-clock adjustments.
      def monotonic_now
        Process.clock_gettime(Process::CLOCK_MONOTONIC)
      end
    end
  end
end

0 comments on commit df78334

Please sign in to comment.