- PUB publishes a message;
- Backend records this message in DB;
- SUB subscribes to a topic;
- Backend retrieves the messages of this topic from DB;
- Messages are sent to SUB;
- Once the SUB acknowledged / received the message, backend removes the message from DB.
- PUB publishes a message;
- Backend records the message in DB;
- SUB1 and SUB2 subscribe to a topic;
- Backend retrieves the messages of this topic;
- Messages are sent to SUB1 and SUB2;
- Backend records the read position of SUB1 and SUB2, the next message’s retrieval starts from this position.
EMQ X supports retaining the client's connection state in Redis or DB.
EMQ X Persistence supports subscription by broker. When a client goes online, the persistence module loads the subscriptions of the client from Redis or Databases.
EMQ X allowes storing messages in Redis, MySQL, PostgreSQL, MongoDB, Cassandra, DynamoDB, InfluxDB, OpenTSDB and Timescale:
Persistence Plugins | Config File | Description |
---|---|---|
emqx_backend_redis | emqx_backend_redis.conf | Redis Message Persistence |
emqx_backend_mysql | emqx_backend_mysql.conf | MySQL Message Persistence |
emqx_backend_pgsql | emqx_backend_pgsql.conf | PostgreSQL Message Persistence |
emqx_backend_mongo | emqx_backend_mongo.conf | MongoDB Message Persistence |
emqx_backend_cassa | emqx_backend_cassa.conf | Cassandra Message Persistence |
emqx_backend_dynamo | emqx_backend_dynamo.conf | DynamoDB Message Persistence |
emqx_backend_influxdb | emqx_backend_influxdb.conf | InfluxDB Message Persistence |
emqx_backend_opentsdb | emqx_backend_opentsdb.conf | OpenTSDB Message Persistence |
emqx_backend_timescale | emqx_backend_timescale.conf | Timescale Message Persistence |
Config file: emqx_backend_redis.conf
Config Connection Pool of Multiple Redis Servers:
## Redis Server
backend.redis.pool1.server = 127.0.0.1:6379
## Redis Sentinel
## backend.redis.server = 127.0.0.1:26378
##Redis Sentinel Cluster name
## backend.redis.sentinel = mymaster
## Redis Pool Size
backend.redis.pool1.pool_size = 8
## Redis database
backend.redis.pool1.database = 1
## Redis subscribe channel
backend.redis.pool1.channel = mqtt_channel
## Expired after seconds, if =< 0 take the default value
backend.redis.msg.expired_after = 3600
## Client Connected Record
backend.redis.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.redis.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.redis.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message for one QOS > 0
backend.redis.hook.session.subscribed.1 = {"topic": "queue/#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
## Lookup Unread Message for many QOS > 0
backend.redis.hook.session.subscribed.2 = {"topic": "pubsub/#", "action": {"function": "on_message_fetch_for_pubsub"}, "pool": "pool1"}
## Lookup Retain Message
backend.redis.hook.session.subscribed.3 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.redis.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store Retain Message
backend.redis.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.redis.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack for one
backend.redis.hook.message.acked.1 = {"topic": "queue/#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
## Store Ack for many
backend.redis.hook.message.acked.2 = {"topic": "pubsub/#", "action": {"function": "on_message_acked_for_pubsub"}, "pool": "pool1"}
hook | topic | action/function | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribe to topics | |
client.disconnected | on_client_disconnected | Store the client disconnected state | |
session.subscribed | queue/# | on_message_fetch_for_queue | Fetch one to one offline message |
session.subscribed | pubsub/# | on_message_fetch_for_pubsub | Fetch one to many offline message |
session.subscribed | # | on_retain_lookup | Lookup retained message |
message.publish | # | on_message_publish | Store the published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | queue/# | on_message_acked_for_queue | Process ACK of one to one messages |
message.acked | pubsub/# | on_message_acked_for_pubsub | Process ACK of one to many messages |
hook | Parameter | Example (Fields separated exactly by one space) |
---|---|---|
client.connected | clientid | SET conn:${clientid} clientid |
client.disconnected | clientid | SET disconn:${clientid} clientid |
session.subscribed | clientid, topic, qos | HSET sub:${clientid} topic qos |
session.unsubscribed | clientid, topic | SET unsub:${clientid} topic |
message.publish | message, msgid, topic, payload, qos, clientid | RPUSH pub:${topic} msgid |
message.acked | msgid, topic, clientid | HSET ack:${clientid} topic msgid |
message.delivered | msgid, topic, clientid | HSET delivered:${clientid} topic msgid |
Redis backend supports raw 'commands' in 'action', e.g.:
## After a client connected to the EMQ X server, it executes a redis command (multiple redis commands also supported)
backend.redis.hook.client.connected.3 = {"action": {"commands": ["SET conn:${clientid} clientid"]}, "pool": "pool1"}
mqtt:client Hash for devices' connection state:
hmset key = mqtt:client:${clientid} value = {state:int, online_at:timestamp, offline_at:timestamp} hset key = mqtt:node:${node} field = ${clientid} value = ${ts}
Lookup devices' connection state:
HGETALL "mqtt:client:${clientId}"
E.g.: Client with ClientId 'test' goes online:
HGETALL mqtt:client:test 1) "state" 2) "1" 3) "online_at" 4) "1481685802" 5) "offline_at" 6) "undefined"
Client with ClientId 'test' goes offline:
HGETALL mqtt:client:test 1) "state" 2) "0" 3) "online_at" 4) "1481685802" 5) "offline_at" 6) "1481685924"
mqtt:retain Hash for retained messages:
hmset key = mqtt:retain:${topic} value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp}
Lookup retained message:
HGETALL "mqtt:retain:${topic}"
Lookup retained messages with a topic of 'retain':
HGETALL mqtt:retain:topic 1) "id" 2) "6P9NLcJ65VXBbC22sYb4" 3) "from" 4) "test" 5) "qos" 6) "1" 7) "topic" 8) "topic" 9) "retain" 10) "true" 11) "payload" 12) "Hello world!" 13) "ts" 14) "1481690659"
mqtt:msg Hash for MQTT messages:
hmset key = mqtt:msg:${msgid} value = {id: string, from: string, qos: int, topic: string, retain: int, payload: string, ts: timestamp} zadd key = mqtt:msg:${topic} field = 1 value = ${msgid}
mqtt:acked SET stores acknowledgements from the clients:
set key = mqtt:acked:${clientid}:${topic} value = ${msgid}
mqtt:sub Hash for Subscriptions:
hset key = mqtt:sub:${clientid} field = ${topic} value = ${qos}
A client subscribes to a topic:
HSET mqtt:sub:${clientid} ${topic} ${qos}
A client with ClientId of 'test' subscribes to topic1 and topic2:
HSET "mqtt:sub:test" "topic1" 1 HSET "mqtt:sub:test" "topic2" 2
Lookup the subscribed topics of client with ClientId of 'test':
HGETALL mqtt:sub:test 1) "topic1" 2) "1" 3) "topic2" 4) "2"
When a device subscribes / unsubscribes to topics, EMQ X broker publish an event to the Redis:
PUBLISH channel = "mqtt_channel" message = {type: string , topic: string, clientid: string, qos: int} \*type: [subscribe/unsubscribe]
client with ClientID 'test' subscribe to 'topic0':
PUBLISH "mqtt_channel" "{\"type\": \"subscribe\", \"topic\": \"topic0\", \"clientid\": \"test\", \"qos\": \"0\"}"
Client with ClientId 'test' unsubscribes to 'test_topic0':
PUBLISH "mqtt_channel" "{\"type\": \"unsubscribe\", \"topic\": \"test_topic0\", \"clientid\": \"test\"}"
./bin/emqx_ctl plugins load emqx_backend_redis
Config file: emqx_backend_mysql.conf
Connection pool of multiple MySQL servers is supported:
## Mysql Server
backend.mysql.pool1.server = 127.0.0.1:3306
## Mysql Pool Size
backend.mysql.pool1.pool_size = 8
## Mysql Username
backend.mysql.pool1.user = root
## Mysql Password
backend.mysql.pool1.password = public
## Mysql Database
backend.mysql.pool1.database = mqtt
## Max number of fetch offline messages. Without count limit if infinity
## backend.mysql.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.mysql.time_range = 2h
## Client Connected Record
backend.mysql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.mysql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.mysql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.mysql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store Retain Message
backend.mysql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.mysql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.mysql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
## "offline_opts": Get configuration for offline messages
## max_returned_count: Maximum number of offline messages get at a time
## time_range: Get only messages in the current time range
## backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.mysql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
hook | Parameters | Example (${name} represents available parameter) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
MySQL backend supports SQL in 'action':
## After a client is connected to the EMQ X server, it executes a SQL command (multiple SQL commands also supported)
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
create database mqtt;
mysql -u root -p mqtt < etc/sql/emqx_backend_mysql.sql
Note
DB name is free of choice
mqtt_client stores client connection states:
DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`state` varchar(3) DEFAULT NULL,
`node` varchar(100) DEFAULT NULL,
`online_at` datetime DEFAULT NULL,
`offline_at` datetime DEFAULT NULL,
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_client_idx` (`clientid`),
UNIQUE KEY `mqtt_client_key` (`clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query the client connection state:
select * from mqtt_client where clientid = ${clientid};
If client 'test' is online:
select * from mqtt_client where clientid = "test";
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node | online_at | offline_at | created |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| 1 | test | 1 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL | 2016-12-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)
If client 'test' is offline:
select * from mqtt_client where clientid = "test";
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| id | clientid | state | node | online_at | offline_at | created |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
| 1 | test | 0 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22 |
+----+----------+-------+----------------+---------------------+---------------------+---------------------+
1 rows in set (0.00 sec)
mqtt_sub table stores MQTT subscriptions of clients:
DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`topic` varchar(255) DEFAULT NULL,
`qos` int(3) DEFAULT NULL,
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
E.g., client 'test' subscribes to 'test_topic1' and 'test_topic2':
insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic1", 1);
insert into mqtt_sub(clientid, topic, qos) values("test", "test_topic2", 2);
Query subscription of a client:
select * from mqtt_sub where clientid = ${clientid};
E.g., query the Subscription of client 'test':
select * from mqtt_sub where clientid = "test";
+----+--------------+-------------+------+---------------------+
| id | clientId | topic | qos | created |
+----+--------------+-------------+------+---------------------+
| 1 | test | test_topic1 | 1 | 2016-12-24 17:09:05 |
| 2 | test | test_topic2 | 2 | 2016-12-24 17:12:51 |
+----+--------------+-------------+------+---------------------+
2 rows in set (0.00 sec)
mqtt_msg stores MQTT messages:
DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`msgid` varchar(100) DEFAULT NULL,
`topic` varchar(1024) NOT NULL,
`sender` varchar(1024) DEFAULT NULL,
`node` varchar(60) DEFAULT NULL,
`qos` int(11) NOT NULL DEFAULT '0',
`retain` tinyint(2) DEFAULT NULL,
`payload` blob,
`arrived` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query messages published by a client:
select * from mqtt_msg where sender = ${clientid};
Query messages published by client 'test':
select * from mqtt_msg where sender = "test";
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| id | msgid | topic | sender | node | qos | retain | payload | arrived |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
| 1 | 53F98F80F66017005000004A60003 | hello | test | NULL | 1 | 0 | hello | 2016-12-24 17:25:12 |
| 2 | 53F98F9FE42AD7005000004A60004 | world | test | NULL | 1 | 0 | world | 2016-12-24 17:25:45 |
+----+-------------------------------+----------+--------+------+-----+--------+---------+---------------------+
2 rows in set (0.00 sec)
mqtt_retain stores retained messages:
DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`topic` varchar(200) DEFAULT NULL,
`msgid` varchar(60) DEFAULT NULL,
`sender` varchar(100) DEFAULT NULL,
`node` varchar(100) DEFAULT NULL,
`qos` int(2) DEFAULT NULL,
`payload` blob,
`arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_retain_key` (`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Query retained messages:
select * from mqtt_retain where topic = ${topic};
Query retained messages with topic 'retain':
select * from mqtt_retain where topic = "retain";
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
| id | topic | msgid | sender | node | qos | payload | arrived |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
| 1 | retain | 53F33F7E4741E7007000004B70001 | test | NULL | 1 | www | 2016-12-24 16:55:18 |
+----+----------+-------------------------------+---------+------+------+---------+---------------------+
1 rows in set (0.00 sec)
mqtt_acked stores acknowledgements from the clients:
DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(200) DEFAULT NULL,
`topic` varchar(200) DEFAULT NULL,
`mid` int(200) DEFAULT NULL,
`created` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
./bin/emqx_ctl plugins load emqx_backend_mysql
Config file: emqx_backend_pgsql.conf
Connection pool of multiple PostgreSQL servers is supported:
## Pgsql Server
backend.pgsql.pool1.server = 127.0.0.1:5432
## Pgsql Pool Size
backend.pgsql.pool1.pool_size = 8
## Pgsql Username
backend.pgsql.pool1.username = root
## Pgsql Password
backend.pgsql.pool1.password = public
## Pgsql Database
backend.pgsql.pool1.database = mqtt
## Pgsql Ssl
backend.pgsql.pool1.ssl = false
## Max number of fetch offline messages. Without count limit if infinity
## backend.pgsql.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.pgsql.time_range = 2h
## Client Connected Record
backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.pgsql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.pgsql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.pgsql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store Retain Message
backend.pgsql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.pgsql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
## "offline_opts": Get configuration for offline messages
## max_returned_count: Maximum number of offline messages get at a time
## time_range: Get only messages in the current time range
## backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.pgsql.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
hook | Parameters | Example (${name} represents available parameter) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
PostgreSQL backend supports SQL in 'action':
## After a client is connected to the EMQ X server, it executes a SQL command (multiple command also supported)
backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
createdb mqtt -E UTF8 -e
\i etc/sql/emqx_backend_pgsql.sql
Note
DB name is free of choice
mqtt_client stores client connection states:
CREATE TABLE mqtt_client( id SERIAL primary key, clientid character varying(100), state integer, node character varying(100), online_at timestamp, offline_at timestamp, created timestamp without time zone, UNIQUE (clientid) );
Query a client's connection state:
select * from mqtt_client where clientid = ${clientid};
E.g., if client 'test' is online:
select * from mqtt_client where clientid = 'test'; id | clientid | state | node | online_at | offline_at | created ----+----------+-------+----------------+---------------------+---------------------+--------------------- 1 | test | 1 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | NULL | 2016-12-24 09:40:22 (1 rows)
Client 'test' is offline:
select * from mqtt_client where clientid = 'test'; id | clientid | state | nod | online_at | offline_at | created ----+----------+-------+----------------+---------------------+---------------------+--------------------- 1 | test | 0 | emqx@127.0.0.1 | 2016-11-15 09:40:40 | 2016-11-15 09:46:10 | 2016-12-24 09:40:22 (1 rows)
mqtt_sub stores subscriptions of clients:
CREATE TABLE mqtt_sub( id SERIAL primary key, clientid character varying(100), topic character varying(200), qos integer, created timestamp without time zone, UNIQUE (clientid, topic) );
E.g., client 'test' subscribes to topic 'test_topic1' and 'test_topic2':
insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic1', 1);
insert into mqtt_sub(clientid, topic, qos) values('test', 'test_topic2', 2);
Query subscription of a client:
select * from mqtt_sub where clientid = ${clientid};
Query subscription of client 'test':
select * from mqtt_sub where clientid = 'test'; id | clientId | topic | qos | created ----+--------------+-------------+------+--------------------- 1 | test | test_topic1 | 1 | 2016-12-24 17:09:05 2 | test | test_topic2 | 2 | 2016-12-24 17:12:51 (2 rows)
mqtt_msg stores MQTT messages:
CREATE TABLE mqtt_msg (
id SERIAL primary key,
msgid character varying(60),
sender character varying(100),
topic character varying(200),
qos integer,
retain integer,
payload text,
arrived timestamp without time zone
);
Query messages published by a client:
select * from mqtt_msg where sender = ${clientid};
Query messages published by client 'test':
select * from mqtt_msg where sender = 'test'; id | msgid | topic | sender | node | qos | retain | payload | arrived ----+-------------------------------+----------+--------+------+-----+--------+---------+--------------------- 1 | 53F98F80F66017005000004A60003 | hello | test | NULL | 1 | 0 | hello | 2016-12-24 17:25:12 2 | 53F98F9FE42AD7005000004A60004 | world | test | NULL | 1 | 0 | world | 2016-12-24 17:25:45 (2 rows)
mqtt_retain stores retained messages:
CREATE TABLE mqtt_retain(
id SERIAL primary key,
topic character varying(200),
msgid character varying(60),
sender character varying(100),
qos integer,
payload text,
arrived timestamp without time zone,
UNIQUE (topic)
);
Query retained messages:
select * from mqtt_retain where topic = ${topic};
Query retained messages with topic 'retain':
select * from mqtt_retain where topic = 'retain'; id | topic | msgid | sender | node | qos | payload | arrived ----+----------+-------------------------------+---------+------+------+---------+--------------------- 1 | retain | 53F33F7E4741E7007000004B70001 | test | NULL | 1 | www | 2016-12-24 16:55:18 (1 rows)
mqtt_acked stores acknowledgements from the clients:
CREATE TABLE mqtt_acked (
id SERIAL primary key,
clientid character varying(100),
topic character varying(100),
mid integer,
created timestamp without time zone,
UNIQUE (clientid, topic)
);
./bin/emqx_ctl plugins load emqx_backend_pgsql
Config file: emqx_backend_mongo.conf
Connection pool of multiple MongoDB servers is supported:
## MongoDB Server Pools
## Mongo Topology Type single|unknown|sharded|rs
backend.mongo.pool1.type = single
## If type rs, need config setname
## backend.mongo.pool1.rs_set_name = testrs
## Mongo Server 127.0.0.1:27017,127.0.0.2:27017...
backend.mongo.pool1.server = 127.0.0.1:27017
## MongoDB Pool Size
backend.mongo.pool1.c_pool_size = 8
## MongoDB Database
backend.mongo.pool1.database = mqtt
## Mongo User
## backend.mongo.pool1.login = emqtt
## Mongo Password
## backend.mongo.pool1.password = emqtt
## MongoDB AuthSource
## Value: String
## Default: mqtt
## backend.mongo.pool1.auth_source = admin
## Whether to enable SSL connection.
##
## Value: true | false
## backend.mongo.pool1.ssl = false
## SSL keyfile.
##
## Value: File
## backend.mongo.pool1.keyfile =
## SSL certfile.
##
## Value: File
## backend.mongo.pool1.certfile =
## SSL cacertfile.
##
## Value: File
## backend.mongo.pool1.cacertfile =
# Value: unsafe | safe
## backend.mongo.pool1.w_mode = safe
## Value: master | slave_ok
## backend.mongo.pool1.r_mode = slave_ok
## Mongo Topology Options
## backend.mongo.topology.pool_size = 1
## backend.mongo.topology.max_overflow = 0
## backend.mongo.topology.overflow_ttl = 1000
## backend.mongo.topology.overflow_check_period = 1000
## backend.mongo.topology.local_threshold_ms = 1000
## backend.mongo.topology.connect_timeout_ms = 20000
## backend.mongo.topology.socket_timeout_ms = 100
## backend.mongo.topology.server_selection_timeout_ms = 30000
## backend.mongo.topology.wait_queue_timeout_ms = 1000
## backend.mongo.topology.heartbeat_frequency_ms = 10000
## backend.mongo.topology.min_heartbeat_frequency_ms = 1000
## Max number of fetch offline messages. Without count limit if infinity
## backend.mongo.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.mongo.time_range = 2h
## Client Connected Record
backend.mongo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.mongo.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.mongo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.mongo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0, payload_format options mongo_json | plain_text
backend.mongo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1", "payload_format": "mongo_json"}
## Store Retain Message, payload_format options mongo_json | plain_text
backend.mongo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1", "payload_format": "mongo_json"}
## Delete Retain Message
backend.mongo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.mongo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
## "offline_opts": Get configuration for offline messages
## max_returned_count: Maximum number of offline messages get at a time
## time_range: Get only messages in the current time range
## backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.mongo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
use mqtt
db.createCollection("mqtt_client")
db.createCollection("mqtt_sub")
db.createCollection("mqtt_msg")
db.createCollection("mqtt_retain")
db.createCollection("mqtt_acked")
db.mqtt_client.ensureIndex({clientid:1, node:2})
db.mqtt_sub.ensureIndex({clientid:1})
db.mqtt_msg.ensureIndex({sender:1, topic:2})
db.mqtt_retain.ensureIndex({topic:1})
Note
DB name is free of choice
mqtt_client stores MQTT clients' connection states:
{
clientid: string,
state: 0,1, //0 disconnected 1 connected
node: string,
online_at: timestamp,
offline_at: timestamp
}
Query client's connection state:
db.mqtt_client.findOne({clientid: ${clientid}})
E.g., if client 'test' is online:
db.mqtt_client.findOne({clientid: "test"})
{
"_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
"clientid" : "test",
"state" : 1,
"node" : "emqx@127.0.0.1",
"online_at" : 1482976411,
"offline_at" : null
}
Client 'test' is offline:
db.mqtt_client.findOne({clientid: "test"})
{
"_id" : ObjectId("58646c9bdde89a9fb9f7fb73"),
"clientid" : "test",
"state" : 0,
"node" : "emq@127.0.0.1",
"online_at" : 1482976411,
"offline_at" : 1482976501
}
mqtt_sub stores subscriptions of clients:
{
clientid: string,
topic: string,
qos: 0,1,2
}
E.g., client 'test' subscribes to topic 'test_topic1' and 'test_topic2':
db.mqtt_sub.insert({clientid: "test", topic: "test_topic1", qos: 1})
db.mqtt_sub.insert({clientid: "test", topic: "test_topic2", qos: 2})
Query subscription of client 'test':
db.mqtt_sub.find({clientid: "test"})
{ "_id" : ObjectId("58646d90c65dff6ac9668ca1"), "clientid" : "test", "topic" : "test_topic1", "qos" : 1 }
{ "_id" : ObjectId("58646d96c65dff6ac9668ca2"), "clientid" : "test", "topic" : "test_topic2", "qos" : 2 }
mqtt_msg stores MQTT messages:
{
_id: int,
topic: string,
msgid: string,
sender: string,
qos: 0,1,2,
retain: boolean (true, false),
payload: string,
arrived: timestamp
}
Query messages published by a client:
db.mqtt_msg.find({sender: ${clientid}})
Query messages published by client 'test':
db.mqtt_msg.find({sender: "test"})
{
"_id" : 1,
"topic" : "/World",
"msgid" : "AAVEwm0la4RufgAABeIAAQ==",
"sender" : "test",
"qos" : 1,
"retain" : 1,
"payload" : "Hello world!",
"arrived" : 1482976729
}
mqtt_retain stores retained messages:
{
topic: string,
msgid: string,
sender: string,
qos: 0,1,2,
payload: string,
arrived: timestamp
}
Query retained messages:
db.mqtt_retain.findOne({topic: ${topic}})
Query retained messages with topic 'retain':
db.mqtt_retain.findOne({topic: "/World"})
{
"_id" : ObjectId("58646dd9dde89a9fb9f7fb75"),
"topic" : "/World",
"msgid" : "AAVEwm0la4RufgAABeIAAQ==",
"sender" : "c1",
"qos" : 1,
"payload" : "Hello world!",
"arrived" : 1482976729
}
mqtt_acked stores acknowledgements from the clients:
{
clientid: string,
topic: string,
mongo_id: int
}
./bin/emqx_ctl plugins load emqx_backend_mongo
Config file: etc/plugins/emqx_backend_cassa.conf
Multi node Cassandra cluster is supported:
## Cassandra Node
backend.ecql.pool1.nodes = 127.0.0.1:9042
## Cassandra Pool Size
backend.ecql.pool1.size = 8
## Cassandra auto reconnect flag
backend.ecql.pool1.auto_reconnect = 1
## Cassandra Username
backend.ecql.pool1.username = cassandra
## Cassandra Password
backend.ecql.pool1.password = cassandra
## Cassandra Keyspace
backend.ecql.pool1.keyspace = mqtt
## Cassandra Logger type
backend.ecql.pool1.logger = info
## Max number of fetch offline messages. Without count limit if infinity
## backend.cassa.max_returned_count = 500
## Time Range. Without time limit if infinity
## d - day
## h - hour
## m - minute
## s - second
## backend.cassa.time_range = 2h
## Client Connected Record
backend.cassa.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## Subscribe Lookup Record
backend.cassa.hook.client.connected.2 = {"action": {"function": "on_subscription_lookup"}, "pool": "pool1"}
## Client DisConnected Record
backend.cassa.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
## Lookup Unread Message QOS > 0
backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## Lookup Retain Message
backend.cassa.hook.session.subscribed.2 = {"action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## Store Publish Message QOS > 0
backend.cassa.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Delete Acked Record
backend.cassa.hook.session.unsubscribed.1= {"topic": "#", action": {"cql": ["delete from acked where client_id = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
## Store Retain Message
backend.cassa.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## Delete Retain Message
backend.cassa.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
## Store Ack
backend.cassa.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## Get offline messages
## "offline_opts": Get configuration for offline messages
## max_returned_count: Maximum number of offline messages get at a time
## time_range: Get only messages in the current time range
## backend.cassa.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "offline_opts": {"max_returned_count": 500, "time_range": "2h"}, "pool": "pool1"}
## If you need to store Qos0 messages, you can enable the following configuration
## Warning: When the following configuration is enabled, 'on_message_fetch' needs to be disabled, otherwise qos1, qos2 messages will be stored twice
## backend.cassa.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked | Process ACK |
Customized CQL command parameters includes:
hook | Parameter | Example (${name} in CQL represents available parameter |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
Cassandra backend supports CLQ in 'action':
## After a client is connected to the EMQ X server, it executes a CQL command(multiple command also supported):
backend.cassa.hook.client.connected.3 = {"action": {"cql": ["insert into conn(clientid) values(${clientid})"]}, "pool": "pool1"}
Create KeySpace:
CREATE KEYSPACE mqtt WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
USR mqtt;
Import Cassandra tables:
cqlsh -e "SOURCE 'emqx_backend_cassa.cql'"
Note
KeySpace is free of choice
mqtt.client stores client connection states:
CREATE TABLE mqtt.client ( client_id text, node text, state int, connected timestamp, disconnected timestamp, PRIMARY KEY(client_id) );
Query a client's connection state:
select * from mqtt.client where clientid = ${clientid};
If client 'test' is online:
select * from mqtt.client where clientid = 'test'; client_id | connected | disconnected | node | state -----------+---------------------------------+---------------+---------------+------- test | 2017-02-14 08:27:29.872000+0000 | null | emqx@127.0.0.1| 1
Client 'test' is offline:
select * from mqtt.client where clientid = 'test'; client_id | connected | disconnected | node | state -----------+---------------------------------+---------------------------------+---------------+------- test | 2017-02-14 08:27:29.872000+0000 | 2017-02-14 08:27:35.872000+0000 | emqx@127.0.0.1| 0
mqtt.sub stores subscriptions of clients:
CREATE TABLE mqtt.sub ( client_id text, topic text, qos int, PRIMARY KEY(client_id, topic) );
Client 'test' subscribes to topic 'test_topic1' and 'test_topic2':
insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic1', 1); insert into mqtt.sub(client_id, topic, qos) values('test', 'test_topic2', 2);
Query subscriptions of a client:
select * from mqtt_sub where clientid = ${clientid};
Query subscriptions of client 'test':
select * from mqtt_sub where clientid = 'test'; client_id | topic | qos -----------+-------------+----- test | test_topic1 | 1 test | test_topic2 | 2
mqtt.msg stores MQTT messages:
CREATE TABLE mqtt.msg ( topic text, msgid text, sender text, qos int, retain int, payload text, arrived timestamp, PRIMARY KEY(topic, msgid) ) WITH CLUSTERING ORDER BY (msgid DESC);
Query messages published by a client:
select * from mqtt_msg where sender = ${clientid};
Query messages published by client 'test':
select * from mqtt_msg where sender = 'test'; topic | msgid | arrived | payload | qos | retain | sender -------+----------------------+---------------------------------+--------------+-----+--------+-------- hello | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test world | 2PguFrHsrzEvIIBdctmb | 2017-02-14 09:07:13.785000+0000 | Hello world! | 1 | 0 | test
mqtt.retain stores retained messages:
CREATE TABLE mqtt.retain ( topic text, msgid text, PRIMARY KEY(topic) );
Query retained messages:
select * from mqtt_retain where topic = ${topic};
Query retained messages with topic 'retain':
select * from mqtt_retain where topic = 'retain'; topic | msgid --------+---------------------- retain | 2PguFrHsrzEvIIBdctmb
mqtt.acked stores acknowledgements from the clients:
CREATE TABLE mqtt.acked ( client_id text, topic text, msgid text, PRIMARY KEY(client_id, topic) );
./bin/emqx_ctl plugins load emqx_backend_cassa
Config file: etc/plugins/emqx_backend_dynamo.conf
## DynamoDB Region
backend.dynamo.region = us-west-2
## DynamoDB Server
backend.dynamo.pool1.server = http://localhost:8000
## DynamoDB Pool Size
backend.dynamo.pool1.pool_size = 8
## AWS ACCESS KEY ID
backend.dynamo.pool1.aws_access_key_id = AKIAU5IM2XOC7AQWG7HK
## AWS SECRET ACCESS KEY
backend.dynamo.pool1.aws_secret_access_key = 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
## DynamoDB Backend Hooks
backend.dynamo.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
backend.dynamo.hook.session.created.1 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
backend.dynamo.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
backend.dynamo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch_for_queue"}, "pool": "pool1"}
backend.dynamo.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
backend.dynamo.hook.session.unsubscribed.1= {"topic": "#", "action": {"function": "on_acked_delete"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
backend.dynamo.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
backend.dynamo.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked_for_queue"}, "pool": "pool1"}
# backend.dynamo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
client.connected | on_client_connected | Store client connected state | |
client.connected | on_subscribe_lookup | Subscribed topics | |
client.disconnected | on_client_disconnected | Store client disconnected state | |
session.subscribed | # | on_message_fetch_for_queue | Fetch offline messages |
session.subscribed | # | on_retain_lookup | Lookup retained messages |
message.publish | # | on_message_publish | Store published messages |
message.publish | # | on_message_retain | Store retained messages |
message.publish | # | on_retain_delete | Delete retained messages |
message.acked | # | on_message_acked_for_queue | Process ACK |
./test/dynamo_test.sh
Note
DB name is free of choice
mqtt_client stores client connection states:
{
"TableName": "mqtt_client",
"KeySchema": [
{ "AttributeName": "clientid", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "clientid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query the client connection state:
aws dynamodb scan --table-name mqtt_client --region us-west-2 --endpoint-url http://localhost:8000
{
"Items": [
{
"offline_at": { "N": "0" },
"node": { "S": "emqx@127.0.0.1" },
"clientid": { "S": "mqttjs_384b9c73a9" },
"connect_state": { "N": "1" },
"online_at": { "N": "1562224940" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
mqtt_sub table stores MQTT subscriptions of clients:
{
"TableName": "mqtt_sub",
"KeySchema": [
{ "AttributeName": "clientid", "KeyType": "HASH" },
{ "AttributeName": "topic", "KeyType": "RANGE" }
],
"AttributeDefinitions": [
{ "AttributeName": "clientid", "AttributeType": "S" },
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query topics subscribed by the client named "test-dynamo":
aws dynamodb scan --table-name mqtt_sub --region us-west-2 --endpoint-url http://localhost:8000
{
"Items": [{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub" }, "clientid": { "S": "test-dynamo" }},
{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-1"}, "clientid": { "S": "test-dynamo" }},
{"qos": { "N": "2" }, "topic": { "S": "test-dynamo-sub-2"}, "clientid": { "S": "test-dynamo" }}],
"Count": 3,
"ScannedCount": 3,
"ConsumedCapacity": null
}
mqtt_msg stores MQTT messages:
{
"TableName": "mqtt_msg",
"KeySchema": [
{ "AttributeName": "msgid", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "msgid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
mqtt_topic_msg_map stores the mapping between topics and messages:
{
"TableName": "mqtt_topic_msg_map",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_msg and mqtt_topic_msg_map after a client publishes a message to the "test" topic:
Query mqtt_msg:
aws dynamodb scan --table-name mqtt_msg --region us-west-2 --endpoint-url http://localhost:8000
{
"Items": [
{
"arrived": { "N": "1562308553" },
"qos": { "N": "1" },
"sender": { "S": "mqttjs_231b962d5c" },
"payload": { "S": "{ \"msg\": \"Hello, World!\" }"},
"retain": { "N": "0" },
"msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
"topic": { "S": "test" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
Query mqtt_topic_msg_map:
aws dynamodb scan --table-name mqtt_topic_msg_map --region us-west-2 --endpoint-url http://localhost:8000
{
"Items": [
{
"topic": { "S": "test" },
"MsgId": { "SS": [ "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" ]}
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
mqtt_retain stores retained messages:
{
"TableName": "mqtt_retain",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_retain after a client publishes a message to the "test" topic:
{
"Items": [
{
"arrived": { "N": "1562312113" },
"qos": { "N": "1" },
"sender": { "S": "mqttjs_d0513acfce" },
"payload": { "S": "test" },
"retain": { "N": "1" },
"msgid": { "S": "Mjg4MTk1NzE3MTY4MjYxMjA5MDExMDg0NTk5ODgzMjAyNTH" },
"topic": { "S": "testtopic" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
mqtt_acked stores acknowledgements from the clients:
{
"TableName": "mqtt_acked",
"KeySchema": [
{ "AttributeName": "topic", "KeyType": "HASH" },
{ "AttributeName": "clientid", "KeyType": "RANGE" }
],
"AttributeDefinitions": [
{ "AttributeName": "topic", "AttributeType": "S" },
{ "AttributeName": "clientid", "AttributeType": "S" }
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
}
}
Query mqtt_acked after a client publishes a message to the "test" topic:
{
"Items": [
{
"topic": { "S": "test" },
"msgid": { "S": "Mjg4MTk1MDYwNTk0NjYwNzYzMTg4MDk3OTQ2MDU2Nzg1OTD" },
"clientid": { "S": "mqttjs_861e582a70" }
}
],
"Count": 1,
"ScannedCount": 1,
"ConsumedCapacity": null
}
./bin/emqx_ctl plugins load emqx_backend_dynamo
Config file: etc/plugins/emqx_backend_influxdb.conf:
## InfluxDB UDP Server
backend.influxdb.pool1.server = 127.0.0.1:8089
## InfluxDB Pool Size
backend.influxdb.pool1.pool_size = 5
## Wether to add timestamp automatically
backend.influxdb.pool1.set_timestamp = true
backend.influxdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Parameters in hook rule:
Option | Description |
---|---|
topic | Configure which topics need to execute hooks |
action | Configure specific action for hook, function is a built-in function provided as Backend for general functions |
pool | Pool Name, used to connect multiple InfluxDB servers |
Example:
## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
message.publish | # | on_message_publish | Store published messages |
Since MQTT Message cannot be written directly to InfluxDB, InfluxDB Backend provides an emqx_backend_influxdb.tmpl template file to convert MQTT Message to DataPoint that can be written to InfluxDB.
Template file use Json format:
key
- MQTT Topic, Json String, support wildcard charactersvalue
- Template, Json Object, used to convert MQTT Message intomeasurement,tag_key=tag_value,... field_key=field_value,... timestamp
and write to InfluxDB。
You can define different templates for different topics or multiple templates for the same topic, likes:
{
<Topic 1>: <Template 1>,
<Topic 2>: <Template 2>
}
Template format:
{
"measurement": <Measurement>,
"tags": {
<Tag Key>: <Tag Value>
},
"fields": {
<Field Key>: <Field Value>
},
"timestamp": <Timestamp>
}
measurement
and fields
are required options, tags
and timestamp
are optional.
All values (such as <Measurement>
) can be configured directly in the template as a fixed value that data types supported depending on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.
Currently, we support placeholders as follows:
Placeholder | Description |
---|---|
$id | MQTT Message UUID, assigned by EMQ X |
$clientid | Client ID used by the Client |
$username | Username used by the Client |
$peerhost | IP of Client |
$qos | QoS of MQTT Message |
$topic | Topic of MQTT Message |
$payload | Payload of MQTT Message, must be valid Json data |
$<Number> | It must be used with $paylaod to retrieve data from Json Array |
$timestamp | The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds |
You can directly use $content
to obtain the complete message payload, you can use ["$payload", <Key>, ...]
to get the data inside the message payload.
For example payload
is {"data": {"temperature": 23.9}}
, you can via ["$payload", "data", "temperature"]
to get 23.9
.
In the case of array data type in Json, we introduced $0
and $<pos_integer>
, $0
means to get all elements in the array, and $<pos_integer>
means to get the <pos_integer>th element in the array.
A simple example, ["$payload", "$0", "temp"]
will get [20, 21]
from [{"temp": 20}, {"temp": 21}]
, and ["$payload", "$1", "temp"]
will only get 20
.
It is worth noting that when you use $0
, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into InfluxDB, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.
Example
data/templates directory provides a sample template (emqx_backend_influxdb_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:
{
"sample": {
"measurement": "$topic",
"tags": {
"host": ["$payload", "data", "$0", "host"],
"region": ["$payload", "data", "$0", "region"],
"qos": "$qos",
"clientid": "$clientid"
},
"fields": {
"temperature": ["$payload", "data", "$0", "temp"]
},
"timestamp": "$timestamp"
}
}
When an MQTT Message whose Topic is "sample" has the following Payload:
{
"data": [
{
"temp": 1,
"host": "serverA",
"region": "hangzhou"
},
{
"temp": 2,
"host": "serverB",
"region": "ningbo"
}
]
}
Backend converts MQTT messages to:
[
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverA",
"qos": "0",
"region": "hangzhou",
},
"fields": {
"temperature": "1"
},
"timestamp": "1560743513626681000"
},
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverB",
"qos": "0",
"region": "ningbo",
},
"fields": {
"temperature": "2"
},
"timestamp": "1560743513626681000"
}
]
The data was finally encoded and written to InfluxDB as follows:
"sample,clientid=mqttjs_6990f0e886,host=serverA,qos=0,region=hangzhou temperature=\"1\" 1560745505429670000\nsample,clientid=mqttjs_6990f0e886,host=serverB,qos=0,region=ningbo temperature=\"2\" 1560745505429670000\n"
./bin/emqx_ctl plugins load emqx_backend_influxdb
Config file: etc/plugins/emqx_backend_opentsdb.conf:
## OpenTSDB Server
backend.opentsdb.pool1.server = 127.0.0.1:4242
## OpenTSDB Pool Size
backend.opentsdb.pool1.pool_size = 8
## Whether to return summary info
backend.opentsdb.pool1.summary = true
## Whether to return detailed info
##
## Value: true | false
backend.opentsdb.pool1.details = false
## Synchronous write or not
##
## Value: true | false
backend.opentsdb.pool1.sync = false
## Synchronous write timeout in milliseconds
##
## Value: Duration
##
## Default: 0
backend.opentsdb.pool1.sync_timeout = 0
## Max batch size
##
## Value: Number >= 0
## Default: 20
backend.opentsdb.pool1.max_batch_size = 20
## Store PUBLISH Messages
backend.opentsdb.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Parameters in hook rule:
Option | Description |
---|---|
topic | Configure which topics need to execute hooks |
action | Configure specific action for hook, function is a built-in function provided as Backend for general functions |
pool | Pool Name, used to connect multiple OpenTSDB servers |
Example:
## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
message.publish | # | on_message_publish | Store published messages |
Since MQTT Message cannot be written directly to OpenTSDB, OpenTSDB Backend provides an emqx_backend_opentsdb.tmpl template file to convert MQTT Message to DataPoint that can be written to OpenTSDB.
Template file use Json format:
key
- MQTT Topic, Json String, support wildcard charactersvalue
- Template, Json Object, used to convert MQTT Message intomeasurement,tag_key=tag_value,... field_key=field_value,... timestamp
and write to InfluxDB。
You can define different templates for different topics or multiple templates for the same topic, likes:
{
<Topic 1>: <Template 1>,
<Topic 2>: <Template 2>
}
The template format is as follows:
{
"measurement": <Measurement>,
"tags": {
<Tag Key>: <Tag Value>
},
"value": <Value>,
"timestamp": <Timestamp>
}
measurement
and value
are required options, tags
and timestamp
are optional.
All values (such as <Measurement>
) can be configured directly in the template as a fixed value that data types supported depending on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.
Currently, we support placeholders as follows:
Placeholder | Description |
---|---|
$id | MQTT Message UUID, assigned by EMQ X |
$clientid | Client ID used by the Client |
$username | Username used by the Client |
$peerhost | IP of Client |
$qos | QoS of MQTT Message |
$topic | Topic of MQTT Message |
$payload | Payload of MQTT Message, must be valid Json data |
$<Number> | It must be used with $paylaod to retrieve data from Json Array |
$timestamp | The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds |
You can directly use $content
to obtain the complete message payload, you can use ["$payload", <Key>, ...]
to get the data inside the message payload.
For example payload
is {"data": {"temperature": 23.9}}
, you can via ["$payload", "data", "temperature"]
to get 23.9
.
In the case of array data type in Json, we introduced $0
and $<pos_integer>
, $0
means to get all elements in the array, and $<pos_integer>
means to get the <pos_integer>th element in the array.
A simple example, ["$payload", "$0", "temp"]
will get [20, 21]
from [{"temp": 20}, {"temp": 21}]
, and ["$payload", "$1", "temp"]
will only get 20
.
It is worth noting that when you use $0
, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into OpenTSDB, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.
Example
data/templates directory provides a sample template (emqx_backend_opentsdb_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:
{
"sample": {
"measurement": "$topic",
"tags": {
"host": ["$payload", "data", "$0", "host"],
"region": ["$payload", "data", "$0", "region"],
"qos": "$qos",
"clientid": "$clientid"
},
"value": ["$payload", "data", "$0", "temp"],
"timestamp": "$timestamp"
}
}
When an MQTT Message whose Topic is "sample" has the following Payload:
{
"data": [
{
"temp": 1,
"host": "serverA",
"region": "hangzhou"
},
{
"temp": 2,
"host": "serverB",
"region": "ningbo"
}
]
}
Backend converts MQTT messages into the following data and writes it to OpenTSDB:
[
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverA",
"qos": "0",
"region": "hangzhou",
},
"value": "1",
"timestamp": "1560743513626681000"
},
{
"measurement": "sample",
"tags": {
"clientid": "mqttjs_ebcc36079a",
"host": "serverB",
"qos": "0",
"region": "ningbo",
},
"value": "2",
"timestamp": "1560743513626681000"
}
]
./bin/emqx_ctl plugins load emqx_backend_opentsdb
Config file: etc/plugins/emqx_backend_timescale.conf:
## Timescale Server
backend.timescale.pool1.server = 127.0.0.1:5432
## Timescale Pool Size
backend.timescale.pool1.pool_size = 8
## Timescale Username
backend.timescale.pool1.username = postgres
## Timescale Password
backend.timescale.pool1.password = password
## Timescale Database
backend.timescale.pool1.database = tutorial
## Timescale SSL
backend.timescale.pool1.ssl = false
## SSL keyfile.
##
## Value: File
## backend.timescale.pool1.keyfile =
## SSL certfile.
##
## Value: File
## backend.timescale.pool1.certfile =
## SSL cacertfile.
##
## Value: File
## backend.timescale.pool1.cacertfile =
## Store Publish Message
backend.timescale.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
Parameters in hook rule:
Option | Description |
---|---|
topic | Configure which topics need to execute hooks |
action | Configure specific action for hook, function is a built-in function provided as Backend for general functions |
pool | Pool Name, used to connect multiple Timescale servers |
Example:
## Store PUBLISH message whose topic is "sensor/#"
backend.influxdb.hook.message.publish.1 = {"topic": "sensor/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
## Store PUBLISH message whose topic is "stat/#"
backend.influxdb.hook.message.publish.2 = {"topic": "stat/#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
hook | topic | action | Description |
---|---|---|---|
message.publish | # | on_message_publish | Store published messages |
Timescale Backend provides the template file named emqx_backend_timescale.tmpl
, which is used to extract data from MQTT messages with different topics for writing to Timescale.
Template file use Json format:
key
- MQTT Topic, Json String, support wildcard charactersvalue
- Template, Json Object, used to convert MQTT Message intomeasurement,tag_key=tag_value,... field_key=field_value,... timestamp
and write to InfluxDB。
You can define different templates for different topics or multiple templates for the same topic, likes:
{
<Topic 1>: <Template 1>,
<Topic 2>: <Template 2>
}
The template format is as follows:
{
"name": <Name of template>,
"sql": <SQL INSERT INTO>,
"param_keys": <Param Keys>
}
name
, sql
and param_keys
are required options.
name
can be any string, just make sure there are no duplicates.
sql
is SQL INSERT INTO statement for Timescale, like insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)
.
param_keys
is a array, its first element corresponds to $1
appearing in sql
and so on.
Any element in an array can be a fixed value, and the data type it supports depends on the table you define. More realistically, of course, you can access the data in the MQTT message through the placeholder we provide.
Currently, we support placeholders as follows:
Placeholder | Description |
---|---|
$id | MQTT Message UUID, assigned by EMQ X |
$clientid | Client ID used by the Client |
$username | Username used by the Client |
$peerhost | IP of Client |
$qos | QoS of MQTT Message |
$topic | Topic of MQTT Message |
$payload | Payload of MQTT Message, must be valid Json data |
$<Number> | It must be used with $paylaod to retrieve data from Json Array |
$timestamp | The timestamp EMQ X sets when preparing to forward messages, precision: Nanoseconds |
You can directly use $content
to obtain the complete message payload, you can use ["$payload", <Key>, ...]
to get the data inside the message payload.
For example payload
is {"data": {"temperature": 23.9}}
, you can via ["$payload", "data", "temperature"]
to get 23.9
.
In the case of array data type in Json, we introduced $0
and $<pos_integer>
, $0
means to get all elements in the array, and $<pos_integer>
means to get the <pos_integer>th element in the array.
A simple example, ["$payload", "$0", "temp"]
will get [20, 21]
from [{"temp": 20}, {"temp": 21}]
, and ["$payload", "$1", "temp"]
will only get 20
.
It is worth noting that when you use $0
, we expect the number of data you get is same. Because we need to convert these arrays into multiple records and write it into Timescale, and when you have three pieces of data in one field and two in another, we won't know how to combine the data for you.
Example
data/templates directory provides a sample template (emqx_backend_timescale_example.tmpl, please remove the "_example" suffix from the filename when using it formally) for the user's reference:
{
"sensor_data": {
"name": "insert_sensor_data",
"sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
"param_keys": [
["$payload", "data", "$0", "location"],
["$payload", "data", "$0", "temperature"],
["$payload", "data", "$0", "humidity"]
]
},
"sensor_data2/#": {
"name": "insert_sensor_data2",
"sql": "insert into sensor_data(time, location, temperature, humidity) values (NOW(), $1, $2, $3)",
"param_keys": [
["$payload", "location"],
["$payload", "temperature"],
["$payload", "humidity"]
]
},
"easy_data": {
"name": "insert_easy_data",
"sql": "insert into easy_data(time, data) values (NOW(), $1)",
"param_keys": [
"$payload"
]
}
}
When an MQTT Message whose Topic is "sensor_data" has the following Payload:
{
"data":[
{
"location":"bedroom",
"temperature":21.3,
"humidity":40.3
},
{
"location":"bathroom",
"temperature":22.3,
"humidity":61.8
},
{
"location":"kitchen",
"temperature":29.5,
"humidity":58.7
}
]
}
["$payload", "data", "$0", "location"]
will extract Payload from MQTT Message first.
If the format of Payload is json, backend continue to extract data
from Payload.
And the value of data
is an array, we use $0
to gets all elements in the array.
["$payload", "data", "$0", "location"]
will help us get ["bedroom", "bathroom", "kitchen"]
finally.
Accordingly if you replace $0
with $1
, you get only ["bedroom"]
.
So in this scene, we will get the following SQL statement:
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bedroom', 21.3, 40.3)
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'bathroom', 22.3, 61.8)
insert into sensor_data(time, location, temperature, humidity) values (NOW(), 'kitchen', 29.5, 58.7)
Eventually Timescale Backend executes these SQL statements to write data to Timescale.