Skip to content

Commit

Permalink
multisub - multiunsub
Browse files Browse the repository at this point in the history
  • Loading branch information
bertmelis authored Jun 9, 2022
1 parent 570e0c2 commit d6a896a
Show file tree
Hide file tree
Showing 14 changed files with 347 additions and 161 deletions.
8 changes: 5 additions & 3 deletions examples/ota-esp8266/ota-esp8266.ino
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
}
}

void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
void onMqttSubscribe(uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* codes, size_t len) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
for (size_t i = 0; i < len; ++i) {
Serial.print(" qos: ");
Serial.println(static_cast<uint8_t>(codes[i]));
}
}

void handleUpdate(const uint8_t* payload, size_t length, size_t index, size_t total) {
Expand Down
8 changes: 5 additions & 3 deletions examples/simple-esp32/simple-esp32.ino
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,14 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
}
}

void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
void onMqttSubscribe(uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* codes, size_t len) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
for (size_t i = 0; i < len; ++i) {
Serial.print(" qos: ");
Serial.println(static_cast<uint8_t>(codes[i]));
}
}

void onMqttUnsubscribe(uint16_t packetId) {
Expand Down
8 changes: 5 additions & 3 deletions examples/simple-esp8266/simple-esp8266.ino
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,14 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
}
}

void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
void onMqttSubscribe(uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* codes, size_t len) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
for (size_t i = 0; i < len; ++i) {
Serial.print(" qos: ");
Serial.println(static_cast<uint8_t>(codes[i]));
}
}

void onMqttUnsubscribe(uint16_t packetId) {
Expand Down
8 changes: 5 additions & 3 deletions examples/tls-esp32/tls-esp32.ino
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,14 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
}
}

void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
void onMqttSubscribe(uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* codes, size_t len) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
for (size_t i = 0; i < len; ++i) {
Serial.print(" qos: ");
Serial.println(static_cast<uint8_t>(codes[i]));
}
}

void onMqttUnsubscribe(uint16_t packetId) {
Expand Down
8 changes: 5 additions & 3 deletions examples/tls-esp8266/tls-esp8266.ino
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ void onMqttDisconnect(espMqttClientTypes::DisconnectReason reason) {
}
}

void onMqttSubscribe(uint16_t packetId, uint8_t qos) {
void onMqttSubscribe(uint16_t packetId, const espMqttClientTypes::SubscribeReturncode* codes, size_t len) {
Serial.println("Subscribe acknowledged.");
Serial.print(" packetId: ");
Serial.println(packetId);
Serial.print(" qos: ");
Serial.println(qos);
for (size_t i = 0; i < len; ++i) {
Serial.print(" qos: ");
Serial.println(static_cast<uint8_t>(codes[i]));
}
}

void onMqttUnsubscribe(uint16_t packetId) {
Expand Down
2 changes: 1 addition & 1 deletion library.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"type": "git",
"url": "https://github.com/bertmelis/espMqttClient.git"
},
"version": "0.0.9",
"version": "0.0.10",
"frameworks": "arduino",
"platforms": ["espressif8266", "espressif32"],
"headers": ["espMqttClient.h"]
Expand Down
2 changes: 1 addition & 1 deletion library.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name=espMqttClient
version=0.0.9
version=0.0.10
author=Bert Melis
maintainer=Bert Melis
sentence=an MQTT client for the Arduino framework for ESP8266 / ESP32
Expand Down
14 changes: 2 additions & 12 deletions platformio.ini
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,15 @@ build_flags =
[env:native]
platform = native
test_build_src = yes
build_src_filter =
+<Packets/>
build_flags =
${common.build_flags}
-D EMC_RX_BUFFER_SIZE=10
debug_test = test_outbox
build_type = debug

[env:native-cov]
platform = native
test_build_src = yes
build_src_filter =
+<Packets/>
build_flags =
${common.build_flags}
-lgcov
--coverage
-D EMC_RX_BUFFER_SIZE=10
extra_scripts = test-coverage.py
;extra_scripts = test-coverage.py
debug_test = test_outbox
build_type = debug

[env:esp8266]
Expand Down
38 changes: 3 additions & 35 deletions src/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,45 +126,13 @@ bool MqttClient::disconnect(bool force) {
return result;
}

uint16_t MqttClient::subscribe(const char* topic, uint8_t qos) {
uint16_t packetId = _getNextPacketId();
if (_state != CONNECTED) {
packetId = 0;
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(topic, qos, packetId)) {
emc_log_e("Could not create SUBSCRIBE packet");
_onError(packetId, Error::OUT_OF_MEMORY);
packetId = 0;
}
EMC_SEMAPHORE_GIVE();
}
return packetId;
}

uint16_t MqttClient::unsubscribe(const char* topic) {
uint16_t packetId = _getNextPacketId();
if (_state != CONNECTED) {
packetId = 0;
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(topic, packetId)) {
emc_log_e("Could not create UNSUBSCRIBE packet");
_onError(packetId, Error::OUT_OF_MEMORY);
packetId = 0;
}
EMC_SEMAPHORE_GIVE();
}
return packetId;
}

uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const uint8_t* payload, size_t length) {
uint16_t packetId = (qos > 0) ? _getNextPacketId() : 1;
if (_state != CONNECTED) {
packetId = 0;
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(topic, payload, length, qos, retain, packetId)) {
if (!_addPacket(packetId, topic, payload, length, qos, retain)) {
emc_log_e("Could not create PUBLISH packet");
_onError(packetId, Error::OUT_OF_MEMORY);
packetId = 0;
Expand All @@ -185,7 +153,7 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, espMqt
packetId = 0;
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(topic, callback, length, qos, retain, packetId)) {
if (!_addPacket(packetId, topic, callback, length, qos, retain)) {
emc_log_e("Could not create PUBLISH packet");
_onError(packetId, Error::OUT_OF_MEMORY);
packetId = 0;
Expand Down Expand Up @@ -586,7 +554,7 @@ void MqttClient::_onSuback() {
}
EMC_SEMAPHORE_GIVE();
if (callback) {
if (_onSubscribeCallback) _onSubscribeCallback(idToMatch, *(_parser.getPacket().payload.data));
if (_onSubscribeCallback) _onSubscribeCallback(idToMatch, reinterpret_cast<const espMqttClientTypes::SubscribeReturncode*>(_parser.getPacket().payload.data), _parser.getPacket().payload.total);
} else {
emc_log_w("received SUBACK without SUB");
}
Expand Down
32 changes: 30 additions & 2 deletions src/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,36 @@ class MqttClient {
bool connected() const;
bool connect();
bool disconnect(bool force = false);
uint16_t subscribe(const char* topic, uint8_t qos);
uint16_t unsubscribe(const char* topic);
template <typename... Args>
uint16_t subscribe(const char* topic, uint8_t qos, Args&&... args) {
uint16_t packetId = _getNextPacketId();
if (_state != CONNECTED) {
packetId = 0;
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(packetId, topic, qos, std::forward<Args>(args) ...)) {
emc_log_e("Could not create SUBSCRIBE packet");
packetId = 0;
}
EMC_SEMAPHORE_GIVE();
}
return packetId;
}
template <typename... Args>
uint16_t unsubscribe(const char* topic, Args&&... args) {
uint16_t packetId = _getNextPacketId();
if (_state != CONNECTED) {
packetId = 0;
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(packetId, topic, std::forward<Args>(args) ...)) {
emc_log_e("Could not create UNSUBSCRIBE packet");
packetId = 0;
}
EMC_SEMAPHORE_GIVE();
}
return packetId;
}
uint16_t publish(const char* topic, uint8_t qos, bool retain, const uint8_t* payload, size_t length);
uint16_t publish(const char* topic, uint8_t qos, bool retain, const char* payload);
uint16_t publish(const char* topic, uint8_t qos, bool retain, espMqttClientTypes::PayloadCallback callback, size_t length);
Expand Down
Loading

0 comments on commit d6a896a

Please sign in to comment.