diff --git a/docker-e2e/Logstash-Docker b/docker-e2e/Logstash-Docker index bed079b..af61434 100644 --- a/docker-e2e/Logstash-Docker +++ b/docker-e2e/Logstash-Docker @@ -1,6 +1,7 @@ FROM docker.elastic.co/logstash/logstash-oss:8.10.0 -COPY logstash-output-kusto-2.0.0-java.gem /tmp/logstash-output-kusto-2.0.0-java.gem +COPY logstash-output-kusto-2.0.2-java.gem /tmp/logstash-output-kusto-2.0.2-java.gem RUN rm -f /usr/share/logstash/pipeline/logstash.conf && \ - bin/logstash-plugin install /tmp/logstash-output-kusto-2.0.0-java.gem + bin/logstash-plugin install /tmp/logstash-output-kusto-2.0.2-java.gem COPY logstash-nsg-logs.conf /usr/share/logstash/pipeline/logstash.conf +COPY 2023-12-15-12-fw-d-hub01.log /tmp/fw-d-hub01.log COPY logstash.yml /usr/share/logstash/config/logstash.yml diff --git a/docker-e2e/docker-compose-all.yml b/docker-e2e/docker-compose-all.yml index f87259f..9b0641d 100644 --- a/docker-e2e/docker-compose-all.yml +++ b/docker-e2e/docker-compose-all.yml @@ -10,6 +10,7 @@ services: ports: - "9600:9600" - "5044:5044" + - "30001:30001" deploy: restart_policy: condition: on-failure diff --git a/docker-e2e/kusto-tables.kql b/docker-e2e/kusto-tables.kql index fe98354..882cef2 100644 --- a/docker-e2e/kusto-tables.kql +++ b/docker-e2e/kusto-tables.kql @@ -1,3 +1,54 @@ +.create-merge table PaloAltoRaw([timestamp]:datetime , OriginalRecord:string) + +.create table PaloAltoRaw ingestion json mapping 'fwmaps' '[{"column":"timestamp","path":"$.@timestamp"},{"column":"OriginalRecord","path":"$.message"}]' + +.create table PaloAltoProcessed (CEFDateTime:datetime, Event:string, MessageType:string, SourceIP:string, DestIP:string,Action:string) + +// the function that parses this +.create-or-alter function + with (docstring = 'Parses raw PaloAlto CEF into some specific columns', folder = 'UpdatePolicyFunctions') + ExtractPaloAltoLogs() + { + PaloAltoRaw + | extend logs = split(OriginalRecord, "|") + | extend cefdtversion=tostring(logs[0]) + | extend VersionTimePart = split(cefdtversion, " ") + | extend CEFDateTime = todatetime(VersionTimePart[0]) + | extend Source = VersionTimePart[1] + | extend CEFVersion = VersionTimePart[3] + | extend Vendor = replace_string(tostring(logs[1]), "#012"," ") + | extend App = logs[2] + | extend Version = logs[3] + | extend Event = tostring(logs[4]) + | extend MessageType = tostring(logs[5]) + | extend Type = logs[6] + | extend kv= tostring(logs[7]) + | extend Extension = replace_string(kv, "#012"," ") + | parse-kv Extension as (src:string, dst:string, proto:string, spt:long, dpt:long, act:string,msg:string,rt:string,PanDynamicUsrgrp:string) with(kv_delimiter="=", pair_delimiter=" ", quote='"',greedy=true) + | extend rtdttm=todatetime(rt) + | project CEFDateTime,Event,MessageType,SourceIP=src,DestIP=dst,Action=act +} + + +// Now that the function is created, mark this as an update policy + +.alter table PaloAltoProcessed policy update @'[{ "IsEnabled": true, "Source": "PaloAltoRaw", "Query": "ExtractPaloAltoLogs()", "IsTransactional": true, "PropagateIngestionProperties": false}]' + +flowLogs +| count + +PaloAltoRaw +| count + + +PaloAltoProcessed +| count + + +.show commands-and-queries | where LastUpdatedOn >= ago(5m) | where Text has ".ingest" | where Database == "e2e" | take 20 + +.show ingestion failures | where FailedOn >= ago(15m) | where Database == "e2e" | take 20 + .create table flowLogs ( Time:datetime, macAddress:string, diff --git a/docker-e2e/logstash-nsg-logs-paloalto.conf.template b/docker-e2e/logstash-nsg-logs-paloalto.conf.template index daafef0..b78861b 100644 --- a/docker-e2e/logstash-nsg-logs-paloalto.conf.template +++ b/docker-e2e/logstash-nsg-logs-paloalto.conf.template @@ -1,19 +1,26 @@ input { # Have both Paloalto and NSG logs coming in beats { + add_field => { "[@metadata][source_type]" => "nsg-beats" } port => 5044 } udp { + # This is the actual case in your environment where you have PaloAlto sending data on UDP. We dont have PA so we are using a file (provided earlier as sample) + add_field => { "[@metadata][source_type]" => "udp" } port => "30001" type => "paloalto" } + file { + # Took the file that you provided as the sample and sent that data into ADX + add_field => { "[@metadata][source_type]" => "file" } + path => "/tmp/fw-d-hub01.log" + start_position => "beginning" + } } - - filter { # NSG Data comes through Filebeat use this conditionally - if [agent][type] == 'filebeat' { + if [@metadata][source_type] == "nsg-beats" { json { source => "message" } @@ -94,26 +101,26 @@ filter { } output { - if [agent][type] == 'filebeat' { + if [@metadata][source_type] == "nsg-beats" { kusto { - path => "/tmp/kusto/nsg1/%{+YYYY-MM-dd-HH-mm}.txt" - ingest_url => "https://ingest-adx-d-nss.koreacentral.kusto.windows.net" + path => "/tmp/kusto/nsg/%{+YYYY-MM-dd-HH-mm}.txt" + ingest_url => "" app_id => "" app_key => "" app_tenant => "" - database => "nss-db" + database => "" table => "flowLogs" # fw as defined above json_mapping => "flowLogsMapping" # fw as defined above } } else { kusto { - path => "/tmp/kusto/paloaltofw1/%{+YYYY-MM-dd-HH-mm}.txt" - ingest_url => "https://ingest-adx-d-nss.koreacentral.kusto.windows.net" + path => "/tmp/kusto/paloalto/%{+YYYY-MM-dd-HH-mm}.txt" + ingest_url => "" app_id => "" app_key => "" app_tenant => "" - database => "nss-db" - table => "MySourceTable" # fw as defined above + database => "" + table => "PaloAltoRaw" # fw as defined above json_mapping => "fwmaps" # fw as defined above } }