Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/support managed identity #57

Merged
merged 3 commits into from
Oct 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ More information about configuring Logstash can be found in the [logstash config
| --- | --- | --- |
| **path** | The plugin writes events to temporary files before sending them to ADX. This parameter includes a path where files should be written and a time expression for file rotation to trigger an upload to the ADX service. The example above shows how to rotate the files every minute and check the Logstash docs for more information on time expressions. | Required
| **ingest_url** | The Kusto endpoint for ingestion-related communication. See it on the Azure Portal.| Required|
| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Required|
| **app_id, app_key, app_tenant**| Credentials required to connect to the ADX service. Be sure to use an application with 'ingest' privileges. | Optional|
| **managed_identity**| Managed Identity to authenticate. For user-based managed ID, use the Client ID GUID. For system-based, use the value `system`. The ID needs to have 'ingest' privileges on the cluster. | Optional|
| **database**| Database name to place events | Required |
| **table** | Target table name to place events | Required
| **json_mapping** | Maps each attribute from incoming event JSON strings to the appropriate column in the table. Note that this must be in JSON format, as this is the interface between Logstash and Kusto | Required |
Expand Down
8 changes: 5 additions & 3 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,13 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base

# The following are the credentails used to connect to the Kusto service
# application id
config :app_id, validate: :string, required: true
config :app_id, validate: :string, required: false
# application key (secret)
config :app_key, validate: :password, required: true
config :app_key, validate: :password, required: false
# aad tenant id
config :app_tenant, validate: :string, default: nil
# managed identity id
config :managed_identity, validate: :string, default: nil

# The following are the data settings that impact where events are written to
# Database name
Expand Down Expand Up @@ -150,7 +152,7 @@ def register
max_queue: upload_queue_size,
fallback_policy: :caller_runs)

@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)

# send existing files
recover_past_files if recovery
Expand Down
34 changes: 29 additions & 5 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,34 @@ class Ingestor
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/

def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
@logger = logger
validate_config(database, table, json_mapping,proxy_protocol)
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id)
@logger.info('Preparing Kusto resources.')

kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
is_managed_identity = (app_id.nil? && app_key.empty?)
# If it is system managed identity, propagate the system identity
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
# Is it direct connection
is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
# Create a connection string
kusto_connection_string = if is_managed_identity
if is_system_assigned_managed_identity
@logger.info('Using system managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
else
@logger.info('Using user managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
end
else
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
end

#
@logger.debug(Gem.loaded_specs.to_s)
# Unfortunately there's no way to avoid using the gem/plugin name directly...
Expand All @@ -41,7 +60,7 @@ def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_ma
kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray());

@kusto_client = begin
if proxy_host.nil? || proxy_host.empty?
if is_direct_conn
kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string)
else
kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build()
Expand All @@ -57,7 +76,12 @@ def initialize(ingest_url, app_id, app_key, app_tenant, database, table, json_ma
@logger.debug('Kusto resources are ready.')
end

def validate_config(database, table, json_mapping,proxy_protocol)
def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id)
# Add an additional validation and fail this upfront
if app_id.nil? && app_key.empty? && managed_identity_id.empty?
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
end
if database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)
raise LogStash::ConfigurationError.new('database config value should not be dynamic.')
Expand Down
20 changes: 15 additions & 5 deletions spec/outputs/kusto/ingestor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
let(:app_id) { "myid" }
let(:app_key) { LogStash::Util::Password.new("mykey") }
let(:app_tenant) { "mytenant" }
let(:managed_identity) { "managed_identity" }
let(:database) { "mydatabase" }
let(:table) { "mytable" }
let(:proxy_host) { "localhost" }
Expand All @@ -24,7 +25,7 @@
# note that this will cause an internal error since connection is being tried.
# however we still want to test that all the java stuff is working as expected
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol, logger)
ingestor.stop
}.not_to raise_error
end
Expand All @@ -35,7 +36,7 @@
dynamic_name_array.each do |test_database|
it "with database: #{test_database}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity, test_database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
Expand All @@ -46,7 +47,7 @@
dynamic_name_array.each do |test_table|
it "with database: #{test_table}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, test_table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
Expand All @@ -57,7 +58,7 @@
dynamic_name_array.each do |json_mapping|
it "with database: #{json_mapping}" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,proxy_protocol,logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
Expand All @@ -67,7 +68,16 @@
context 'proxy protocol has to be http or https' do
it "with proxy protocol: socks" do
expect {
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor = described_class.new(ingest_url, app_id, app_key, app_tenant, managed_identity,database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
end

context 'one of appid or managedid has to be provided' do
it "with empty managed identity and appid" do
expect {
ingestor = described_class.new(ingest_url, "", app_key, app_tenant, "",database, table, json_mapping, delete_local, proxy_host, proxy_port,'socks',logger)
ingestor.stop
}.to raise_error(LogStash::ConfigurationError)
end
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.0.0
2.0.1