From c773070dfff6491e4bff4f2b64b2971b3dba17c6 Mon Sep 17 00:00:00 2001 From: Tim Cowlishaw Date: Mon, 14 Oct 2024 06:08:50 +0200 Subject: [PATCH] sentry warning when internal mqtt queue passes a threshold --- lib/tasks/mqtt_subscriber.rake | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/lib/tasks/mqtt_subscriber.rake b/lib/tasks/mqtt_subscriber.rake index 83460187..68b4e271 100644 --- a/lib/tasks/mqtt_subscriber.rake +++ b/lib/tasks/mqtt_subscriber.rake @@ -10,14 +10,15 @@ namespace :mqtt do mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883 mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil) - + mqtt_queue_length_warning_threshold = Env.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", 30) + mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '') mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ] if mqtt_shared_subscription_group && mqtt_clean_session mqtt_client_id += "-#{ENV.fetch("HOSTNAME")}" end - + mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes) mqtt_log.info('MQTT TASK STARTING') mqtt_log.info("clean_session: #{mqtt_clean_session}") @@ -50,7 +51,7 @@ namespace :mqtt do "#{prefix}/#{topic}device/inventory" => 2 ] }) - + threshold_passed = false client.get do |topic, message| Sentry.with_scope do begin @@ -59,6 +60,12 @@ namespace :mqtt do end mqtt_log.info "Processed MQTT message in #{time}" mqtt_log.info "MQTT queue length: #{client.queue_length}" + if !threshold_passed && client.queue_length >= mqtt_queue_length_warning_threshold + threshold_passed = true + Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).") + else + threshold_passed = false + end rescue Exception => e mqtt_log.info e Sentry.capture_exception(e)