Skip to content

Commit

Permalink
tests for raw message parser, refactor message handler to use it
Browse files Browse the repository at this point in the history
  • Loading branch information
timcowlishaw committed Oct 2, 2024
1 parent 2b441bd commit 39a52e3
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 112 deletions.
12 changes: 8 additions & 4 deletions app/grammars/raw_message.tt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
grammar RawMessage
rule message
'{' message_body '}' {
'{' whitespace message_body whitespace '}' {
def to_hash
{ data: [ message_body.to_hash ] }
end
Expand All @@ -24,23 +24,23 @@ grammar RawMessage
end

rule pair
pair:(timestamp_pair / value_pair) ","? {
pair:(timestamp_pair / value_pair) whitespace ","? whitespace {
def to_value
pair.to_value
end
}
end

rule timestamp_pair
't:' timestamp {
't' whitespace ':' timestamp {
def to_value
{ key: :t, value: timestamp.text_value }
end
}
end

rule value_pair
number ':' value {
number whitespace ':' whitespace value {
def to_value
{ key: number.text_value, value: value.text_value }
end
Expand Down Expand Up @@ -79,4 +79,8 @@ grammar RawMessage
}
end

rule whitespace
' '*
end

end
149 changes: 55 additions & 94 deletions app/lib/mqtt_messages_handler.rb
Original file line number Diff line number Diff line change
@@ -1,106 +1,58 @@
class MqttMessagesHandler

def handle_topic(topic, message, retry_on_nil_device=true)
Sentry.set_tags('mqtt-topic': topic)

crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.handle_topic",
message: "Handling topic #{topic}",
data: { topic: topic, message: message.encode("UTF-8", invalid: :replace, undef: :replace) }
)
Sentry.add_breadcrumb(crumb)

return if topic.nil?

message = message.encode("US-ASCII", invalid: :replace, undef: :replace, replace: "")
log_message_to_sentry(topic, message)
handshake_device(topic)

# The following do NOT need a device
if topic.to_s.include?('inventory')
DeviceInventory.create({ report: (message rescue nil) })
return true
end

device = Device.find_by(device_token: device_token(topic))
if device.nil?
handle_nil_device(topic, message, retry_on_nil_device)
return nil
end

if topic.to_s.include?('raw')
handle_readings(device, parse_raw_readings(message, device.id))
handle_inventory(topic, message)
elsif topic.to_s.include?('raw')
handle_readings(topic, parse_raw_readings(message), retry_on_nil_device)
elsif topic.to_s.include?('readings')
handle_readings(device, message)
handle_readings(topic, message, retry_on_nil_device)
elsif topic.to_s.include?('info')
json_message = JSON.parse(message)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.handle_topic",
message: "Parsing info message",
data: {
topic: topic,
message: message.encode("UTF-8", invalid: :replace, undef: :replace),
json: json_message,
device_id: device.id
}
)
Sentry.add_breadcrumb(crumb)
device.update_column(:hardware_info, json_message)
handle_info(topic, message, retry_on_nil_device)
else
true
end
return true
end

def handle_nil_device(topic, message, retry_on_nil_device)
orphan_device = OrphanDevice.find_by_device_token(device_token(topic))
if topic.to_s.include?("info") && !topic.to_s.include?("bridge") && orphan_device
retry_later(topic, message) if retry_on_nil_device
end
end
private

def retry_later(topic, message)
RetryMQTTMessageJob.perform_later(topic, message)
def handle_inventory(topic, message)
DeviceInventory.create({ report: (message rescue nil) })
return true
end

# takes a packet and stores data
def handle_readings(device, message)
data = self.data(message)
return if data.nil? or data&.empty?
def handle_readings(topic, message, retry_on_nil_device)
device = find_device_for_topic(topic, message, retry_on_nil_device)
return nil if device.nil?

data = JSON.parse(message)["data"] if message
return nil if data.nil? or data&.empty?

data.each do |reading|
storer.store(device, reading)
end

return true
rescue Exception => e
Sentry.capture_exception(e)
raise e if Rails.env.test?
#puts e.inspect
#puts message
end

# takes a raw packet and converts into JSON
def parse_raw_readings(message, device_id=nil)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.parse_raw_readings",
message: "Parsing raw readings",
data: { message: message.encode("UTF-8", invalid: :replace, undef: :replace), device_id: device_id }
)
Sentry.add_breadcrumb(crumb)
clean_tm = message[1..-2].split(",")[0].gsub("t:", "").strip
raw_readings = message[1..-2].split(",")[1..]

reading = { 'data' => ['recorded_at' => clean_tm, 'sensors' => []] }

raw_readings.each do |raw_read|
raw_id = raw_read.split(":")[0].strip
raw_value = raw_read.split(":")[1]&.strip
reading['data'].first['sensors'] << { 'id' => raw_id, 'value' => raw_value }
end

crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.parse_raw_readings",
message: "Readings data constructed",
data: { message: message.encode("UTF-8", invalid: :replace, undef: :replace), reading: reading, device_id: device_id }
)
Sentry.add_breadcrumb(crumb)
def handle_info(topic, message, retry_on_nil_device)
device = find_device_for_topic(topic, message, retry_on_nil_device)
return nil if device.nil?
json_message = JSON.parse(message)
device.update_column(:hardware_info, json_message)
return true
end

JSON[reading]
def parse_raw_readings(message)
JSON[raw_readings_parser.parse(message)]
end

def handshake_device(topic)
Expand All @@ -112,29 +64,38 @@ def handshake_device(topic)
}.to_json)
end

# takes a packet and returns 'device token' from topic
def device_token(topic)
topic[/device\/sck\/(.*?)\//m, 1].to_s
def log_message_to_sentry(topic, message)
Sentry.set_tags('mqtt-topic': topic)
crumb = Sentry::Breadcrumb.new(
category: "MqttMessagesHandler.handle_topic",
message: "Handling topic #{topic}",
data: { topic: topic, message: message }
)
Sentry.add_breadcrumb(crumb)
end

# takes a packet and returns 'data' from payload
def data(message)
# TODO: what if message is empty?
if message
begin
JSON.parse(message)['data']
rescue JSON::ParserError
# Handle error
end
else
raise "No data(message)"
end
def find_device_for_topic(topic, message, retry_on_nil_device)
device = Device.find_by(device_token: device_token(topic))
handle_nil_device(topic, message, retry_on_nil_device) if device.nil?
return device
end

def handle_nil_device(topic, message, retry_on_nil_device)
orphan_device = OrphanDevice.find_by_device_token(device_token(topic))
if topic.to_s.include?("info") && !topic.to_s.include?("bridge") && orphan_device
RetryMQTTMessageJob.perform_later(topic, message) if retry_on_nil_device
end
end

private
def device_token(topic)
device_token = topic[/device\/sck\/(.*?)\//m, 1].to_s
end

def storer
@storer ||= Storer.new
end

def raw_readings_parser
@raw_readings_parser ||= RawMqttMessageParser.new
end
end
4 changes: 2 additions & 2 deletions app/lib/raw_mqtt_message_parser.rb
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
require_relative "../grammars/raw_message"

class RawMQTTMessageParser
class RawMqttMessageParser
def initialize
@parser = RawMessageParser.new
end

def parse(message)
parser.parse(self.convert_to_ascii(message))&.to_hash
parser.parse(self.convert_to_ascii(message.strip))&.to_hash
end

private
Expand Down
12 changes: 0 additions & 12 deletions spec/lib/mqtt_messages_handler_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,6 @@
)
end

describe '#device_token' do
it 'returns device_token from topic' do
expect(message_handler.device_token(@packet.topic)).to eq(device.device_token)
end
end

describe '#data' do
it 'returns parsed data from payload' do
expect(message_handler.data(@packet.payload)).to match_array(@data)
end
end

describe '#readings' do
before do
# storer data processing
Expand Down
85 changes: 85 additions & 0 deletions spec/lib/raw_mqtt_message_parser_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
require 'rails_helper'

RSpec.describe RawMqttMessageParser do
subject(:parser) {
RawMqttMessageParser.new
}

it "parses empty messages" do
message = "{}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: nil, sensors: [] }]})
end

it "parses messages with a timestamp" do
message = "{t:2024-09-25T13:19:38Z}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [] }]})
end

it "parses messages with a timestamp and one postitive integer value" do
message = "{t:2024-09-25T13:19:38Z,1:2}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "1", value: "2"}] }]})
end

it "parses messages with a timestamp and one negative integer value" do
message = "{t:2024-09-25T13:19:38Z,2:-3}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "2", value: "-3"}] }]})
end

it "parses messages with a timestamp and one positive float value" do
message = "{t:2024-09-25T13:19:38Z,10:3.12345}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "10", value: "3.12345"}] }]})
end

it "parses messages with a timestamp and one negative float value" do
message = "{t:2024-09-25T13:19:38Z,100:-2000.12345}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}] }]})
end

it "parses messages with a timestamp and multiple values" do
message = "{t:2024-09-25T13:19:38Z,100:-2000.12345,21:12345.23450}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]})
end

it "strips non-ascii characters from messages" do
message = "{t:2024-09-25T13:19:38Z💣💣💣💣,100:-2000.12345,21:12345.23450}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]})
end

it "returns nil if no valid message parsed" do
message = "ceci n'est pas un message"
parsed = parser.parse(message)
expect(parsed).to eq(nil)
end

it "parses timestamps at any position in the packet" do
message = "{100:-2000.12345,t:2024-09-25T13:19:38Z,21:12345.23450}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]})
end

it "parses messages with spaces between entries" do
message = "{t:2024-09-25T13:19:38Z, 100 : -2000.12345, 21 : 12345.23450}"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]})
end

it "parses messages with leading spaces after the braces" do
message = "{ t:2024-09-25T13:19:38Z,100:-2000.12345,21:12345.23450 }"
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]})
end

it "parses messages padded with whitespace" do
message = " {t:2024-09-25T13:19:38Z,100:-2000.12345,21:12345.23450} "
parsed = parser.parse(message)
expect(parsed).to eq({ data: [ { recorded_at: "2024-09-25T13:19:38Z", sensors: [{id: "100", value: "-2000.12345"}, { id: "21", value: "12345.23450"}] }]})
end
end

0 comments on commit 39a52e3

Please sign in to comment.