From 4c7b32402e3d5d8dda090954d784586b32fc15da Mon Sep 17 00:00:00 2001 From: Ramachandran A G Date: Wed, 11 Oct 2023 12:28:32 +0530 Subject: [PATCH] * Attempt to address issue #50 with managed identity --- README.md | 3 ++- lib/logstash/outputs/kusto.rb | 8 +++--- lib/logstash/outputs/kusto/ingestor.rb | 35 ++++++++++++++++++++++---- spec/outputs/kusto/ingestor_spec.rb | 20 +++++++++++---- 4 files changed, 52 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index d03df7e..a4e9765 100755 --- a/README.md +++ b/README.md @@ -53,7 +53,8 @@ More information about configuring Logstash can be found in the [logstash config | --- | --- | --- | | **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required | **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required| -| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Required| +| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional| +| **managed_identity**| Managed identity id (ClientId GUID in case of User managed identity or system in case of system managed identity). The id needs to have 'ingest' privileges | Optional| | **database**| Database name to place events | Required | | **table** | Target table name to place events | Required | **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Required | diff --git a/lib/logstash/outputs/kusto.rb b/lib/logstash/outputs/kusto.rb index a22e203..298a71f 100755 --- a/lib/logstash/outputs/kusto.rb +++ b/lib/logstash/outputs/kusto.rb @@ -73,11 +73,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base # The following are the credentails used to connect to the Kusto service # application id - config :app_id, validate: :string, required: true + config :app_id, validate: :string, required: false # application key (secret) - config :app_key, validate: :password, required: true + config :app_key, validate: :password, required: false # aad tenant id config :app_tenant, validate: :string, default: nil + # managed identity id + config :managed_identity, validate: :string, default: nil # The following are the data settings that impact where events are written to # Database name @@ -150,7 +152,7 @@ def register max_queue: upload_queue_size, fallback_policy: :caller_runs) - @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor) + @ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor) # send existing files recover_past_files if recovery diff --git a/lib/logstash/outputs/kusto/ingestor.rb b/lib/logstash/outputs/kusto/ingestor.rb index 8e0e5ad..00710c7 100755 --- a/lib/logstash/outputs/kusto/ingestor.rb +++ b/lib/logstash/outputs/kusto/ingestor.rb @@ -20,15 +20,35 @@ class Ingestor LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ - def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) + def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool @logger = logger - validate_config(database, table, json_mapping,proxy_protocol) + validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id) @logger.info('Preparing Kusto resources.') kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http - kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + # If there is managed identity, use it. This means the AppId and AppKey are empty/nil + is_managed_identity = (app_id.nil? && app_key.empty?) + # If it is system managed identity, propagate the system identity + is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) + # Is it direct connection + is_direct_conn = (proxy_host.nil? || proxy_host.empty?) + # Create a connection string + kusto_connection_string = begin + if is_managed_identity + if is_system_assigned_managed_identity + @logger.info('Using system managed identity.') + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) + else + @logger.info('Using user managed identity.') + kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) + end + else + kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) + end + end # @logger.debug(Gem.loaded_specs.to_s) # Unfortunately there's no way to avoid using the gem/plugin name directly... @@ -41,7 +61,7 @@ def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_ma kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin - if proxy_host.nil? || proxy_host.empty? + if is_direct_conn kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() @@ -57,7 +77,12 @@ def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_ma @logger.debug('Kusto resources are ready.') end - def validate_config(database, table, json_mapping,proxy_protocol) + def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id) + # Add an additional validation and fail this upfront + if app_id.nil? && app_key.empty? && managed_identity_id.empty? + @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') + raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') + end if database =~ FIELD_REF @logger.error('database config value should not be dynamic.', database) raise LogStash::ConfigurationError.new('database config value should not be dynamic.') diff --git a/spec/outputs/kusto/ingestor_spec.rb b/spec/outputs/kusto/ingestor_spec.rb index 5f91e69..cc0263c 100755 --- a/spec/outputs/kusto/ingestor_spec.rb +++ b/spec/outputs/kusto/ingestor_spec.rb @@ -9,6 +9,7 @@ let(:app_id) { "myid" } let(:app_key) { LogStash::Util::Password.new("mykey") } let(:app_tenant) { "mytenant" } + let(:managed_identity) { "managed_identity" } let(:database) { "mydatabase" } let(:table) { "mytable" } let(:proxy_host) { "localhost" } @@ -24,7 +25,7 @@ # note that this will cause an internal error since connection is being tried. # however we still want to test that all the java stuff is working as expected expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger) + ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger) ingestor.stop }.not_to raise_error end @@ -35,7 +36,7 @@ dynamic_name_array.each do |test_database| it "with database: #{test_database}" do expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) + ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) ingestor.stop }.to raise_error(LogStash::ConfigurationError) end @@ -46,7 +47,7 @@ dynamic_name_array.each do |test_table| it "with database: #{test_table}" do expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) + ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) ingestor.stop }.to raise_error(LogStash::ConfigurationError) end @@ -57,7 +58,7 @@ dynamic_name_array.each do |json_mapping| it "with database: #{json_mapping}" do expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) + ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger) ingestor.stop }.to raise_error(LogStash::ConfigurationError) end @@ -67,7 +68,16 @@ context 'proxy protocol has to be http or https' do it "with proxy protocol: socks" do expect { - ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) + ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) + ingestor.stop + }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'one of appid or managedid has to be provided' do + it "with empty managed identity and appid" do + expect { + ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger) ingestor.stop }.to raise_error(LogStash::ConfigurationError) end