Skip to content

Commit

Permalink
* Start work on refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
ag-ramachandran committed Dec 13, 2023
1 parent 4830770 commit 78d5c70
Show file tree
Hide file tree
Showing 7 changed files with 363 additions and 192 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,4 @@ gradle/wrapper/gradle-wrapper.properties
rspec.xml
e2e/output_file.txt
logs.txt
local-run.sh
6 changes: 5 additions & 1 deletion lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
# The proxy URL can be served over http or https. TODO: decide whether this option is needed or should be removed.
config :proxy_protocol, validate: :string, required: false , default: 'http'

# Use proxy for AAD only. If true, the plugin will use the proxy only for AAD authentication and will not use it for the actual data transfer.
config :proxy_aad_only, validate: :boolean, required: false , default: false

default :codec, 'json_lines'

def register
Expand Down Expand Up @@ -154,7 +157,8 @@ def register
max_queue: upload_queue_size,
fallback_policy: :caller_runs)

@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table,
final_mapping, delete_temp_files, proxy_host, proxy_port, proxy_protocol, proxy_aad_only, @logger, executor)

# send existing files
recover_past_files if recovery
Expand Down
93 changes: 33 additions & 60 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,41 @@ class Ingestor
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/

def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
@logger = logger
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id)
@kustoLogstashConfiguration = LogStash::Outputs::KustoInternal::KustoLogstashConfiguration.new(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,proxy_aad_only, logger)
@kustoLogstashConfiguration.validate_config()
@logger.info('Preparing Kusto resources.')
@ingestion_properties = get_ingestion_properties()
if @kustoLogstashConfiguration.proxy_aad_only
@kustoAadTokenProvider = LogStash::Outputs::KustoInternal::KustoAadTokenProvider.new(@kustoLogstashConfiguration)
end
@delete_local = delete_local
@logger.debug('Kusto resources are ready.')
end

# Returns the Kusto ingest client, creating it on the first call and caching
# it in @kusto_client (the instance variable that #upload reads) so that
# every later call reuses the same instance.
#
# The previous version stored the new client in a local variable, so nothing
# was ever cached and nil was returned once @kusto_client had been set.
#
# NOTE(review): assumes create_kusto_client returns the client object - confirm.
def get_kusto_client()
  @kusto_client ||= create_kusto_client()
end

def create_kusto_client()
kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
is_managed_identity = (app_id.nil? && app_key.nil?)
# If it is system managed identity, propagate the system identity
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
# Is it direct connection
is_direct_conn = (proxy_host.nil? || proxy_host.empty?)
# Create a connection string
kusto_connection_string = if is_managed_identity
if is_system_assigned_managed_identity
kusto_connection_string = if @kustoLogstashConfiguration.is_managed_identity
if @kustoLogstashConfiguration.is_system_assigned_managed_identity
@logger.info('Using system managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url)
else
@logger.info('Using user managed identity.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
end
elsif @kustoLogstashConfiguration.proxy_aad_only
kusto_java.data.auth.ConnectionStringBuilder.createWithAccessToken(ingest_url, )
else
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
end
Expand All @@ -67,45 +78,22 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat
end
end

@ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table)
is_mapping_ref_provided = !(json_mapping.nil? || json_mapping.empty?)
if is_mapping_ref_provided
@logger.debug('Using mapping reference.', json_mapping)
@ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
end
@delete_local = delete_local
@logger.debug('Kusto resources are ready.')
end

# Validates the plugin settings up front so a bad configuration fails at
# start-up instead of at ingestion time.
#
# Checks, in order:
#   1. at least one of app_id / app_key / managed_identity_id is non-nil;
#   2. database, table and json_mapping contain no %{field} reference;
#   3. proxy_protocol is "https" or "http".
#
# Each failure is logged through @logger and raised as
# LogStash::ConfigurationError. Returns nil when everything is valid.
def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id)
  credentials_missing = [app_id, app_key, managed_identity_id].all?(&:nil?)
  if credentials_missing
    @logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
    raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
  end

  # These settings are resolved once, so per-event field references are rejected.
  static_settings = [
    [database, 'database config value should not be dynamic.'],
    [table, 'table config value should not be dynamic.'],
    [json_mapping, 'json_mapping config value should not be dynamic.']
  ]
  static_settings.each do |value, message|
    next unless value =~ FIELD_REF

    @logger.error(message, value)
    raise LogStash::ConfigurationError.new(message)
  end

  return if ["https", "http"].include?(proxy_protocol)

  @logger.error('proxy_protocol has to be http or https.', proxy_protocol)
  raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.')
end

# Builds the Kusto IngestionProperties for the configured database and table.
#
# When a JSON mapping reference is configured, the mapping and the JSON data
# format are set on the properties; otherwise the properties are returned
# with only database and table set.
#
# The previous version referenced `kusto_java` and `json_mapping`, neither of
# which is defined in this method, so it raised NameError when called. Both
# are now resolved locally: the Java package root is looked up here and the
# mapping is read from the configuration object.
#
# Returns the populated IngestionProperties instance.
def get_ingestion_properties()
  kusto_java = Java::com.microsoft.azure.kusto
  config = @kustoLogstashConfiguration
  ingestion_properties = kusto_java.ingest.IngestionProperties.new(config.database, config.table)
  if config.is_mapping_ref_provided
    json_mapping = config.json_mapping
    @logger.debug('Using mapping reference.', json_mapping)
    ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON)
    ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
  else
    @logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
  end
  ingestion_properties
end

def upload_async(path, delete_on_success)
Expand All @@ -125,21 +113,6 @@ def upload_async(path, delete_on_success)
def upload(path, delete_on_success)
file_size = File.size(path)
@logger.debug("Sending file to kusto: #{path}. size: #{file_size}")

# TODO: dynamic routing
# file_metadata = path.partition('.kusto.').last
# file_metadata_parts = file_metadata.split('.')

# if file_metadata_parts.length == 3
# # this is the number we expect - database, table, json_mapping
# database = file_metadata_parts[0]
# table = file_metadata_parts[1]
# json_mapping = file_metadata_parts[2]

# local_ingestion_properties = Java::KustoIngestionProperties.new(database, table)
# local_ingestion_properties.addJsonMappingName(json_mapping)
# end

if file_size > 0
file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file
@kusto_client.ingestFromFile(file_source_info, @ingestion_properties)
Expand Down
97 changes: 97 additions & 0 deletions lib/logstash/outputs/kusto/kustoAadProvider.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# encoding: utf-8
require "logstash/kusto/kustoLogstashConfiguration"
require 'rest-client'
require 'base64'
require 'json'
require 'logger'
require 'openssl'
require 'time'
require 'uri'

module LogStash
  module Outputs
    module KustoInternal
      # Obtains and caches an AAD access token using the OAuth2
      # client-credentials grant, sending the token request through the proxy
      # described by the plugin configuration.
      #
      # The cached token and its expiry time are guarded by a mutex, so
      # concurrent callers of #get_aad_token_bearer trigger at most one refresh
      # at a time. Note that the refresh (including its retry sleeps) runs while
      # the mutex is held, so other callers wait until a token is available.
      class KustoAadTokenProvider
        # Lifetime assumed when the token response carries no usable expires_in.
        FALLBACK_TOKEN_LIFETIME_SECONDS = 60 * 60 * 24
        # Pause between two failed token requests.
        RETRY_DELAY_SECONDS = 10

        # @param kustoLogstashConfiguration [#ingest_url, #app_id, #app_key, #app_tenant,
        #   #proxy_protocol, #proxy_host, #proxy_port] plugin configuration
        # @param logger [#info, #error, nil] optional logger; when omitted the
        #   configuration's `logger` reader is used if it has one, otherwise a
        #   standard-library Logger writing to $stderr
        def initialize(kustoLogstashConfiguration, logger = nil)
          @kustoLogstashConfiguration = kustoLogstashConfiguration
          @logger = resolve_logger(logger)
          @aad_uri = "https://login.microsoftonline.com"
          # URI.encode_www_form escapes every value, so secrets containing
          # characters such as '&', '+' or '=' cannot corrupt the form body.
          @token_request_body = URI.encode_www_form(
            'client_id' => kustoLogstashConfiguration.app_id.to_s,
            'scope' => format("%s/.default", kustoLogstashConfiguration.ingest_url),
            'client_secret' => plain_secret(kustoLogstashConfiguration.app_key),
            'grant_type' => 'client_credentials'
          )
          @token_request_uri = format("%s/%s/oauth2/v2.0/token", @aad_uri, kustoLogstashConfiguration.app_tenant)
          @token_state = {
            :access_token => nil,
            :expiry_time => nil,
            :token_details_mutex => Mutex.new,
          }
        end # def initialize

        # Returns a valid access token, refreshing the cached one first when it
        # is missing or expired.
        #
        # @return [String] the access token
        def get_aad_token_bearer()
          @token_state[:token_details_mutex].synchronize do
            refresh_saved_token() if is_saved_token_need_refresh()
            @token_state[:access_token]
          end
        end # def get_aad_token_bearer

        private

        # Picks the logger: explicit argument, then the configuration's own
        # logger when it exposes one, then a stderr Logger as a last resort.
        def resolve_logger(logger)
          return logger if logger

          config = @kustoLogstashConfiguration
          return config.logger if config.respond_to?(:logger) && config.logger

          ::Logger.new($stderr)
        end # def resolve_logger

        # The plugin passes app_key around as a wrapper whose real content is
        # read with #value (the ingestor calls app_key.value); unwrap it so the
        # actual secret, not the wrapper's string form, is sent.
        def plain_secret(app_key)
          app_key.respond_to?(:value) ? app_key.value.to_s : app_key.to_s
        end # def plain_secret

        # True when there is no cached token or the cached one has expired.
        def is_saved_token_need_refresh()
          @token_state[:access_token].nil? || @token_state[:expiry_time].nil? || @token_state[:expiry_time] <= Time.now
        end # def is_saved_token_need_refresh

        # Requests a new token and stores it together with its expiry time.
        def refresh_saved_token()
          @logger.info("aad token expired - refreshing token.")
          token_response = post_token_request()
          @token_state[:access_token] = token_response["access_token"]
          @token_state[:expiry_time] = get_token_expiry_time(token_response["expires_in"])
        end # def refresh_saved_token

        # Converts the expires_in value of a token response into an absolute time.
        #
        # @param expires_in_seconds [Integer, String, nil] lifetime in seconds
        # @return [Time] when the token must be refreshed
        def get_token_expiry_time(expires_in_seconds)
          lifetime = expires_in_seconds.to_i # nil becomes 0, numeric strings are accepted
          # Refresh anyway in 24 hours when no usable lifetime was supplied
          return Time.now + FALLBACK_TOKEN_LIFETIME_SECONDS if lifetime <= 0

          # Decrease by 1 second to be on the safe side
          Time.now + lifetime - 1
        end # def get_token_expiry_time

        # Posts the client-credentials request to the AAD token endpoint through
        # the configured proxy and returns the parsed JSON response.
        #
        # Retries indefinitely, sleeping RETRY_DELAY_SECONDS between attempts,
        # until the endpoint answers with HTTP 200 or 201.
        #
        # @return [Hash] parsed token response
        def post_token_request()
          # Create REST request header
          headers = get_header()
          config = @kustoLogstashConfiguration
          proxy_aad = format("%s://%s:%s", config.proxy_protocol, config.proxy_host, config.proxy_port)
          loop do
            begin
              # Post REST request
              response = RestClient::Request.execute(method: :post, url: @token_request_uri, payload: @token_request_body,
                                                     headers: headers, proxy: proxy_aad)
              return JSON.parse(response.body) if response.code == 200 || response.code == 201
            rescue RestClient::ExceptionWithResponse => ewr
              @logger.error("Exception while authenticating with AAD API ['#{ewr.response}']")
            rescue StandardError => ex
              # StandardError only: signals and interpreter shutdown must not be
              # swallowed by the retry loop.
              @logger.error("Exception while authenticating with AAD API ['#{ex}']")
            end
            @logger.error("Error while authenticating with AAD ('#{@aad_uri}'), retrying in 10 seconds.")
            sleep RETRY_DELAY_SECONDS
          end
        end # def post_token_request

        # Headers for the form-encoded token request.
        def get_header()
          {
            'Content-Type' => 'application/x-www-form-urlencoded',
          }
        end # def get_header
      end # class KustoAadTokenProvider
    end # module KustoInternal
  end # module Outputs
end # module LogStash

132 changes: 132 additions & 0 deletions lib/logstash/outputs/kusto/kustoLogstashConfiguration.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
# encoding: utf-8
# A class that wraps all of the plugin configuration in a separate object

module LogStash
  module Outputs
    module KustoInternal
      # Read-only holder for every setting of the Kusto output, plus a few
      # flags derived from those settings. Call #validate_config after
      # construction to reject invalid combinations.
      class KustoLogstashConfiguration
        # Matches a Logstash field reference such as %{field}.
        FIELD_REF = /%\{[^}]+\}/

        # Ingestion settings
        attr_reader :ingest_url, :database, :table, :json_mapping, :is_mapping_ref_provided, :delete_local
        # Authentication settings
        attr_reader :app_id, :app_key, :app_tenant, :managed_identity_id
        # Proxy settings
        attr_reader :proxy_host, :proxy_port, :proxy_protocol, :proxy_aad_only
        # Flags derived from the settings above
        attr_reader :is_managed_identity, :is_system_assigned_managed_identity, :is_direct_conn
        # Logger handed in by the plugin
        attr_reader :logger

        # Stores the raw settings and computes the derived flags:
        #   is_mapping_ref_provided             - json_mapping is neither nil nor empty
        #   is_managed_identity                 - both app_id and app_key are blank
        #   is_system_assigned_managed_identity - managed identity whose id is
        #                                         "system" (case-insensitive)
        #   is_direct_conn                      - proxy_host is nil or empty
        #
        # @param logger [#info, #error] receives start-up and validation messages
        def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol, proxy_aad_only, logger)
          @logger = logger
          # For ingestion
          @ingest_url = ingest_url
          @database = database
          @table = table
          @json_mapping = json_mapping
          @is_mapping_ref_provided = !json_mapping.to_s.empty?
          # Previously accepted but discarded; kept so it can be read back.
          @delete_local = delete_local
          # Authentication configuration
          @app_id = app_id
          @app_key = app_key
          @app_tenant = app_tenant
          @managed_identity_id = managed_identity_id
          # Proxy configuration
          @proxy_host = proxy_host
          @proxy_port = proxy_port
          @proxy_protocol = proxy_protocol
          @proxy_aad_only = proxy_aad_only
          # Fields that are derived from the configuration
          @is_managed_identity = app_id.to_s.empty? && app_key.to_s.empty?
          # to_s keeps the comparison safe (and boolean) when no identity id is configured.
          @is_system_assigned_managed_identity = @is_managed_identity && managed_identity_id.to_s.casecmp('system') == 0
          @is_direct_conn = proxy_host.to_s.empty?
          @logger.info("Kusto configuration initialized.")
        end # def initialize

        # Validates the stored settings; every failure is logged and raised.
        #
        # Checks, in order: some credential is configured; proxy_aad_only is
        # only used together with a proxy; database, table and json_mapping
        # hold no %{field} reference; proxy_protocol is "https" or "http".
        #
        # @return [true] when every check passes
        # @raise [LogStash::ConfigurationError] on the first failing check
        def validate_config()
          # Fail upfront when there is nothing to authenticate with
          if @app_id.to_s.empty? && @app_key.to_s.empty? && @managed_identity_id.to_s.empty?
            reject_config('managed_identity_id is not provided and app_id/app_key is empty.')
          end
          # If proxy AAD is required and the proxy configuration is not provided - fail
          if @proxy_aad_only && @is_direct_conn
            reject_config('proxy_aad_only can be used only when proxy is configured.', @proxy_aad_only)
          end

          reject_config('database config value should not be dynamic.', @database) if @database =~ FIELD_REF
          reject_config('table config value should not be dynamic.', @table) if @table =~ FIELD_REF
          reject_config('json_mapping config value should not be dynamic.', @json_mapping) if @json_mapping =~ FIELD_REF

          unless ["https", "http"].include?(@proxy_protocol)
            reject_config('proxy_protocol has to be http or https.', @proxy_protocol)
          end
          # If all validation pass then configuration is valid
          true
        end # def validate_config

        private

        # Logs the message (plus the offending value, when given) at error
        # level and raises it as a LogStash::ConfigurationError.
        def reject_config(message, *offending_value)
          @logger.error(message, *offending_value)
          raise LogStash::ConfigurationError, message
        end # def reject_config
      end # class KustoLogstashConfiguration
    end # module KustoInternal
  end # module Outputs
end # module LogStash
Loading

0 comments on commit 78d5c70

Please sign in to comment.