diff --git a/app/grammars/raw_message.tt b/app/grammars/raw_message.tt index a457dd68..3c904d90 100644 --- a/app/grammars/raw_message.tt +++ b/app/grammars/raw_message.tt @@ -1,6 +1,6 @@ grammar RawMessage rule message - '{' message_body '}' { + '{' whitespace message_body whitespace '}' { def to_hash { data: [ message_body.to_hash ] } end @@ -24,7 +24,7 @@ grammar RawMessage end rule pair - pair:(timestamp_pair / value_pair) ","? { + pair:(timestamp_pair / value_pair) whitespace ","? whitespace { def to_value pair.to_value end @@ -32,7 +32,7 @@ grammar RawMessage end rule timestamp_pair - 't:' timestamp { + 't' whitespace ':' timestamp { def to_value { key: :t, value: timestamp.text_value } end @@ -40,7 +40,7 @@ grammar RawMessage end rule value_pair - number ':' value { + number whitespace ':' whitespace value { def to_value { key: number.text_value, value: value.text_value } end @@ -79,4 +79,8 @@ grammar RawMessage } end + rule whitespace + ' '* + end + end diff --git a/app/lib/mqtt_messages_handler.rb b/app/lib/mqtt_messages_handler.rb index e2989c89..4396eb05 100644 --- a/app/lib/mqtt_messages_handler.rb +++ b/app/lib/mqtt_messages_handler.rb @@ -1,106 +1,58 @@ class MqttMessagesHandler def handle_topic(topic, message, retry_on_nil_device=true) - Sentry.set_tags('mqtt-topic': topic) - - crumb = Sentry::Breadcrumb.new( - category: "MqttMessagesHandler.handle_topic", - message: "Handling topic #{topic}", - data: { topic: topic, message: message.encode("UTF-8", invalid: :replace, undef: :replace) } - ) - Sentry.add_breadcrumb(crumb) - return if topic.nil? - + message = message.encode("US-ASCII", invalid: :replace, undef: :replace, replace: "") + log_message_to_sentry(topic, message) handshake_device(topic) - # The following do NOT need a device if topic.to_s.include?('inventory') - DeviceInventory.create({ report: (message rescue nil) }) - return true - end - - device = Device.find_by(device_token: device_token(topic)) - if device.nil? - handle_nil_device(topic, message, retry_on_nil_device) - return nil - end - - if topic.to_s.include?('raw') - handle_readings(device, parse_raw_readings(message, device.id)) + handle_inventory(topic, message) + elsif topic.to_s.include?('raw') + handle_readings(topic, parse_raw_readings(message), retry_on_nil_device) elsif topic.to_s.include?('readings') - handle_readings(device, message) + handle_readings(topic, message, retry_on_nil_device) elsif topic.to_s.include?('info') - json_message = JSON.parse(message) - crumb = Sentry::Breadcrumb.new( - category: "MqttMessagesHandler.handle_topic", - message: "Parsing info message", - data: { - topic: topic, - message: message.encode("UTF-8", invalid: :replace, undef: :replace), - json: json_message, - device_id: device.id - } - ) - Sentry.add_breadcrumb(crumb) - device.update_column(:hardware_info, json_message) + handle_info(topic, message, retry_on_nil_device) + else + true end - return true end - def handle_nil_device(topic, message, retry_on_nil_device) - orphan_device = OrphanDevice.find_by_device_token(device_token(topic)) - if topic.to_s.include?("info") && !topic.to_s.include?("bridge") && orphan_device - retry_later(topic, message) if retry_on_nil_device - end - end + private - def retry_later(topic, message) - RetryMQTTMessageJob.perform_later(topic, message) + def handle_inventory(topic, message) + DeviceInventory.create({ report: (message rescue nil) }) + return true end - # takes a packet and stores data - def handle_readings(device, message) - data = self.data(message) - return if data.nil? or data&.empty? + def handle_readings(topic, message, retry_on_nil_device) + device = find_device_for_topic(topic, message, retry_on_nil_device) + return nil if device.nil? + + data = JSON.parse(message)["data"] if message + return nil if data.nil? or data&.empty? data.each do |reading| storer.store(device, reading) end + + return true rescue Exception => e Sentry.capture_exception(e) raise e if Rails.env.test? - #puts e.inspect - #puts message end - # takes a raw packet and converts into JSON - def parse_raw_readings(message, device_id=nil) - crumb = Sentry::Breadcrumb.new( - category: "MqttMessagesHandler.parse_raw_readings", - message: "Parsing raw readings", - data: { message: message.encode("UTF-8", invalid: :replace, undef: :replace), device_id: device_id } - ) - Sentry.add_breadcrumb(crumb) - clean_tm = message[1..-2].split(",")[0].gsub("t:", "").strip - raw_readings = message[1..-2].split(",")[1..] - - reading = { 'data' => ['recorded_at' => clean_tm, 'sensors' => []] } - - raw_readings.each do |raw_read| - raw_id = raw_read.split(":")[0].strip - raw_value = raw_read.split(":")[1]&.strip - reading['data'].first['sensors'] << { 'id' => raw_id, 'value' => raw_value } - end - - crumb = Sentry::Breadcrumb.new( - category: "MqttMessagesHandler.parse_raw_readings", - message: "Readings data constructed", - data: { message: message.encode("UTF-8", invalid: :replace, undef: :replace), reading: reading, device_id: device_id } - ) - Sentry.add_breadcrumb(crumb) + def handle_info(topic, message, retry_on_nil_device) + device = find_device_for_topic(topic, message, retry_on_nil_device) + return nil if device.nil? + json_message = JSON.parse(message) + device.update_column(:hardware_info, json_message) + return true + end - JSON[reading] + def parse_raw_readings(message) + JSON[raw_readings_parser.parse(message)] end def handshake_device(topic) @@ -112,29 +64,38 @@ def handshake_device(topic) }.to_json) end - # takes a packet and returns 'device token' from topic - def device_token(topic) - topic[/device\/sck\/(.*?)\//m, 1].to_s + def log_message_to_sentry(topic, message) + Sentry.set_tags('mqtt-topic': topic) + crumb = Sentry::Breadcrumb.new( + category: "MqttMessagesHandler.handle_topic", + message: "Handling topic #{topic}", + data: { topic: topic, message: message } + ) + Sentry.add_breadcrumb(crumb) end - # takes a packet and returns 'data' from payload - def data(message) - # TODO: what if message is empty? - if message - begin - JSON.parse(message)['data'] - rescue JSON::ParserError - # Handle error - end - else - raise "No data(message)" - end + def find_device_for_topic(topic, message, retry_on_nil_device) + device = Device.find_by(device_token: device_token(topic)) + handle_nil_device(topic, message, retry_on_nil_device) if device.nil? + return device end + def handle_nil_device(topic, message, retry_on_nil_device) + orphan_device = OrphanDevice.find_by_device_token(device_token(topic)) + if topic.to_s.include?("info") && !topic.to_s.include?("bridge") && orphan_device + RetryMQTTMessageJob.perform_later(topic, message) if retry_on_nil_device + end + end - private + def device_token(topic) + device_token = topic[/device\/sck\/(.*?)\//m, 1].to_s + end def storer @storer ||= Storer.new end + + def raw_readings_parser + @raw_readings_parser ||= RawMqttMessageParser.new + end end diff --git a/app/lib/raw_mqtt_message_parser.rb b/app/lib/raw_mqtt_message_parser.rb index 031154eb..ba2c8f48 100644 --- a/app/lib/raw_mqtt_message_parser.rb +++ b/app/lib/raw_mqtt_message_parser.rb @@ -1,12 +1,12 @@ require_relative "../grammars/raw_message" -class RawMQTTMessageParser +class RawMqttMessageParser def initialize @parser = RawMessageParser.new end def parse(message) - parser.parse(self.convert_to_ascii(message))&.to_hash + parser.parse(self.convert_to_ascii(message.strip))&.to_hash end private diff --git a/spec/lib/mqtt_messages_handler_spec.rb b/spec/lib/mqtt_messages_handler_spec.rb index c015d2a6..66926a49 100644 --- a/spec/lib/mqtt_messages_handler_spec.rb +++ b/spec/lib/mqtt_messages_handler_spec.rb @@ -52,18 +52,6 @@ ) end - describe '#device_token' do - it 'returns device_token from topic' do - expect(message_handler.device_token(@packet.topic)).to eq(device.device_token) - end - end - - describe '#data' do - it 'returns parsed data from payload' do - expect(message_handler.data(@packet.payload)).to match_array(@data) - end - end - describe '#readings' do before do # storer data processing diff --git a/spec/lib/raw_mqtt_message_parser_spec.rb b/spec/lib/raw_mqtt_message_parser_spec.rb new file mode 100644 index 00000000..97817fbe --- /dev/null +++ b/spec/lib/raw_mqtt_message_parser_spec.rb @@ -0,0 +1,85 @@ +require 'rails_helper' + +RSpec.describe RawMqttMessageParser do + subject(:parser) { + RawMqttMessageParser.new + } + + it "parses empty messages" do + message = "{}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: nil, sensors: [] }]}) + end + + it "parses messages with a timestamp" do + message = "{t:2024-09-25T13:19:38Z}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [] }]}) + end + + it "parses messages with a timestamp and one postitive integer value" do + message = "{t:2024-09-25T13:19:38Z,1:2}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "1", value: "2"}] }]}) + end + + it "parses messages with a timestamp and one negative integer value" do + message = "{t:2024-09-25T13:19:38Z,2:-3}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "2", value: "-3"}] }]}) + end + + it "parses messages with a timestamp and one positive float value" do + message = "{t:2024-09-25T13:19:38Z,10:3.12345}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "10", value: "3.12345"}] }]}) + end + + it "parses messages with a timestamp and one negative float value" do + message = "{t:2024-09-25T13:19:38Z,100:-2000.12345}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}] }]}) + end + + it "parses messages with a timestamp and multiple values" do + message = "{t:2024-09-25T13:19:38Z,100:-2000.12345,21:12345.23450}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]}) + end + + it "strips non-ascii characters from messages" do + message = "{t:2024-09-25T13:19:38Z💣💣💣💣,100:-2000.12345,21:12345.23450}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]}) + end + + it "returns nil if no valid message parsed" do + message = "ceci n'est pas un message" + parsed = parser.parse(message) + expect(parsed).to eq(nil) + end + + it "parses timestamps at any position in the packet" do + message = "{100:-2000.12345,t:2024-09-25T13:19:38Z,21:12345.23450}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]}) + end + + it "parses messages with spaces between entries" do + message = "{t:2024-09-25T13:19:38Z, 100 : -2000.12345, 21 : 12345.23450}" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]}) + end + + it "parses messages with leading spaces after the braces" do + message = "{ t:2024-09-25T13:19:38Z,100:-2000.12345,21:12345.23450 }" + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]}) + end + + it "parses messages padded with whitespace" do + message = " {t:2024-09-25T13:19:38Z,100:-2000.12345,21:12345.23450} " + parsed = parser.parse(message) + expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]}) + end +end