From 690e8e17664a899d3d4ecff131b2b3a864942e6a Mon Sep 17 00:00:00 2001 From: cfln123 Date: Tue, 9 Jul 2024 14:46:05 -0300 Subject: [PATCH 1/2] Add Kafka Cluster and Topic monitoring --- lib/cfnguardian/models/alarm.rb | 28 +++++++++ lib/cfnguardian/resources/kafka_cluster.rb | 68 ++++++++++++++++++++++ lib/cfnguardian/resources/kafka_topic.rb | 13 +++++ 3 files changed, 109 insertions(+) create mode 100644 lib/cfnguardian/resources/kafka_cluster.rb create mode 100644 lib/cfnguardian/resources/kafka_topic.rb diff --git a/lib/cfnguardian/models/alarm.rb b/lib/cfnguardian/models/alarm.rb index 0e22d4b..6bed0e0 100644 --- a/lib/cfnguardian/models/alarm.rb +++ b/lib/cfnguardian/models/alarm.rb @@ -394,6 +394,34 @@ def initialize(resource,environment) end end + class KafkaClusterAlarm < BaseAlarm + def initialize(resource) + super(resource) + @group = 'KafkaCluster' + @namespace = 'AWS/Kafka' + @dimensions = { ClusterName: resource['Id'] } + @statistic = 'Average' + @evaluation_periods = 1 + @datapoints_to_alarm = 1 + @period = 300 + @treat_missing_data = 'breaching' + end + end + + class KafkaTopicAlarm < BaseAlarm + def initialize(resource) + super(resource) + @group = 'KafkaTopic' + @namespace = 'AWS/Kafka' + @dimensions = { ClusterName: resource['ClusterName'], Topic: resource['Id'] } + @statistic = 'Average' + @evaluation_periods = 1 + @datapoints_to_alarm = 1 + @period = 300 + @treat_missing_data = 'breaching' + end + end + class LambdaAlarm < BaseAlarm def initialize(resource) super(resource) diff --git a/lib/cfnguardian/resources/kafka_cluster.rb b/lib/cfnguardian/resources/kafka_cluster.rb new file mode 100644 index 0000000..0d0df6e --- /dev/null +++ b/lib/cfnguardian/resources/kafka_cluster.rb @@ -0,0 +1,68 @@ +module CfnGuardian::Resource + class KafkaCluster < Base + + def default_alarms + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'CPUUserCritical' + alarm.metric_name = 'CPUUser' + alarm.threshold = 80 + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'CPUUserWarning' + alarm.metric_name = 'CPUUser' + alarm.threshold = 50 + alarm.alarm_action = 'Warning' + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'KafkaDataLogsDiskUsedCritical' + alarm.metric_name = 'KafkaDataLogsDiskUsed' + alarm.threshold = 85 + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'KafkaDataLogsDiskUsedWarning' + alarm.metric_name = 'KafkaDataLogsDiskUsed' + alarm.threshold = 70 + alarm.alarm_action = 'Warning' + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'BurstBalance' + alarm.metric_name = 'BurstBalance' + alarm.threshold = 1 + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'MemoryFreeCritical' + alarm.metric_name = 'MemoryFree' + alarm.threshold = 10 + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'MemoryFreeWarning' + alarm.metric_name = 'MemoryFree' + alarm.threshold = 50 + alarm.alarm_action = 'Warning' + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'NetworkRxErrorsCritical' + alarm.metric_name = 'NetworkRxErrors' + alarm.threshold = 10 + @alarms.push(alarm) + + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) + alarm.name = 'NetworkRxErrorsWarning' + alarm.metric_name = 'NetworkRxErrors' + alarm.threshold = 5 + alarm.alarm_action = 'Warning' + @alarms.push(alarm) + end + + end +end diff --git a/lib/cfnguardian/resources/kafka_topic.rb b/lib/cfnguardian/resources/kafka_topic.rb new file mode 100644 index 0000000..1911331 --- /dev/null +++ b/lib/cfnguardian/resources/kafka_topic.rb @@ -0,0 +1,13 @@ +module CfnGuardian::Resource + class KafkaTopic < Base + + def default_alarms + alarm = CfnGuardian::Models::KafkaTopicAlarm.new(@resource) + alarm.name = 'MessagesInPerSec' + alarm.metric_name = 'MessagesInPerSec' + alarm.threshold = 5 + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) + end + end +end From ae5d138488f37262207bf70c3c085764d499fba5 Mon Sep 17 00:00:00 2001 From: cfln123 Date: Tue, 9 Jul 2024 17:47:36 -0300 Subject: [PATCH 2/2] Refactor kafka dimensions --- lib/cfnguardian/compile.rb | 2 + lib/cfnguardian/models/alarm.rb | 8 +- lib/cfnguardian/resources/kafka_cluster.rb | 114 +++++++++++---------- lib/cfnguardian/resources/kafka_topic.rb | 21 ++-- 4 files changed, 80 insertions(+), 65 deletions(-) diff --git a/lib/cfnguardian/compile.rb b/lib/cfnguardian/compile.rb index 0c349c4..c6c3a02 100644 --- a/lib/cfnguardian/compile.rb +++ b/lib/cfnguardian/compile.rb @@ -30,6 +30,8 @@ require 'cfnguardian/resources/port' require 'cfnguardian/resources/internal_port' require 'cfnguardian/resources/nrpe' +require 'cfnguardian/resources/kafka_cluster' +require 'cfnguardian/resources/kafka_topic' require 'cfnguardian/resources/lambda' require 'cfnguardian/resources/network_targetgroup' require 'cfnguardian/resources/rds_cluster' diff --git a/lib/cfnguardian/models/alarm.rb b/lib/cfnguardian/models/alarm.rb index 6bed0e0..6c6a12f 100644 --- a/lib/cfnguardian/models/alarm.rb +++ b/lib/cfnguardian/models/alarm.rb @@ -395,11 +395,11 @@ def initialize(resource,environment) end class KafkaClusterAlarm < BaseAlarm - def initialize(resource) + def initialize(resource,broker) super(resource) @group = 'KafkaCluster' @namespace = 'AWS/Kafka' - @dimensions = { ClusterName: resource['Id'] } + @dimensions = { 'Cluster Name': resource['Id'], 'Broker ID': broker } @statistic = 'Average' @evaluation_periods = 1 @datapoints_to_alarm = 1 @@ -409,11 +409,11 @@ def initialize(resource) end class KafkaTopicAlarm < BaseAlarm - def initialize(resource) + def initialize(resource,broker) super(resource) @group = 'KafkaTopic' @namespace = 'AWS/Kafka' - @dimensions = { ClusterName: resource['ClusterName'], Topic: resource['Id'] } + @dimensions = { 'Cluster Name': resource['ClusterName'], 'Broker ID': broker, Topic: resource['Id'] } @statistic = 'Average' @evaluation_periods = 1 @datapoints_to_alarm = 1 diff --git a/lib/cfnguardian/resources/kafka_cluster.rb b/lib/cfnguardian/resources/kafka_cluster.rb index 0d0df6e..eb88a53 100644 --- a/lib/cfnguardian/resources/kafka_cluster.rb +++ b/lib/cfnguardian/resources/kafka_cluster.rb @@ -1,68 +1,74 @@ module CfnGuardian::Resource class KafkaCluster < Base - def default_alarms - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'CPUUserCritical' - alarm.metric_name = 'CPUUser' - alarm.threshold = 80 - @alarms.push(alarm) + def initialize(resource, override_group = nil) + super(resource, override_group) + @brokers_list = resource['Brokers'] + end + + def default_alarms + @brokers_list.each do |broker| + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-CPUUserCritical" + alarm.metric_name = 'CpuUser' + alarm.threshold = 80 + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'CPUUserWarning' - alarm.metric_name = 'CPUUser' - alarm.threshold = 50 - alarm.alarm_action = 'Warning' - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-CPUUserWarning" + alarm.metric_name = 'CpuUser' + alarm.threshold = 50 + alarm.alarm_action = 'Warning' + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'KafkaDataLogsDiskUsedCritical' - alarm.metric_name = 'KafkaDataLogsDiskUsed' - alarm.threshold = 85 - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-KafkaDataLogsDiskUsedCritical" + alarm.metric_name = 'KafkaDataLogsDiskUsed' + alarm.threshold = 85 + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'KafkaDataLogsDiskUsedWarning' - alarm.metric_name = 'KafkaDataLogsDiskUsed' - alarm.threshold = 70 - alarm.alarm_action = 'Warning' - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-KafkaDataLogsDiskUsedWarning" + alarm.metric_name = 'KafkaDataLogsDiskUsed' + alarm.threshold = 70 + alarm.alarm_action = 'Warning' + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'BurstBalance' - alarm.metric_name = 'BurstBalance' - alarm.threshold = 1 - alarm.comparison_operator = 'LessThanThreshold' - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-BurstBalance" + alarm.metric_name = 'BurstBalance' + alarm.threshold = 1 + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'MemoryFreeCritical' - alarm.metric_name = 'MemoryFree' - alarm.threshold = 10 - alarm.comparison_operator = 'LessThanThreshold' - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-MemoryFreeCritical" + alarm.metric_name = 'MemoryFree' + alarm.threshold = 10 + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'MemoryFreeWarning' - alarm.metric_name = 'MemoryFree' - alarm.threshold = 50 - alarm.alarm_action = 'Warning' - alarm.comparison_operator = 'LessThanThreshold' - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-MemoryFreeWarning" + alarm.metric_name = 'MemoryFree' + alarm.threshold = 50 + alarm.alarm_action = 'Warning' + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'NetworkRxErrorsCritical' - alarm.metric_name = 'NetworkRxErrors' - alarm.threshold = 10 - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-NetworkRxErrorsCritical" + alarm.metric_name = 'NetworkRxErrors' + alarm.threshold = 10 + @alarms.push(alarm) - alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource) - alarm.name = 'NetworkRxErrorsWarning' - alarm.metric_name = 'NetworkRxErrors' - alarm.threshold = 5 - alarm.alarm_action = 'Warning' - @alarms.push(alarm) + alarm = CfnGuardian::Models::KafkaClusterAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-NetworkRxErrorsWarning" + alarm.metric_name = 'NetworkRxErrors' + alarm.threshold = 5 + alarm.alarm_action = 'Warning' + @alarms.push(alarm) + end end - end end diff --git a/lib/cfnguardian/resources/kafka_topic.rb b/lib/cfnguardian/resources/kafka_topic.rb index 1911331..ca5e6cc 100644 --- a/lib/cfnguardian/resources/kafka_topic.rb +++ b/lib/cfnguardian/resources/kafka_topic.rb @@ -1,13 +1,20 @@ module CfnGuardian::Resource class KafkaTopic < Base - def default_alarms - alarm = CfnGuardian::Models::KafkaTopicAlarm.new(@resource) - alarm.name = 'MessagesInPerSec' - alarm.metric_name = 'MessagesInPerSec' - alarm.threshold = 5 - alarm.comparison_operator = 'LessThanThreshold' - @alarms.push(alarm) + def initialize(resource, override_group = nil) + super(resource, override_group) + @brokers_list = resource['Brokers'] + end + + def default_alarms + @brokers_list.each do |broker| + alarm = CfnGuardian::Models::KafkaTopicAlarm.new(@resource,broker) + alarm.name = "Broker#{broker}-MessagesInPerSec" + alarm.metric_name = 'MessagesInPerSec' + alarm.threshold = 5 + alarm.comparison_operator = 'LessThanThreshold' + @alarms.push(alarm) + end end end end