Skip to content

Commit

Permalink
fix puback & pubcomp removal
Browse files Browse the repository at this point in the history
  • Loading branch information
bertmelis committed Jun 7, 2022
1 parent 04320db commit 504f90d
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 12 deletions.
2 changes: 1 addition & 1 deletion library.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"type": "git",
"url": "https://github.com/bertmelis/espMqttClient.git"
},
"version": "0.0.7",
"version": "0.0.8",
"frameworks": "arduino",
"platforms": ["espressif8266", "espressif32"],
"headers": ["espMqttClient.h"]
Expand Down
2 changes: 1 addition & 1 deletion library.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name=espMqttClient
version=0.0.7
version=0.0.8
author=Bert Melis
maintainer=Bert Melis
sentence=an MQTT client for the Arduino framework for ESP8266 / ESP32
Expand Down
20 changes: 10 additions & 10 deletions src/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -291,13 +291,13 @@ void MqttClient::_checkOutgoing() {
_bytesSent += written;
emc_log_i("tx %zu/%zu", _bytesSent, packet->size());
if (_bytesSent == packet->size()) {
if ((packet->data(0)[0] & 0xF0) == PacketType.DISCONNECT) _state = DISCONNECTINGTCP;
if (packet->packetId() == 0) {
if ((packet->packetType()) == PacketType.DISCONNECT) _state = DISCONNECTINGTCP;
if (packet->removable()) {
_outbox.removeCurrent();
} else {
// handle with care! millis() returns unsigned 32 bit, token is void*
packet->token = reinterpret_cast<void*>(millis());
if ((packet->data(0)[0] & 0xF0) == PacketType.PUBLISH) packet->setDup();
if ((packet->packetType()) == PacketType.PUBLISH) packet->setDup();
_outbox.next();
}
packet = _outbox.getCurrent();
Expand Down Expand Up @@ -423,7 +423,7 @@ void MqttClient::_onPublish() {
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
while (it) {
if ((it.get()->data(0)[0] & 0xF0) == PacketType.PUBREC && it.get()->packetId() == packetId) {
if ((it.get()->packetType()) == PacketType.PUBREC && it.get()->packetId() == packetId) {
callback = false;
emc_log_e("QoS2 packet previously delivered");
break;
Expand Down Expand Up @@ -455,7 +455,7 @@ void MqttClient::_onPuback() {
while (it) {
// PUBACKs come in the order PUBs are sent. So we only check the first PUB packet in outbox
// if it doesn't match the ID, return
if ((it.get()->data(0)[0] & 0xF0) == PacketType.PUBLISH) {
if ((it.get()->packetType()) == PacketType.PUBLISH) {
if (it.get()->packetId() == idToMatch) {
callback = true;
_outbox.remove(it);
Expand All @@ -482,7 +482,7 @@ void MqttClient::_onPubrec() {
while (it) {
// PUBRECs come in the order PUBs are sent. So we only check the first PUB packet in outbox
// if it doesn't match the ID, return
if ((it.get()->data(0)[0] & 0xF0) == PacketType.PUBLISH) {
if ((it.get()->packetType()) == PacketType.PUBLISH) {
if (it.get()->packetId() == idToMatch) {
if (!_addPacket(true, PacketType.PUBREL, idToMatch)) {
emc_log_e("Could not create PUBREL packet");
Expand All @@ -508,7 +508,7 @@ void MqttClient::_onPubrel() {
while (it) {
// PUBRELs come in the order PUBRECs are sent. So we only check the first PUBREC packet in outbox
// if it doesn't match the ID, return
if ((it.get()->data(0)[0] & 0xF0) == PacketType.PUBREC) {
if ((it.get()->packetType()) == PacketType.PUBREC) {
if (it.get()->packetId() == idToMatch) {
if (!_addPacket(true, PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
Expand All @@ -534,7 +534,7 @@ void MqttClient::_onPubcomp() {
while (it) {
// PUBCOMPs come in the order PUBRELs are sent. So we only check the first PUBREL packet in outbox
// if it doesn't match the ID, return
if ((it.get()->data(0)[0] & 0xF0) == PacketType.PUBREL) {
if ((it.get()->packetType()) == PacketType.PUBREL) {
if (it.get()->packetId() == idToMatch) {
if (!_addPacket(true, PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
Expand Down Expand Up @@ -562,7 +562,7 @@ void MqttClient::_onSuback() {
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
while (it) {
if (((it.get()->data(0)[0] & 0xF0) == PacketType.SUBSCRIBE) && it.get()->packetId() == idToMatch) {
if (((it.get()->packetType()) == PacketType.SUBSCRIBE) && it.get()->packetId() == idToMatch) {
callback = true;
_outbox.remove(it);
break;
Expand Down Expand Up @@ -611,7 +611,7 @@ void MqttClient::_clearQueue(bool clearSession) {
// Spec only mentions PUB and PUBREL but this lib implements method B from point 4.3.3 (Fig. 4.3)
// and stores the packet id in the PUBREC packet. So we also must keep PUBREC.
while (it) {
espMqttClientInternals::MQTTPacketType type = it.get()->data(0)[0] & 0xF0;
espMqttClientInternals::MQTTPacketType type = it.get()->packetType();
if (type == PacketType.PUBREC ||
type == PacketType.PUBREL ||
(type == PacketType.PUBLISH && it.get()->packetId() != 0)) {
Expand Down
11 changes: 11 additions & 0 deletions src/Packets/Packet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ uint16_t Packet::packetId() const {
return _packetId;
}

MQTTPacketType Packet::packetType() const {
if (_data) return static_cast<MQTTPacketType>(data(0)[0] & 0xF0);
return static_cast<MQTTPacketType>(0);
}

bool Packet::removable() const {
if (_packetId == 0) return true;
if (((data(0)[0] & 0xF0) == PacketType.PUBACK) || ((data(0)[0] & 0xF0) == PacketType.PUBCOMP)) return true;
return false;
}

Packet::Packet(bool cleanSession,
const char* username,
const char* password,
Expand Down
2 changes: 2 additions & 0 deletions src/Packets/Packet.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ class Packet {
size_t size() const;
void setDup();
uint16_t packetId() const;
MQTTPacketType packetType() const;
bool removable() const;

void* token; // native typeless variable to store any additional data

Expand Down

0 comments on commit 504f90d

Please sign in to comment.