Skip to content

Commit

Permalink
* Refactor to classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ag-ramachandran committed Oct 18, 2024
1 parent ca981ba commit 9e4ad27
Show file tree
Hide file tree
Showing 8 changed files with 499 additions and 295 deletions.
22 changes: 11 additions & 11 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'logstash/outputs/kusto/ingestor'
require 'logstash/outputs/kusto/interval'
require 'logstash/outputs/kusto/custom_size_based_buffer'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'

##
# This plugin sends messages to Azure Kusto in batches.
Expand Down Expand Up @@ -77,8 +78,8 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
default :codec, 'json_lines'

def register
# Initialize the custom buffer with size, interval, and buffer file
@buffer = LogStash::Outputs::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
# Initialize the custom buffer with size and interval
@buffer = LogStash::Outputs::KustoInternal::CustomSizeBasedBuffer.new(@max_size, @max_interval) do |events|
flush_buffer(events)
end

Expand All @@ -91,13 +92,13 @@ def register
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, proxy_host, proxy_port, proxy_protocol, @logger, executor)

# Deprecation warning for path
if @path
@logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.")
end

kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping)
kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth)
kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false)
@kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger)
@ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, executor)

end


Expand All @@ -114,8 +115,7 @@ def multi_receive_encoded(events_and_encoded)

def close
@logger.info("Closing Kusto output plugin")

begin
begin
@buffer.shutdown unless @buffer.nil?
@logger.info("Buffer shutdown") unless @buffer.nil?
rescue => e
Expand Down
146 changes: 74 additions & 72 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,69 +2,70 @@
require 'thread'

module LogStash
module Outputs
class CustomSizeBasedBuffer
def initialize(max_size_mb, max_interval, &flush_callback)
@buffer_config = {
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
max_interval: max_interval,
logger: Logger.new(STDOUT)
}
@buffer_state = {
pending_items: [],
pending_size: 0,
last_flush: Time.now.to_i,
timer: Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(force: true)
end
end
}
@flush_callback = flush_callback
@shutdown = false
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
end

def <<(event)
while buffer_full? do
sleep 0.1
end

@pending_mutex.synchronize do
@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
end

buffer_flush
end

def shutdown
@buffer_config[:logger].info("Shutting down buffer")
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
end

private

def buffer_full?
@pending_mutex.synchronize do
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end
end

def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end
module Outputs
module KustoInternal
class CustomSizeBasedBuffer
def initialize(max_size_mb, max_interval, &flush_callback)
@buffer_config = {
max_size: max_size_mb * 1024 * 1024, # Convert MB to bytes
max_interval: max_interval,
logger: Logger.new(STDOUT)
}
@buffer_state = {
pending_items: [],
pending_size: 0,
last_flush: Time.now.to_i,
timer: Thread.new do
loop do
sleep(@buffer_config[:max_interval])
buffer_flush(force: true)
end
end
}
@flush_callback = flush_callback
@shutdown = false
@pending_mutex = Mutex.new
@flush_mutex = Mutex.new
@buffer_config[:logger].info("CustomSizeBasedBuffer initialized with max_size: #{max_size_mb} MB, max_interval: #{max_interval} seconds")
end

def <<(event)
while buffer_full? do
sleep 0.1
end

@pending_mutex.synchronize do
@buffer_state[:pending_items] << event
@buffer_state[:pending_size] += event.bytesize
end

buffer_flush
end

def shutdown
@buffer_config[:logger].info("Shutting down buffer")
@shutdown = true
@buffer_state[:timer].kill
buffer_flush(final: true)
end

private

def buffer_full?
@pending_mutex.synchronize do
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end
end

def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end

items_flushed = 0
max_retries = 5
Expand All @@ -77,7 +78,7 @@ def buffer_flush(options = {})
@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0

time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
Expand Down Expand Up @@ -121,13 +122,14 @@ def buffer_flush(options = {})
@flush_mutex.unlock
end

items_flushed
end
items_flushed
end

def buffer_initialize
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
end
end
end
def buffer_initialize
@buffer_state[:pending_items] = []
@buffer_state[:pending_size] = 0
end
end
end
end
end
73 changes: 21 additions & 52 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,39 @@ class Ingestor
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/

def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
@logger = logger
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth)
#Validate and assign
kusto_logstash_configuration.validate_config()
@kusto_logstash_configuration = kusto_logstash_configuration

@logger.info('Preparing Kusto resources.')

kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
# If there is CLI Auth, use that instead of managed identity
is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth)

is_managed_identity = @kusto_logstash_configuration.kusto_auth.is_managed_identity
# If it is system managed identity, propagate the system identity
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
is_system_assigned_managed_identity = @kusto_logstash_configuration.kusto_auth.is_system_assigned_managed_identity
# Is it direct connection
is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
is_direct_conn = @kusto_logstash_configuration.kusto_proxy.is_direct_conn
# Create a connection string
kusto_connection_string = if is_managed_identity
if is_system_assigned_managed_identity
@logger.info('Using system managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url)
else
@logger.info('Using user managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_ingest.managed_identity_id)
end
else
if cli_auth
if @kusto_logstash_configuration.kusto_auth.cli_auth
@logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(@kusto_logstash_configuration.kusto_ingest.ingest_url)
else
@logger.info('Using app id and app key.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_auth.app_id, @kusto_logstash_configuration.kusto_auth.app_key.value, @kusto_logstash_configuration.kusto_auth.app_tenant)
end
end
#
Expand All @@ -63,22 +64,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
tuple_utils = Java::org.apache.commons.lang3.tuple
# kusto_connection_string.setClientVersionForTracing(name_for_tracing)
version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray());
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,name_for_tracing.to_s,version_for_tracing.to_s,false,"", tuple_utils.Pair.emptyArray());

@kusto_client = begin
if is_direct_conn
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
else
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build()
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_logstash_configuration.kusto_proxy.proxy_host,@kusto_logstash_configuration.kusto_proxy.proxy_port,@kusto_logstash_configuration.kusto_proxy.proxy_protocol)).build()
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties)
end
end

@ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?)
if is_mapping_ref_provided
@logger.debug('Using mapping reference.', json_mapping)
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table)

if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided
@logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping)
@ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
Expand All @@ -87,38 +88,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
@logger.debug('Kusto resources are ready.')
end

def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth)
# Add an additional validation and fail this upfront
if app_id.nil? && app_key.nil? && managed_identity_id.nil?
if cli_auth
@logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production')
else
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
end
end
if database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
end

if table =~ FIELD_REF
@logger.error('table config value should not be dynamic.', table)
raise LogStash::ConfigurationError.new('table config value should not be dynamic.')
end

if json_mapping =~ FIELD_REF
@logger.error('json_mapping config value should not be dynamic.', json_mapping)
raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
end

if not(["https", "http"].include? proxy_protocol)
@logger.error('proxy_protocol has to be http or https.', proxy_protocol)
raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
end

end

def upload_async(data)
if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH
@logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.")
Expand Down
Loading

0 comments on commit 9e4ad27

Please sign in to comment.