From 78d5c704d7124bf6da4f1a374926fdf28860ccbe Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Wed, 13 Dec 2023 21:11:24 +0530 Subject: [PATCH] * Start work on refactor --- .gitignore | 1 + lib/logstash/outputs/kusto.rb | 6 +- lib/logstash/outputs/kusto/ingestor.rb | 93 +++++------- .../outputs/kusto/kustoAadProvider.rb | 97 +++++++++++++ .../kusto/kustoLogstashConfiguration.rb | 132 ++++++++++++++++++ spec/outputs/kusto/ingestor_spec.rb | 131 ----------------- .../kusto/kustoLogstashConfiguration_spec.rb | 95 +++++++++++++ 7 files changed, 363 insertions(+), 192 deletions(-) create mode 100644 lib/logstash/outputs/kusto/kustoAadProvider.rb create mode 100644 lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb delete mode 100755 spec/outputs/kusto/ingestor_spec.rb create mode 100755 spec/outputs/kusto/kustoLogstashConfiguration_spec.rb diff --git a/.gitignore b/.gitignore index cd3e886..fc91a8e 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,4 @@ gradle/wrapper/gradle-wrapper.properties rspec.xml e2e/output_file.txt logs.txt +local-run.sh diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index dacb442..66962ae 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -118,6 +118,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base # Check Proxy URL can be over http or https. Dowe need it this way or ignore this & remove this config :proxy_protocol, validate: :string, required: false , default: 'http' + # Use proxy for AAD only. If true, the plugin will use the proxy only for AAD authentication and will not use it for the actual data transfer. + config :proxy_aad_only, validate: :boolean, required: false , default: false + default :codec, 'json_lines' def register @@ -154,7 +157,8 @@ def register max_queue: upload_queue_size, fallback_policy: :caller_runs) - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor) + @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, + final_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger, executor) # send existing files recover_past_files if recovery diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 3a0b4a5..986a383 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,30 +20,41 @@ class Ingestor LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) + def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool @logger = logger - validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id) + @kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger) + @kustoLogstashConfiguration.validate_config() @logger.info('Preparing Kusto resources.') + @ingestion_properties = get_ingestion_properties() + if @kustoLogstashConfiguration.proxy_aad_only + @kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kustoLogstashConfiguration) + end + @delete_local = delete_local + @logger.debug('Kusto resources are ready.') + end + + def get_kusto_client() + if @kusto_client.nil? + kusto_client = create_kusto_client() + end + return kusto_client + end + def create_kusto_client() kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http - # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) - # If there is managed identity, use it. This means the AppId and AppKey are empty/nil - is_managed_identity = (app_id.nil? && app_key.nil?) - # If it is system managed identity, propagate the system identity - is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) - # Is it direct connection - is_direct_conn = (proxy_host.nil? || proxy_host.empty?) # Create a connection string - kusto_connection_string = if is_managed_identity - if is_system_assigned_managed_identity + kusto_connection_string = if @kustoLogstashConfiguration.is_managed_identity + if @kustoLogstashConfiguration.is_system_assigned_managed_identity @logger.info('Using system managed identity.') kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) else @logger.info('Using user managed identity.') kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) end + elsif @kustoLogstashConfiguration.proxy_aad_only + kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(ingest_url, ) else kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) end @@ -67,45 +78,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat end end - @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) - is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) - if is_mapping_ref_provided - @logger.debug('Using mapping reference.', json_mapping) - @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) - @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) - else - @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') - end - @delete_local = delete_local - @logger.debug('Kusto resources are ready.') - end - def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id) - # Add an additional validation and fail this upfront - if app_id.nil? && app_key.nil? && managed_identity_id.nil? - @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') - raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') - end - if database =~ FIELD_REF - @logger.error('database config value should not be dynamic.', database) - raise LogStash::ConfigurationError.new('database config value should not be dynamic.') - end - if table =~ FIELD_REF - @logger.error('table config value should not be dynamic.', table) - raise LogStash::ConfigurationError.new('table config value should not be dynamic.') - end - if json_mapping =~ FIELD_REF - @logger.error('json_mapping config value should not be dynamic.', json_mapping) - raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') - end - if not(["https", "http"].include? proxy_protocol) - @logger.error('proxy_protocol has to be http or https.', proxy_protocol) - raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') - end + end + def get_ingestion_properties() + ingestion_properties = kusto_java.ingest.IngestionProperties.new(@kustoLogstashConfiguration.database, @kustoLogstashConfiguration.table) + if @kustoLogstashConfiguration.is_mapping_ref_provided + @logger.debug('Using mapping reference.', json_mapping) + ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) + ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) + else + @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output') + end + return ingestion_properties end def upload_async(path, delete_on_success) @@ -125,21 +113,6 @@ def upload_async(path, delete_on_success) def upload(path, delete_on_success) file_size = File.size(path) @logger.debug("Sending file to kusto: #{path}. size: #{file_size}") - - # TODO: dynamic routing - # file_metadata = path.partition('.kusto.').last - # file_metadata_parts = file_metadata.split('.') - - # if file_metadata_parts.length == 3 - # # this is the number we expect - database, table, json_mapping - # database = file_metadata_parts[0] - # table = file_metadata_parts[1] - # json_mapping = file_metadata_parts[2] - - # local_ingestion_properties = Java::KustoIngestionProperties.new(database, table) - # local_ingestion_properties.addJsonMappingName(json_mapping) - # end - if file_size > 0 file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file @kusto_client.ingestFromFile(file_source_info, @ingestion_properties) diff --git a/lib/logstash/outputs/kusto/kustoAadProvider.rb b/lib/logstash/outputs/kusto/kustoAadProvider.rb new file mode 100644 index 0000000..5f11f1b --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoAadProvider.rb @@ -0,0 +1,97 @@ +# encoding: utf - 8 +require "logstash/kusto/kustoLogstashConfiguration" +require 'rest-client' +require 'json' +require 'openssl' +require 'base64' +require 'time' + +module LogStash + module Outputs + module KustoInternal + class KustoAadTokenProvider + def initialize(kustoLogstashConfiguration) + @kustoLogstashConfiguration = kustoLogstashConfiguration + # Perform the auth initialization + scope = CGI.escape(sprintf("%s/.default",kustoLogstashConfiguration.ingest_url)) + @logger = logger + @aad_uri = "https://login.microsoftonline.com" + @token_request_body = sprintf("client_id=%s&scope=%s&client_secret=%s&grant_type=client_credentials", kustoLogstashConfiguration.app_id, scope, kustoLogstashConfiguration.app_key) + @token_request_uri = sprintf("%s/%s/oauth2/v2.0/token", aad_uri, kustoLogstashConfiguration.app_tenant) + @token_state = { + : access_token => nil, + : expiry_time => nil, + : token_details_mutex => Mutex.new, + } + end # def initialize + + # Public methods + public + + def get_aad_token_bearer() + @token_state[: token_details_mutex].synchronize + do + if is_saved_token_need_refresh() + refresh_saved_token() + end + return @token_state[: access_token] + end + end # def get_aad_token_bearer + + # Private methods + private + + def is_saved_token_need_refresh() + return @token_state[: access_token].nil ? || @token_state[: expiry_time].nil ? || @token_state[: expiry_time] <= Time.now + end # def is_saved_token_need_refresh + + def refresh_saved_token() + @logger.info("aad token expired - refreshing token.") + token_response = post_token_request() + @token_state[: access_token] = token_response["access_token"] + @token_state[: expiry_time] = get_token_expiry_time(token_response["expires_in"]) + end # def refresh_saved_token + + def get_token_expiry_time(expires_in_seconds) + if (expires_in_seconds.nil ? || expires_in_seconds <= 0) + return Time.now + (60 * 60 * 24) # Refresh anyway in 24 hours + else + return Time.now + expires_in_seconds - 1; + # Decrease by 1 second to be on the safe side + end + end # def get_token_expiry_time + + # Post the given json to Azure Loganalytics + def post_token_request() + # Create REST request header + headers = get_header() + while true + begin + proxy_aad = sprintf("%s://%s:%s", @kustoLogstashConfiguration.proxy_protocol, @kustoLogstashConfiguration.proxy_host, @kustoLogstashConfiguration.proxy_port) + # Post REST request + response = RestClient::Request.execute(method:: post, url: @token_request_uri, payload: @token_request_body, headers: headers, + proxy: proxy_aad) + + if (response.code == 200 || response.code == 201) + return JSON.parse(response.body) + end + rescue RestClient::ExceptionWithResponse => ewr + @logger.error("Exception while authenticating with AAD API ['#{ewr.response}']") + rescue Exception => ex + @logger.trace("Exception while authenticating with AAD API ['#{ex}']") + end + @logger.error("Error while authenticating with AAD ('#{@aad_uri}'), retrying in 10 seconds.") + sleep 10 + end + end # def post_token_request + # Create a header + def get_header() + return { + 'Content-Type' => 'application/x-www-form-urlencoded', + } + end # def get_header + end # class KustoAadTokenProvider + end # module Kusto + end # module Outputs +end # module LogStash + diff --git a/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb new file mode 100644 index 0000000..4fb8dec --- /dev/null +++ b/lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb @@ -0,0 +1,132 @@ +# encoding: utf-8 +# A class just having all the configurations wrapped into a seperate object + +module LogStash + module Outputs + module KustoInternal + class KustoLogstashConfiguration + FIELD_REF = /%\{[^}]+\}/ + def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol, proxy_aad_only, logger) + @logger = logger + # For ingestion + @ingest_url = ingest_url + @database = database + @table = table + @json_mapping = json_mapping + @is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?) + # Authentication configuration + @app_id = app_id + @app_key = app_key + @app_tenant = app_tenant + @managed_identity_id = managed_identity_id + # proxy configuration + @proxy_host = proxy_host + @proxy_port = proxy_port + @proxy_protocol = proxy_protocol + @proxy_aad_only = proxy_aad_only + # Fields that are derived from the configuration + @is_managed_identity = app_id.to_s.empty? && app_key.to_s.empty? + # If it is system managed identity, propagate the system identity + @is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) + # Is it direct connection + @is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + @logger.info("Kusto configuration initialized.") + end # def initialize + + def validate_config() + # Add an additional validation and fail this upfront + if @app_id.to_s.empty? && @app_key.to_s.empty? && @managed_identity_id.to_s.empty? + @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') + raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + end + # If proxy AAD is required and the proxy configuration is not provided - fail + if @proxy_aad_only && @is_direct_conn + @logger.error('proxy_aad_only can be used only when proxy is configured.', proxy_aad_only) + raise LogStash::ConfigurationError.new('proxy_aad_only can be used only when proxy is configured.') + end + + if @database =~ FIELD_REF + @logger.error('database config value should not be dynamic.', database) + raise LogStash::ConfigurationError.new('database config value should not be dynamic.') + end + + if @table =~ FIELD_REF + @logger.error('table config value should not be dynamic.', table) + raise LogStash::ConfigurationError.new('table config value should not be dynamic.') + end + + if @json_mapping =~ FIELD_REF + @logger.error('json_mapping config value should not be dynamic.', json_mapping) + raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') + end + + if not(["https", "http"].include? @proxy_protocol) + @logger.error('proxy_protocol has to be http or https.', proxy_protocol) + raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') + end + # If all validation pass then configuration is valid + return true + end #validate_config() + + # Getters for all the attributes defined in this class + def ingest_url + @ingest_url + end + def database + @database + end + def table + @table + end + def json_mapping + @json_mapping + end + def is_mapping_ref_provided + @is_mapping_ref_provided + end + # Authentication configuration + def app_id + @app_id + end + def app_key + @app_key + end + def app_tenant + @app_tenant + end + def managed_identity_id + @managed_identity_id + end + + def is_managed_identity + @is_managed_identity + end + + def is_system_assigned_managed_identity + @is_system_assigned_managed_identity + end + + # proxy configuration + def proxy_host + @proxy_host + end + + def proxy_port + @proxy_port + end + + def proxy_protocol + @proxy_protocol + end + + def proxy_aad_only + @proxy_aad_only + end + + def is_direct_conn + @is_direct_conn + end + end # class KustoLogstashConfiguration + end # module Kusto + end # module Outputs +end # module LogStash \ No newline at end of file diff --git a/spec/outputs/kusto/ingestor_spec.rb b/spec/outputs/kusto/ingestor_spec.rb deleted file mode 100755 index cc0263c..0000000 --- a/spec/outputs/kusto/ingestor_spec.rb +++ /dev/null @@ -1,131 +0,0 @@ -# encoding: utf-8 -require_relative "../../spec_helpers.rb" -require 'logstash/outputs/kusto' -require 'logstash/outputs/kusto/ingestor' - -describe LogStash::Outputs::Kusto::Ingestor do - - let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } - let(:app_id) { "myid" } - let(:app_key) { LogStash::Util::Password.new("mykey") } - let(:app_tenant) { "mytenant" } - let(:managed_identity) { "managed_identity" } - let(:database) { "mydatabase" } - let(:table) { "mytable" } - let(:proxy_host) { "localhost" } - let(:proxy_port) { 80 } - let(:proxy_protocol) { "http" } - let(:json_mapping) { "mymapping" } - let(:delete_local) { false } - let(:logger) { spy('logger') } - - describe '#initialize' do - - it 'does not throw an error when initializing' do - # note that this will cause an internal error since connection is being tried. - # however we still want to test that all the java stuff is working as expected - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger) - ingestor.stop - }.not_to raise_error - end - - dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] - - context 'doesnt allow database to have some dynamic part' do - dynamic_name_array.each do |test_database| - it "with database: #{test_database}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'doesnt allow table to have some dynamic part' do - dynamic_name_array.each do |test_table| - it "with database: #{test_table}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'doesnt allow mapping to have some dynamic part' do - dynamic_name_array.each do |json_mapping| - it "with database: #{json_mapping}" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - end - - context 'proxy protocol has to be http or https' do - it "with proxy protocol: socks" do - expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - - context 'one of appid or managedid has to be provided' do - it "with empty managed identity and appid" do - expect { - ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) - ingestor.stop - }.to raise_error(LogStash::ConfigurationError) - end - end - - end - - # describe 'receiving events' do - - # context 'with non-zero flush interval' do - # let(:temporary_output_file) { Stud::Temporary.pathname } - - # let(:event_count) { 100 } - # let(:flush_interval) { 5 } - - # let(:events) do - # event_count.times.map do |idx| - # LogStash::Event.new('subject' => idx) - # end - # end - - # let(:output) { described_class.new(options.merge( {'path' => temporary_output_file, 'flush_interval' => flush_interval, 'delete_temp_files' => false } )) } - - # before(:each) { output.register } - - # after(:each) do - # output.close - # File.exist?(temporary_output_file) && File.unlink(temporary_output_file) - # File.exist?(temporary_output_file + '.kusto') && File.unlink(temporary_output_file + '.kusto') - # end - - # it 'eventually flushes without receiving additional events' do - # output.multi_receive_encoded(events) - - # # events should not all be flushed just yet... - # expect(File.read(temporary_output_file)).to satisfy("have less than #{event_count} lines") do |contents| - # contents && contents.lines.count < event_count - # end - - # # wait for the flusher to run... - # sleep(flush_interval + 1) - - # # events should all be flushed - # expect(File.read(temporary_output_file)).to satisfy("have exactly #{event_count} lines") do |contents| - # contents && contents.lines.count == event_count - # end - # end - # end - - # end -end diff --git a/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb new file mode 100755 index 0000000..70a6f0e --- /dev/null +++ b/spec/outputs/kusto/kustoLogstashConfiguration_spec.rb @@ -0,0 +1,95 @@ +# encoding: utf-8 +require_relative "../../spec_helpers.rb" +require 'logstash/outputs/kusto' +require 'logstash/outputs/kusto/kustoLogstashConfiguration' + +describe LogStash::Outputs::KustoInternal::KustoLogstashConfiguration do + + let(:ingest_url) { "https://ingest-sdkse2etest.eastus.kusto.windows.net/" } + let(:app_id) { "myid" } + let(:app_key) { LogStash::Util::Password.new("mykey") } + let(:app_tenant) { "mytenant" } + let(:managed_identity) { "managed_identity" } + let(:database) { "mydatabase" } + let(:table) { "mytable" } + let(:proxy_host) { "localhost" } + let(:proxy_port) { 80 } + let(:proxy_protocol) { "http" } + let(:json_mapping) { "mymapping" } + let(:delete_local) { false } + let(:logger) { spy(:logger) } + let(:proxy_aad_only) { false } + + describe '#initialize' do + it 'does not throw an error when initializing' do + # note that this will cause an internal error since connection is being tried. + # however we still want to test that all the java stuff is working as expected + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol, proxy_aad_only, logger) + kustoLogstashOutputConfiguration.validate_config() + }.not_to raise_error + end + + dynamic_name_array = ['/a%{name}/', '/a %{name}/', '/a- %{name}/', '/a- %{name}'] + + context 'doesnt allow database to have some dynamic part' do + dynamic_name_array.each do |test_database| + it "with database: #{test_database}" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow table to have some dynamic part' do + dynamic_name_array.each do |test_table| + it "with database: #{test_table}" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'doesnt allow mapping to have some dynamic part' do + dynamic_name_array.each do |json_mapping| + it "with database: #{json_mapping}" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end + + context 'proxy protocol has to be http or https' do + it "with proxy protocol: socks" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks', proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'one of appid or managedid has to be provided' do + it "with empty managed identity and appid" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks', proxy_aad_only,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'if proxy_aad is provided' do + it "proxy details should be provided" do + expect { + kustoLogstashOutputConfiguration = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, nil, nil,'https', true,logger) + kustoLogstashOutputConfiguration.validate_config() + }.to raise_error(LogStash::ConfigurationError) + end + end + end +end