From 446539246921236cae45e83aaf25ee0c84151acd Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Thu, 14 Dec 2023 11:39:32 +0530 Subject: [PATCH] * Revision with fixes --- lib/logstash/outputs/kusto.rb | 13 +++--- lib/logstash/outputs/kusto/ingestor.rb | 42 ++++++++--------- .../outputs/kusto/kustoAadProvider.rb | 45 ++++++++++--------- .../kusto/kustoLogstashConfiguration.rb | 25 ++++++----- logstash-output-kusto.gemspec | 1 + 5 files changed, 62 insertions(+), 64 deletions(-) diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index 66962ae..808c8ef 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -6,6 +6,7 @@ require 'logstash/outputs/kusto/ingestor' require 'logstash/outputs/kusto/interval' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' ## # This plugin sends messages to Azure Kusto in batches. @@ -152,17 +153,13 @@ def register end @failure_path = File.join(@file_root, @filename_failure) - executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, - max_threads: upload_concurrent_count, - max_queue: upload_queue_size, - fallback_policy: :caller_runs) - - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, - final_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger, executor) + kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger) + kustoLogstashConfiguration.validate_config() + executor = Concurrent::ThreadPoolExecutor.new(min_threads: 1, max_threads: upload_concurrent_count, max_queue: upload_queue_size, fallback_policy: :caller_runs) + @ingestor = Ingestor.new(kustoLogstashConfiguration, @logger, executor) # send existing files recover_past_files if recovery - @last_stale_cleanup_cycle = Time.now @flush_interval = @flush_interval.to_i diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 986a383..a19756f 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -1,8 +1,9 @@ # encoding: utf-8 - require 'logstash/outputs/base' require 'logstash/namespace' require 'logstash/errors' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' +require 'logstash/outputs/kusto/kustoAadProvider' class LogStash::Outputs::Kusto < LogStash::Outputs::Base ## @@ -18,27 +19,24 @@ class Ingestor fallback_policy: :caller_runs ) LOW_QUEUE_LENGTH = 3 - FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger, threadpool = DEFAULT_THREADPOOL) - @workers_pool = threadpool + def initialize(kustoLogstashConfiguration, logger, threadpool = DEFAULT_THREADPOOL) @logger = logger - @kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger) - @kustoLogstashConfiguration.validate_config() + @workers_pool = threadpool + @kustoLogstashConfiguration = kustoLogstashConfiguration @logger.info('Preparing Kusto resources.') @ingestion_properties = get_ingestion_properties() if @kustoLogstashConfiguration.proxy_aad_only @kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kustoLogstashConfiguration) end - @delete_local = delete_local @logger.debug('Kusto resources are ready.') end def get_kusto_client() - if @kusto_client.nil? + if @kusto_client.nil? || (@kustoLogstashConfiguration.proxy_aad_only && @kustoAadTokenProvider.is_saved_token_need_refresh()) kusto_client = create_kusto_client() end - return kusto_client + return @kusto_client end def create_kusto_client() @@ -48,15 +46,15 @@ def create_kusto_client() kusto_connection_string = if @kustoLogstashConfiguration.is_managed_identity if @kustoLogstashConfiguration.is_system_assigned_managed_identity @logger.info('Using system managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url) else @logger.info('Using user managed identity.') - kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.managed_identity_id) end elsif @kustoLogstashConfiguration.proxy_aad_only - kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(ingest_url, ) + kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(@kustoLogstashConfiguration.ingest_url,@kustoLogstashConfiguration.get_aad_token_bearer()) else - kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(@kustoLogstashConfiguration.ingest_url, @kustoLogstashConfiguration.app_id, @kustoLogstashConfiguration.app_key.value, @kustoLogstashConfiguration.app_tenant) end # @logger.debug(Gem.loaded_specs.to_s) @@ -70,25 +68,21 @@ def create_kusto_client() kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin - if is_direct_conn + if @kustoLogstashConfiguration.is_direct_conn || @kustoLogstashConfiguration.proxy_aad_only kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else - kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() + kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(@kustoLogstashConfiguration.proxy_host,@kustoLogstashConfiguration.proxy_port,@kustoLogstashConfiguration.proxy_protocol)).build() kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end - - - - - end def get_ingestion_properties() + kusto_java = Java::com.microsoft.azure.kusto ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kustoLogstashConfiguration.database, @kustoLogstashConfiguration.table) if @kustoLogstashConfiguration.is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + @logger.debug('Using mapping reference.', @kustoLogstashConfiguration.json_mapping) + ingestion_properties.setIngestionMapping(@kustoLogstashConfiguration.json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) else @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') @@ -115,7 +109,7 @@ def upload(path, delete_on_success) @logger.debug("Sending file to kusto: #{path}. size: #{file_size}") if file_size > 0 file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file - @kusto_client.ingestFromFile(file_source_info, @ingestion_properties) + get_kusto_client().ingestFromFile(file_source_info, @ingestion_properties) else @logger.warn("File #{path} is an empty file and is not ingested.") end @@ -140,4 +134,4 @@ def stop @workers_pool.wait_for_termination(nil) # block until its done end end -end +end \ No newline at end of file diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb index 5f11f1b..668d931 100644 --- a/lib/logstash/outputs/kusto/kustoAadProvider.rb +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -1,10 +1,11 @@ -# encoding: utf - 8 -require "logstash/kusto/kustoLogstashConfiguration" +# encoding:utf-8 require 'rest-client' require 'json' require 'openssl' require 'base64' require 'time' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + module LogStash module Outputs @@ -19,45 +20,45 @@ def initialize(kustoLogstashConfiguration) @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", kustoLogstashConfiguration.app_id, scope, kustoLogstashConfiguration.app_key) @token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", aad_uri, kustoLogstashConfiguration.app_tenant) @token_state = { - : access_token => nil, - : expiry_time => nil, - : token_details_mutex => Mutex.new, - } + :access_token => nil, + :expiry_time => nil, + :token_details_mutex => Mutex.new, + } end # def initialize # Public methods public def get_aad_token_bearer() - @token_state[: token_details_mutex].synchronize - do - if is_saved_token_need_refresh() - refresh_saved_token() - end - return @token_state[: access_token] + @token_state[:token_details_mutex].synchronize do + if is_saved_token_need_refresh() + refresh_saved_token() + end + return @token_state[:access_token] end end # def get_aad_token_bearer - # Private methods - private def is_saved_token_need_refresh() - return @token_state[: access_token].nil ? || @token_state[: expiry_time].nil ? || @token_state[: expiry_time] <= Time.now + return @token_state[:access_token].nil? || @token_state[:expiry_time].nil? || @token_state[:expiry_time] <= Time.now end # def is_saved_token_need_refresh + # Private methods + private + def refresh_saved_token() @logger.info("aad token expired - refreshing token.") token_response = post_token_request() - @token_state[: access_token] = token_response["access_token"] - @token_state[: expiry_time] = get_token_expiry_time(token_response["expires_in"]) + @token_state[:access_token] = token_response["access_token"] + @token_state[:expiry_time] = get_token_expiry_time(token_response["expires_in"]) end # def refresh_saved_token def get_token_expiry_time(expires_in_seconds) - if (expires_in_seconds.nil ? || expires_in_seconds <= 0) + if (expires_in_seconds.nil? || expires_in_seconds <= 0) return Time.now + (60 * 60 * 24) # Refresh anyway in 24 hours else - return Time.now + expires_in_seconds - 1; - # Decrease by 1 second to be on the safe side + return Time.now + expires_in_seconds - 30 * 60; + # Decrease by 30 seconds to be on the safe side end end # def get_token_expiry_time @@ -69,8 +70,8 @@ def post_token_request() begin proxy_aad = sprintf("%s://%s:%s", @kustoLogstashConfiguration.proxy_protocol, @kustoLogstashConfiguration.proxy_host, @kustoLogstashConfiguration.proxy_port) # Post REST request - response = RestClient::Request.execute(method:: post, url: @token_request_uri, payload: @token_request_body, headers: headers, - proxy: proxy_aad) + response = RestClient::Request.execute(method::post, url:@token_request_uri, payload:@token_request_body, headers:headers, + proxy:proxy_aad) if (response.code == 200 || response.code == 201) return JSON.parse(response.body) diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb index 4fb8dec..e7d8ed5 100644 --- a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -1,6 +1,5 @@ # encoding: utf-8 # A class just having all the configurations wrapped into a seperate object - module LogStash module Outputs module KustoInternal @@ -46,24 +45,30 @@ def validate_config() end if @database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end + @logger.error('database config value should not be dynamic.', database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end if @table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + @logger.error('table config value should not be dynamic.', table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') end if @json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + @logger.error('json_mapping config value should not be dynamic.', json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') end if not(["https", "http"].include? @proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + @logger.error('proxy_protocol has to be http or https.', proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + + if @proxy_aad_only && @is_direct_conn + @logger.error('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.',proxy_host,proxy_port,proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_aad_only is true, but proxy parameters (Host,Port,Protocol) are missing.') end + # If all validation pass then configuration is valid return true end #validate_config() diff --git a/logstash-output-kusto.gemspec b/logstash-output-kusto.gemspec index af083c1..c586c9d 100755 --- a/logstash-output-kusto.gemspec +++ b/logstash-output-kusto.gemspec @@ -20,6 +20,7 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies + s.add_runtime_dependency "rest-client", ">= 2.1.0" s.add_runtime_dependency 'logstash-core', '>= 8.3.0' s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_runtime_dependency 'logstash-codec-json_lines'