Skip to content

Commit

Permalink
* Refactor to classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ag-ramachandran committed Oct 25, 2024
1 parent 64b4d34 commit 5941adf
Show file tree
Hide file tree
Showing 8 changed files with 444 additions and 242 deletions.
18 changes: 9 additions & 9 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
require 'logstash/outputs/kusto/ingestor'
require 'logstash/outputs/kusto/interval'
require 'logstash/outputs/kusto/custom_size_based_buffer'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'

##
# This plugin sends messages to Azure Kusto in batches.
Expand Down Expand Up @@ -91,13 +92,13 @@ def register
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, proxy_host, proxy_port, proxy_protocol, @logger, executor)

# Deprecation warning for path
if @path
@logger.warn("The 'path' configuration option is deprecated and will be removed in a future release.")
end

kusto_ingest_base = LogStash::Outputs::KustoInternal::KustoIngestConfiguration.new(ingest_url, database, table, final_mapping)
kusto_auth_base = LogStash::Outputs::KustoInternal::KustoAuthConfiguration.new(app_id, app_key, app_tenant, managed_identity, cli_auth)
kusto_proxy_base = LogStash::Outputs::KustoInternal::KustoProxyConfiguration.new(proxy_host , proxy_port , proxy_protocol, false)
@kusto_logstash_configuration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(kusto_ingest_base, kusto_auth_base , kusto_proxy_base, logger)
@ingestor = Ingestor.new(@kusto_logstash_configuration, @logger, executor)

end


Expand All @@ -114,8 +115,7 @@ def multi_receive_encoded(events_and_encoded)

def close
@logger.info("Closing Kusto output plugin")

begin
begin
@buffer.shutdown unless @buffer.nil?
@logger.info("Buffer shutdown") unless @buffer.nil?
rescue => e
Expand Down
42 changes: 21 additions & 21 deletions lib/logstash/outputs/kusto/custom_size_based_buffer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ def initialize(max_size_mb, max_interval, &flush_callback)
end
end

def <<(event)
while buffer_full? do
sleep 0.1
end
def <<(event)
while buffer_full? do
sleep 0.1
end

@pending_mutex.synchronize do
@buffer_state[:pending_items] << event
Expand All @@ -58,23 +58,23 @@ def shutdown
clear_buffer_files
end

private
private

def buffer_full?
@pending_mutex.synchronize do
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end
end
def buffer_full?
@pending_mutex.synchronize do
@buffer_state[:pending_size] >= @buffer_config[:max_size]
end
end

def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]
def buffer_flush(options = {})
force = options[:force] || options[:final]
final = options[:final]

if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end
if final
@flush_mutex.lock
elsif !@flush_mutex.try_lock
return 0
end

items_flushed = 0

Expand All @@ -85,7 +85,7 @@ def buffer_flush(options = {})
@pending_mutex.synchronize do
return 0 if @buffer_state[:pending_size] == 0

time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]
time_since_last_flush = Time.now.to_i - @buffer_state[:last_flush]

if !force && @buffer_state[:pending_size] < @buffer_config[:max_size] && time_since_last_flush < @buffer_config[:max_interval]
return 0
Expand Down Expand Up @@ -123,8 +123,8 @@ def buffer_flush(options = {})
@flush_mutex.unlock
end

items_flushed
end
items_flushed
end

def save_buffer_to_file(events)
buffer_state_copy = {
Expand Down
73 changes: 21 additions & 52 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,38 +20,39 @@ class Ingestor
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/

def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
def initialize(kusto_logstash_configuration, logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
@logger = logger
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth)
#Validate and assign
kusto_logstash_configuration.validate_config()
@kusto_logstash_configuration = kusto_logstash_configuration

@logger.info('Preparing Kusto resources.')

kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
# If there is CLI Auth, use that instead of managed identity
is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth)

is_managed_identity = @kusto_logstash_configuration.kusto_auth.is_managed_identity
# If it is system managed identity, propagate the system identity
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
is_system_assigned_managed_identity = @kusto_logstash_configuration.kusto_auth.is_system_assigned_managed_identity
# Is it direct connection
is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
is_direct_conn = @kusto_logstash_configuration.kusto_proxy.is_direct_conn
# Create a connection string
kusto_connection_string = if is_managed_identity
if is_system_assigned_managed_identity
@logger.info('Using system managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url)
else
@logger.info('Using user managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_ingest.managed_identity_id)
end
else
if cli_auth
if @kusto_logstash_configuration.kusto_auth.cli_auth
@logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(@kusto_logstash_configuration.kusto_ingest.ingest_url)
else
@logger.info('Using app id and app key.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kusto_logstash_configuration.kusto_ingest.ingest_url, @kusto_logstash_configuration.kusto_auth.app_id, @kusto_logstash_configuration.kusto_auth.app_key.value, @kusto_logstash_configuration.kusto_auth.app_tenant)
end
end
@logger.debug(Gem.loaded_specs.to_s)
Expand All @@ -62,22 +63,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
tuple_utils = Java::org.apache.commons.lang3.tuple
# kusto_connection_string.setClientVersionForTracing(name_for_tracing)
version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray());
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,name_for_tracing.to_s,version_for_tracing.to_s,false,"", tuple_utils.Pair.emptyArray());

@kusto_client = begin
if is_direct_conn
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
else
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build()
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kusto_logstash_configuration.kusto_proxy.proxy_host,@kusto_logstash_configuration.kusto_proxy.proxy_port,@kusto_logstash_configuration.kusto_proxy.proxy_protocol)).build()
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties)
end
end

@ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?)
if is_mapping_ref_provided
@logger.debug('Using mapping reference.', json_mapping)
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kusto_logstash_configuration.kusto_ingest.database, @kusto_logstash_configuration.kusto_ingest.table)

if @kusto_logstash_configuration.kusto_ingest.is_mapping_ref_provided
@logger.debug('Using mapping reference.', @kusto_logstash_configuration.kusto_ingest.json_mapping)
@ingestion_properties.setIngestionMapping(@kusto_logstash_configuration.kusto_ingest.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
Expand All @@ -86,38 +87,6 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli
@logger.debug('Kusto resources are ready.')
end

def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth)
# Add an additional validation and fail this upfront
if app_id.nil? && app_key.nil? && managed_identity_id.nil?
if cli_auth
@logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production')
else
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
end
end
if database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
end

if table =~ FIELD_REF
@logger.error('table config value should not be dynamic.', table)
raise LogStash::ConfigurationError.new('table config value should not be dynamic.')
end

if json_mapping =~ FIELD_REF
@logger.error('json_mapping config value should not be dynamic.', json_mapping)
raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
end

if not(["https", "http"].include? proxy_protocol)
@logger.error('proxy_protocol has to be http or https.', proxy_protocol)
raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
end

end

def upload_async(data)
if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH
@logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.")
Expand Down
126 changes: 63 additions & 63 deletions lib/logstash/outputs/kusto/interval.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,77 +5,77 @@
require 'logstash/errors'

class LogStash::Outputs::Kusto < LogStash::Outputs::Base
##
# Bare-bones utility for running a block of code at an interval.
#
class Interval
##
# Initializes a new Interval with the given arguments and starts it
# before returning it.
#
# @param interval [Integer] (see: Interval#initialize)
# @param procsy [#call] (see: Interval#initialize)
#
# @return [Interval]
#
def self.start(interval, procsy)
new(interval, procsy).tap(&:start)
end
##
# Bare-bones utility for running a block of code at an interval.
#
class Interval
##
# Initializes a new Interval with the given arguments and starts it
# before returning it.
#
# @param interval [Integer] (see: Interval#initialize)
# @param procsy [#call] (see: Interval#initialize)
#
# @return [Interval]
#
def self.start(interval, procsy)
new(interval, procsy).tap(&:start)
end

##
# @param interval [Integer]: time in seconds to wait between calling the given proc
# @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions.
def initialize(interval, procsy)
@interval = interval
@procsy = procsy
##
# @param interval [Integer]: time in seconds to wait between calling the given proc
# @param procsy [#call]: proc or lambda to call periodically; must not raise exceptions.
def initialize(interval, procsy)
@interval = interval
@procsy = procsy

# Mutex, ConditionVariable, etc.
@mutex = Mutex.new
@sleeper = ConditionVariable.new
end
# Mutex, ConditionVariable, etc.
@mutex = Mutex.new
@sleeper = ConditionVariable.new
end

##
# Starts the interval, or returns if it has already been started.
#
# @return [void]
def start
@mutex.synchronize do
return if @thread && @thread.alive?
##
# Starts the interval, or returns if it has already been started.
#
# @return [void]
def start
@mutex.synchronize do
return if @thread && @thread.alive?

@thread = Thread.new { run }
end
end
@thread = Thread.new { run }
end
end

##
# Stop the interval.
# Does not interrupt if execution is in-progress.
def stop
@mutex.synchronize do
@stopped = true
end
##
# Stop the interval.
# Does not interrupt if execution is in-progress.
def stop
@mutex.synchronize do
@stopped = true
end

@thread && @thread.join
end
@thread && @thread.join
end

##
# @return [Boolean]
def alive?
@thread && @thread.alive?
end
##
# @return [Boolean]
def alive?
@thread && @thread.alive?
end

private
private

def run
@mutex.synchronize do
loop do
@sleeper.wait(@mutex, @interval)
break if @stopped
def run
@mutex.synchronize do
loop do
@sleeper.wait(@mutex, @interval)
break if @stopped

@procsy.call
end
end
ensure
@sleeper.broadcast
end
end
@procsy.call
end
end
ensure
@sleeper.broadcast
end
end
end
Loading

0 comments on commit 5941adf

Please sign in to comment.