Skip to content

Commit

Permalink
* Revision with fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ag-ramachandran committed Dec 14, 2023
1 parent 78d5c70 commit 4465392
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 64 deletions.
13 changes: 5 additions & 8 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

require 'logstash/outputs/kusto/ingestor'
require 'logstash/outputs/kusto/interval'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'

##
# This plugin sends messages to Azure Kusto in batches.
Expand Down Expand Up @@ -152,17 +153,13 @@ def register
end
@failure_path = File.join(@file_root, @filename_failure)

executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1,
max_threads: upload_concurrent_count,
max_queue: upload_queue_size,
fallback_policy: :caller_runs)

@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table,
final_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger, executor)
kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger)
kustoLogstashConfiguration.validate_config()
executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs)
@ingestor = Ingestor.new(kustoLogstashConfiguration, @logger, executor)

# send existing files
recover_past_files if recovery

@last_stale_cleanup_cycle = Time.now

@flush_interval = @flush_interval.to_i
Expand Down
42 changes: 18 additions & 24 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# encoding: utf-8

require 'logstash/outputs/base'
require 'logstash/namespace'
require 'logstash/errors'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'
require 'logstash/outputs/kusto/kustoAadProvider'

class LogStash::Outputs::Kusto < LogStash::Outputs::Base
##
Expand All @@ -18,27 +19,24 @@ class Ingestor
fallback_policy: :caller_runs
)
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/

def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
def initialize(kustoLogstashConfiguration, logger, threadpool = DEFAULT_THREADPOOL)
@logger = logger
@kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger)
@kustoLogstashConfiguration.validate_config()
@workers_pool = threadpool
@kustoLogstashConfiguration = kustoLogstashConfiguration
@logger.info('Preparing Kusto resources.')
@ingestion_properties = get_ingestion_properties()
if @kustoLogstashConfiguration.proxy_aad_only
@kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kustoLogstashConfiguration)
end
@delete_local = delete_local
@logger.debug('Kusto resources are ready.')
end

def get_kusto_client()
if @kusto_client.nil?
if @kusto_client.nil? || (@kustoLogstashConfiguration.proxy_aad_only && @kustoAadTokenProvider.is_saved_token_need_refresh())
kusto_client = create_kusto_client()
end
return kusto_client
return @kusto_client
end

def create_kusto_client()
Expand All @@ -48,15 +46,15 @@ def create_kusto_client()
kusto_connection_string = if @kustoLogstashConfiguration.is_managed_identity
if @kustoLogstashConfiguration.is_system_assigned_managed_identity
@logger.info('Using system managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url)
else
@logger.info('Using user managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.managed_identity_id)
end
elsif @kustoLogstashConfiguration.proxy_aad_only
kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(ingest_url, )
kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kustoLogstashConfiguration.ingest_url,@kustoLogstashConfiguration.get_aad_token_bearer())
else
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.app_id, @kustoLogstashConfiguration.app_key.value, @kustoLogstashConfiguration.app_tenant)
end
#
@logger.debug(Gem.loaded_specs.to_s)
Expand All @@ -70,25 +68,21 @@ def create_kusto_client()
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray());

@kusto_client = begin
if is_direct_conn
if @kustoLogstashConfiguration.is_direct_conn || @kustoLogstashConfiguration.proxy_aad_only
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
else
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build()
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kustoLogstashConfiguration.proxy_host,@kustoLogstashConfiguration.proxy_port,@kustoLogstashConfiguration.proxy_protocol)).build()
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties)
end
end





end

def get_ingestion_properties()
kusto_java = Java::com.microsoft.azure.kusto
ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kustoLogstashConfiguration.database, @kustoLogstashConfiguration.table)
if @kustoLogstashConfiguration.is_mapping_ref_provided
@logger.debug('Using mapping reference.', json_mapping)
ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@logger.debug('Using mapping reference.', @kustoLogstashConfiguration.json_mapping)
ingestion_properties.setIngestionMapping(@kustoLogstashConfiguration.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
Expand All @@ -115,7 +109,7 @@ def upload(path, delete_on_success)
@logger.debug("Sending file to kusto: #{path}. size: #{file_size}")
if file_size > 0
file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file
@kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
get_kusto_client().ingestFromFile(file_source_info, @ingestion_properties)
else
@logger.warn("File #{path} is an empty file and is not ingested.")
end
Expand All @@ -140,4 +134,4 @@ def stop
@workers_pool.wait_for_termination(nil) # block until its done
end
end
end
end
45 changes: 23 additions & 22 deletions lib/logstash/outputs/kusto/kustoAadProvider.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
# encoding: utf - 8
require "logstash/kusto/kustoLogstashConfiguration"
# encoding:utf-8
require 'rest-client'
require 'json'
require 'openssl'
require 'base64'
require 'time'
require 'logstash/outputs/kusto/kustoLogstashConfiguration'


module LogStash
module Outputs
Expand All @@ -19,45 +20,45 @@ def initialize(kustoLogstashConfiguration)
@token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", kustoLogstashConfiguration.app_id, scope, kustoLogstashConfiguration.app_key)
@token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", aad_uri, kustoLogstashConfiguration.app_tenant)
@token_state = {
: access_token => nil,
: expiry_time => nil,
: token_details_mutex => Mutex.new,
}
:access_token => nil,
:expiry_time => nil,
:token_details_mutex => Mutex.new,
}
end # def initialize

# Public methods
public

def get_aad_token_bearer()
@token_state[: token_details_mutex].synchronize
do
if is_saved_token_need_refresh()
refresh_saved_token()
end
return @token_state[: access_token]
@token_state[:token_details_mutex].synchronize do
if is_saved_token_need_refresh()
refresh_saved_token()
end
return @token_state[:access_token]
end
end # def get_aad_token_bearer

# Private methods
private

def is_saved_token_need_refresh()
return @token_state[: access_token].nil ? || @token_state[: expiry_time].nil ? || @token_state[: expiry_time] <= Time.now
return @token_state[:access_token].nil? || @token_state[:expiry_time].nil? || @token_state[:expiry_time] <= Time.now
end # def is_saved_token_need_refresh

# Private methods
private

def refresh_saved_token()
@logger.info("aad token expired - refreshing token.")
token_response = post_token_request()
@token_state[: access_token] = token_response["access_token"]
@token_state[: expiry_time] = get_token_expiry_time(token_response["expires_in"])
@token_state[:access_token] = token_response["access_token"]
@token_state[:expiry_time] = get_token_expiry_time(token_response["expires_in"])
end # def refresh_saved_token

def get_token_expiry_time(expires_in_seconds)
if (expires_in_seconds.nil ? || expires_in_seconds <= 0)
if (expires_in_seconds.nil? || expires_in_seconds <= 0)
return Time.now + (60 * 60 * 24) # Refresh anyway in 24 hours
else
return Time.now + expires_in_seconds - 1;
# Decrease by 1 second to be on the safe side
return Time.now + expires_in_seconds - 30 * 60;
# Decrease by 30 seconds to be on the safe side
end
end # def get_token_expiry_time

Expand All @@ -69,8 +70,8 @@ def post_token_request()
begin
proxy_aad = sprintf("%s://%s:%s", @kustoLogstashConfiguration.proxy_protocol, @kustoLogstashConfiguration.proxy_host, @kustoLogstashConfiguration.proxy_port)
# Post REST request
response = RestClient::Request.execute(method:: post, url: @token_request_uri, payload: @token_request_body, headers: headers,
proxy: proxy_aad)
response = RestClient::Request.execute(method::post, url:@token_request_uri, payload:@token_request_body, headers:headers,
proxy:proxy_aad)

if (response.code == 200 || response.code == 201)
return JSON.parse(response.body)
Expand Down
25 changes: 15 additions & 10 deletions lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# encoding: utf-8
# A class just having all the configurations wrapped into a seperate object

module LogStash
module Outputs
module KustoInternal
Expand Down Expand Up @@ -46,24 +45,30 @@ def validate_config()
end

if @database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
end
@logger.error('database config value should not be dynamic.', database)
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
end

if @table =~ FIELD_REF
@logger.error('table config value should not be dynamic.', table)
raise LogStash::ConfigurationError.new('table config value should not be dynamic.')
@logger.error('table config value should not be dynamic.', table)
raise LogStash::ConfigurationError.new('table config value should not be dynamic.')
end

if @json_mapping =~ FIELD_REF
@logger.error('json_mapping config value should not be dynamic.', json_mapping)
raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
@logger.error('json_mapping config value should not be dynamic.', json_mapping)
raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.')
end

if not(["https", "http"].include? @proxy_protocol)
@logger.error('proxy_protocol has to be http or https.', proxy_protocol)
raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
@logger.error('proxy_protocol has to be http or https.', proxy_protocol)
raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
end

if @proxy_aad_only && @is_direct_conn
@logger.error('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.',proxy_host,proxy_port,proxy_protocol)
raise LogStash::ConfigurationError.new('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.')
end

# If all validation pass then configuration is valid
return true
end #validate_config()
Expand Down
1 change: 1 addition & 0 deletions logstash-output-kusto.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }

# Gem dependencies
s.add_runtime_dependency "rest-client", ">= 2.1.0"
s.add_runtime_dependency 'logstash-core', '>= 8.3.0'
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99"
s.add_runtime_dependency 'logstash-codec-json_lines'
Expand Down

0 comments on commit 4465392

Please sign in to comment.