Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/upgrade sdk #73

Merged
merged 7 commits into from
Jun 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 23 additions & 5 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,29 @@ jobs:
build:
name: Build gem
runs-on: ubuntu-latest
environment: build
permissions:
checks: write
pull-requests: write
pull-requests: write
id-token: write
contents: read
strategy:
matrix:
logstash: [
{ version: '8.5.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz" , main: 'true' }
{ version: '8.7.0', url: "https://artifacts.elastic.co/downloads/logstash/logstash-8.7.0-linux-x86_64.tar.gz" , main: 'true' }
]
env:
LOGSTASH_SOURCE: 1
LOGSTASH_PATH: /home/runner/logstash
JRUBY_HOME: /home/runner/logstash/vendor/jruby
JAVA_HOME: /home/runner/logstash/jdk
steps:
- name: Azure login
uses: azure/login@v2
with:
client-id: ${{ secrets.APP_ID }}
tenant-id: ${{ secrets.AUTH_ID }}
subscription-id: ${{ secrets.SUBSCRIPTION_ID }}
- name: Build logstash
run: |
echo "Getting logstash version ${{matrix.logstash.version}}"
Expand Down Expand Up @@ -68,8 +77,20 @@ jobs:
e2e:
name: End-To-End Testing
runs-on: ubuntu-latest
environment: build
permissions:
checks: write
pull-requests: write
id-token: write
contents: read
needs: build
steps:
- name: Azure login
uses: azure/login@v2
with:
client-id: ${{ secrets.APP_ID }}
tenant-id: ${{ secrets.AUTH_ID }}
subscription-id: ${{ secrets.SUBSCRIPTION_ID }}
- uses: ruby/setup-ruby@v1
with:
ruby-version: jruby
Expand Down Expand Up @@ -98,7 +119,4 @@ jobs:
env:
ENGINE_URL: ${{ secrets.ENGINE_URL }}
INGEST_URL: ${{ secrets.INGEST_URL }}
APP_ID: ${{ secrets.APP_ID }}
APP_KEY: ${{ secrets.APP_KEY }}
TENANT_ID: ${{ secrets.TENANT_ID }}
TEST_DATABASE: ${{ secrets.TEST_DATABASE }}
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,8 @@ gradle/wrapper/gradle-wrapper.properties
rspec.xml
e2e/output_file.txt
logs.txt
docker-e2e/.env
local-run.sh
logs2.txt
**/.vscode/*.*
**/settings.json
109 changes: 56 additions & 53 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,71 +19,74 @@ repositories {
// These dependencies are required by the gemspec to build the gem. The easiest to arrive at this list is to look at the effective pom of kusto-ingest and arrive at this list
// even if we use the ruby-maven gem to package the gem, install and lock_jars will create the logstash_output_kusto_jars.rb file with the same list of dependencies.
// In the gradle way, running ./gradlew vendor creates the jar file list and adds them to vendor/jar-dependencies folder from where it is referenced in the gemspec (require_paths and files)

// update dependencies to bom azure-sdk-bom/1.2.24

dependencies {
implementation 'com.azure:azure-core-http-netty:1.13.9'
implementation 'com.azure:azure-core:1.44.1'
implementation 'com.azure:azure-data-tables:12.3.16'
implementation 'com.azure:azure-identity:1.10.4'
implementation 'com.microsoft.azure.kusto:kusto-data:5.1.0'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.1.0'
implementation 'com.azure:azure-core-http-netty:1.15.0'
implementation 'com.azure:azure-core:1.49.0'
implementation 'com.azure:azure-data-tables:12.4.1'
implementation 'com.azure:azure-identity:1.12.1'
implementation 'com.azure:azure-json:1.1.0'
implementation 'com.azure:azure-storage-blob:12.24.1'
implementation 'com.azure:azure-storage-common:12.23.1'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml:2.15.2'
implementation 'com.azure:azure-storage-queue:12.19.1'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.15.2'
implementation 'com.fasterxml.jackson.core:jackson-core:2.15.2'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.15.2'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.15.2'
implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
implementation 'com.microsoft.azure.kusto:kusto-data:5.0.5'
implementation 'com.microsoft.azure.kusto:kusto-ingest:5.0.5'
implementation 'com.microsoft.azure:msal4j-persistence-extension:1.2.0'
implementation 'com.microsoft.azure:msal4j:1.13.10'
implementation 'com.nimbusds:content-type:2.2'
implementation 'com.azure:azure-storage-blob:12.26.0'
implementation 'com.azure:azure-storage-common:12.25.0'
implementation 'com.azure:azure-storage-queue:12.21.0'
implementation 'com.azure:azure-xml:1.0.0'
implementation 'com.fasterxml.jackson.core:jackson-annotations:2.16.0'
implementation 'com.fasterxml.jackson.core:jackson-core:2.16.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.16.0'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310:2.16.0'
implementation 'com.fasterxml.woodstox:woodstox-core:6.7.0'
implementation 'com.github.stephenc.jcip:jcip-annotations:1.0-1'
implementation 'com.microsoft.azure:msal4j:1.15.1'
implementation 'com.nimbusds:content-type:2.3'
implementation 'com.nimbusds:lang-tag:1.7'
implementation 'com.nimbusds:nimbus-jose-jwt:9.30.2'
implementation 'com.nimbusds:oauth2-oidc-sdk:10.7.1'
implementation 'com.nimbusds:nimbus-jose-jwt:9.40'
implementation 'com.nimbusds:oauth2-oidc-sdk:11.13'
implementation 'com.univocity:univocity-parsers:2.9.1'
implementation 'commons-codec:commons-codec:1.16.0'
implementation 'commons-logging:commons-logging:1.2'
implementation 'commons-codec:commons-codec:1.16.1'
implementation 'commons-logging:commons-logging:1.3.1'
implementation 'io.github.resilience4j:resilience4j-core:1.7.1'
implementation 'io.github.resilience4j:resilience4j-retry:1.7.1'
implementation 'io.netty:netty-buffer:4.1.104.Final'
implementation 'io.netty:netty-codec-dns:4.1.104.Final'
implementation 'io.netty:netty-codec-http2:4.1.104.Final'
implementation 'io.netty:netty-codec-http:4.1.104.Final'
implementation 'io.netty:netty-codec-socks:4.1.104.Final'
implementation 'io.netty:netty-codec:4.1.104.Final'
implementation 'io.netty:netty-common:4.1.104.Final'
implementation 'io.netty:netty-handler-proxy:4.1.104.Final'
implementation 'io.netty:netty-handler:4.1.104.Final'
implementation 'io.netty:netty-resolver-dns-classes-macos:4.1.104.Final'
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.104.Final:osx-x86_64'
implementation 'io.netty:netty-resolver-dns:4.1.104.Final'
implementation 'io.netty:netty-resolver:4.1.104.Final'
implementation 'io.netty:netty-tcnative-boringssl-static:2.0.62.Final'
implementation 'io.netty:netty-tcnative-classes:2.0.62.Final'
implementation 'io.netty:netty-transport-classes-epoll:4.1.104.Final'
implementation 'io.netty:netty-transport-classes-kqueue:4.1.104.Final'
implementation 'io.netty:netty-transport-native-epoll:4.1.104.Final:linux-x86_64'
implementation 'io.netty:netty-transport-native-kqueue:4.1.104.Final:osx-x86_64'
implementation 'io.netty:netty-transport-native-unix-common:4.1.104.Final'
implementation 'io.netty:netty-transport:4.1.104.Final'
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.39'
implementation 'io.projectreactor.netty:reactor-netty-http:1.0.39'
implementation 'io.projectreactor:reactor-core:3.4.34'
implementation 'io.vavr:vavr-match:0.10.2'
implementation 'io.vavr:vavr:0.10.2'
implementation 'io.netty:netty-buffer:4.1.108.Final'
implementation 'io.netty:netty-codec-dns:4.1.108.Final'
implementation 'io.netty:netty-codec-http2:4.1.108.Final'
implementation 'io.netty:netty-codec-http:4.1.108.Final'
implementation 'io.netty:netty-codec-socks:4.1.108.Final'
implementation 'io.netty:netty-codec:4.1.108.Final'
implementation 'io.netty:netty-common:4.1.108.Final'
implementation 'io.netty:netty-handler-proxy:4.1.108.Final'
implementation 'io.netty:netty-handler:4.1.108.Final'
implementation 'io.netty:netty-resolver-dns-classes-macos:4.1.108.Final'
implementation 'io.netty:netty-resolver-dns-native-macos:4.1.108.Final:osx-x86_64'
implementation 'io.netty:netty-resolver-dns:4.1.108.Final'
implementation 'io.netty:netty-resolver:4.1.108.Final'
implementation 'io.netty:netty-tcnative-boringssl-static:2.0.65.Final'
implementation 'io.netty:netty-tcnative-classes:2.0.65.Final'
implementation 'io.netty:netty-transport-classes-epoll:4.1.108.Final'
implementation 'io.netty:netty-transport-classes-kqueue:4.1.108.Final'
implementation 'io.netty:netty-transport-native-epoll:4.1.108.Final:linux-x86_64'
implementation 'io.netty:netty-transport-native-kqueue:4.1.108.Final:osx-x86_64'
implementation 'io.netty:netty-transport-native-unix-common:4.1.108.Final'
implementation 'io.netty:netty-transport:4.1.108.Final'
implementation 'io.projectreactor.netty:reactor-netty-core:1.0.43'
implementation 'io.projectreactor.netty:reactor-netty-http:1.0.43'
implementation 'io.projectreactor:reactor-core:3.4.36'
implementation 'io.vavr:vavr:0.10.4'
implementation 'io.vavr:vavr-match:0.10.4'
implementation 'net.java.dev.jna:jna-platform:5.13.0'
implementation 'net.java.dev.jna:jna:5.13.0'
implementation 'net.minidev:accessors-smart:2.4.9'
implementation 'net.minidev:json-smart:2.4.10'
implementation 'net.minidev:accessors-smart:2.5.1'
implementation 'net.minidev:json-smart:2.5.1'
implementation 'org.apache.commons:commons-lang3:3.14.0'
implementation 'org.apache.commons:commons-text:1.11.0'
implementation 'org.apache.httpcomponents:httpclient:4.5.14'
implementation 'org.apache.httpcomponents:httpcore:4.4.16'
implementation 'org.codehaus.woodstox:stax2-api:4.2.1'
implementation 'org.jetbrains:annotations:22.0.0'
implementation 'org.ow2.asm:asm:9.3'
implementation 'org.codehaus.woodstox:stax2-api:4.2.2'
implementation 'org.jetbrains:annotations:24.1.0'
implementation 'org.ow2.asm:asm:9.7'
implementation 'org.reactivestreams:reactive-streams:1.0.4'
implementation 'org.slf4j:slf4j-api:1.8.0-beta4'
implementation 'org.slf4j:slf4j-simple:1.8.0-beta4'
Expand Down
20 changes: 8 additions & 12 deletions e2e/e2e.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@ def initialize
@column_count = 19
@engine_url = ENV["ENGINE_URL"]
@ingest_url = ENV["INGEST_URL"]
@app_id = ENV["APP_ID"]
@app_key = ENV['APP_KEY']
@tenant_id = ENV['TENANT_ID']
@database = ENV['TEST_DATABASE']
@lslocalpath = ENV['LS_LOCAL_PATH']
if @lslocalpath.nil?
@lslocalpath = "/usr/share/logstash/bin/logstash"
end
@table_with_mapping = "RubyE2E#{Time.now.getutc.to_i}"
@table_without_mapping = "RubyE2ENoMapping#{Time.now.getutc.to_i}"
@mapping_name = "test_mapping"
Expand All @@ -36,19 +37,15 @@ def initialize
kusto {
path => "tmp%{+YYYY-MM-dd-HH-mm}.txt"
ingest_url => "#{@ingest_url}"
app_id => "#{@app_id}"
app_key => "#{@app_key}"
app_tenant => "#{@tenant_id}"
cli_auth => true
database => "#{@database}"
table => "#{@table_with_mapping}"
json_mapping => "#{@mapping_name}"
}
kusto {
path => "nomaptmp%{+YYYY-MM-dd-HH-mm}.txt"
cli_auth => true
ingest_url => "#{@ingest_url}"
app_id => "#{@app_id}"
app_key => "#{@app_key}"
app_tenant => "#{@tenant_id}"
database => "#{@database}"
table => "#{@table_without_mapping}"
}
Expand Down Expand Up @@ -82,7 +79,7 @@ def run_logstash
logstashpath = File.absolute_path("logstash.conf")
File.write(@output_file, "")
File.write(@input_file, "")
lscommand = "/usr/share/logstash/bin/logstash -f #{logstashpath}"
lscommand = "#{@lslocalpath} -f #{logstashpath}"
puts "Running logstash from config path #{logstashpath} and final command #{lscommand}"
spawn(lscommand)
sleep(60)
Expand Down Expand Up @@ -137,8 +134,7 @@ def assert_data
end

def start
@query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAadApplicationCredentials(@engine_url, @app_id,
@app_key, @tenant_id))
@query_client = $kusto_java.data.ClientFactory.createClient($kusto_java.data.auth.ConnectionStringBuilder::createWithAzureCli(@engine_url))
create_table_and_mapping
run_logstash
assert_data
Expand Down
5 changes: 3 additions & 2 deletions lib/logstash/outputs/kusto.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ class LogStash::Outputs::Kusto < LogStash::Outputs::Base
config :app_tenant, validate: :string, default: nil
# managed identity id
config :managed_identity, validate: :string, default: nil

# CLI credentials for dev-test
config :cli_auth, validate: :boolean, default: false
# The following are the data settings that impact where events are written to
# Database name
config :database, validate: :string, required: true
Expand Down Expand Up @@ -154,7 +155,7 @@ def register
max_queue: upload_queue_size,
fallback_policy: :caller_runs)

@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)
@ingestor = Ingestor.new(ingest_url, app_id, app_key, app_tenant, managed_identity, cli_auth, database, table, final_mapping, delete_temp_files, proxy_host, proxy_port,proxy_protocol, @logger, executor)

# send existing files
recover_past_files if recovery
Expand Down
26 changes: 19 additions & 7 deletions lib/logstash/outputs/kusto/ingestor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,18 @@ class Ingestor
LOW_QUEUE_LENGTH = 3
FIELD_REF = /%\{[^}]+\}/

def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, cli_auth, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL)
@workers_pool = threadpool
@logger = logger
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id)
validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id,cli_auth)
@logger.info('Preparing Kusto resources.')

kusto_java = Java::com.microsoft.azure.kusto
apache_http = Java::org.apache.http
# kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
# If there is managed identity, use it. This means the AppId and AppKey are empty/nil
is_managed_identity = (app_id.nil? && app_key.nil?)
# If there is CLI Auth, use that instead of managed identity
is_managed_identity = (app_id.nil? && app_key.nil? && !cli_auth)
# If it is system managed identity, propagate the system identity
is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id)
# Is it direct connection
Expand All @@ -45,7 +46,13 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat
kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id)
end
else
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
if cli_auth
@logger.warn('*Use of CLI Auth is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production*')
kusto_java.data.auth.ConnectionStringBuilder.createWithAzureCli(ingest_url)
else
@logger.info('Using app id and app key.')
kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant)
end
end
#
@logger.debug(Gem.loaded_specs.to_s)
Expand Down Expand Up @@ -75,16 +82,21 @@ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, dat
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
else
@logger.debug('No mapping reference provided. Columns will be mapped by names in the logstash output')
@ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON)
end
@delete_local = delete_local
@logger.debug('Kusto resources are ready.')
end

def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id)
def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id,cli_auth)
# Add an additional validation and fail this upfront
if app_id.nil? && app_key.nil? && managed_identity_id.nil?
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
if cli_auth
@logger.info('Using CLI Auth, this is only for dev-test scenarios. This is ***NOT RECOMMENDED*** for production')
else
@logger.error('managed_identity_id is not provided and app_id/app_key is empty.')
raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.')
end
end
if database =~ FIELD_REF
@logger.error('database config value should not be dynamic.', database)
Expand Down
2 changes: 1 addition & 1 deletion logstash-output-kusto.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ Gem::Specification.new do |s|
s.add_development_dependency 'rspec_junit_formatter'


end
end
Loading
Loading