Skip to content

Commit

Permalink
sentry warning when internal mqtt queue passes a threshold
Browse files Browse the repository at this point in the history
  • Loading branch information
timcowlishaw committed Oct 14, 2024
1 parent e1ee360 commit c773070
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions lib/tasks/mqtt_subscriber.rake
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ namespace :mqtt do
mqtt_port = ENV.has_key?('MQTT_PORT') ? ENV['MQTT_PORT'] : 1883
mqtt_ssl = ENV.has_key?('MQTT_SSL') ? ENV['MQTT_SSL'] : false
mqtt_shared_subscription_group = ENV.fetch("MQTT_SHARED_SUBSCRIPTION_GROUP", nil)

mqtt_queue_length_warning_threshold = Env.fetch("MQTT_QUEUE_LENGTH_WARNING_THRESHOLD", 30)

mqtt_topics_string = ENV.fetch('MQTT_TOPICS', '')
mqtt_topics = mqtt_topics_string.include?(",") ? mqtt_topics_string.split(",") : [ mqtt_topics_string ]

if mqtt_shared_subscription_group && mqtt_clean_session
mqtt_client_id += "-#{ENV.fetch("HOSTNAME")}"
end

mqtt_log = Logger.new("log/mqtt-#{mqtt_client_id}.log", 5, 100.megabytes)
mqtt_log.info('MQTT TASK STARTING')
mqtt_log.info("clean_session: #{mqtt_clean_session}")
Expand Down Expand Up @@ -50,7 +51,7 @@ namespace :mqtt do
"#{prefix}/#{topic}device/inventory" => 2
]
})

threshold_passed = false
client.get do |topic, message|
Sentry.with_scope do
begin
Expand All @@ -59,6 +60,12 @@ namespace :mqtt do
end
mqtt_log.info "Processed MQTT message in #{time}"
mqtt_log.info "MQTT queue length: #{client.queue_length}"
if !threshold_passed && client.queue_length >= mqtt_queue_length_warning_threshold
threshold_passed = true
Sentry.capture_message("Warning: Internal MQTT queue length is #{client.queue_length} (>= #{mqtt_queue_length_warning_threshold} on client #{mqtt_client_id}).")
else
threshold_passed = false
end
rescue Exception => e
mqtt_log.info e
Sentry.capture_exception(e)
Expand Down

0 comments on commit c773070

Please sign in to comment.