Skip to content

Commit

Permalink
Emplace packets in queue
Browse files Browse the repository at this point in the history
  • Loading branch information
bertmelis authored May 30, 2022
1 parent 5f42961 commit 72658a4
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 128 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/cppcheck.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ jobs:
pip install platformio
- name: Cppcheck
run: |
pio check --fail-on-defect=medium --flags "--suppress=unusedFunction --inline-suppr" --skip-packages
#pio check --fail-on-defect=high --fail-on-defect=medium --flags "--suppress=unusedFunction" --skip-packages
pio check --flags "--suppress=unusedFunction" --skip-packages
122 changes: 45 additions & 77 deletions src/MqttClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,29 +86,29 @@ bool MqttClient::connected() const {
bool MqttClient::connect() {
bool result = true;
if (_state == DISCONNECTED) {
_state = CONNECTINGTCP;
Packet packet(_cleanSession,
_username,
_password,
_willTopic,
_willRetain,
_willQos,
_willPayload,
_willPayloadLength,
_keepAlive,
_clientId);
if (packet.size() > 0) {
EMC_SEMAPHORE_TAKE();
_outbox.addFront(std::move(packet));
EMC_SEMAPHORE_GIVE();
#if defined(ESP32)
EMC_SEMAPHORE_TAKE();
if (_addPacket(false,
_cleanSession,
_username,
_password,
_willTopic,
_willRetain,
_willQos,
_willPayload,
_willPayloadLength,
_keepAlive,
_clientId)) {
#if defined(ESP32)
vTaskResume(_taskHandle);
#endif
#endif
_state = CONNECTINGTCP;
} else {
EMC_SEMAPHORE_GIVE();
emc_log_e("Could not create CONNECT packet");
_onError(0, Error::OUT_OF_MEMORY);
result = false;
}
EMC_SEMAPHORE_GIVE();
}
return result;
}
Expand All @@ -121,7 +121,7 @@ bool MqttClient::disconnect(bool force) {
if (force) {
_state = DISCONNECTINGTCP;
} else {
_state = DISCONNECTINGMQTT;
_state = DISCONNECTINGMQTT1;
}
}
return result;
Expand All @@ -132,16 +132,13 @@ uint16_t MqttClient::subscribe(const char* topic, uint8_t qos) {
if (_state != CONNECTED) {
packetId = 0;
} else {
Packet packet(topic, qos, packetId);
if (packet.size() > 0) {
EMC_SEMAPHORE_TAKE();
_outbox.add(std::move(packet));
EMC_SEMAPHORE_GIVE();
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(true, topic, qos, packetId)) {
emc_log_e("Could not create SUBSCRIBE packet");
_onError(packetId, Error::OUT_OF_MEMORY);
packetId = 0;
}
EMC_SEMAPHORE_GIVE();
}
return packetId;
}
Expand All @@ -151,16 +148,13 @@ uint16_t MqttClient::unsubscribe(const char* topic) {
if (_state != CONNECTED) {
packetId = 0;
} else {
Packet packet(topic, packetId);
if (packet.size() > 0) {
EMC_SEMAPHORE_TAKE();
_outbox.add(std::move(packet));
EMC_SEMAPHORE_GIVE();
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(true, topic, packetId)) {
emc_log_e("Could not create UNSUBSCRIBE packet");
_onError(packetId, Error::OUT_OF_MEMORY);
packetId = 0;
}
EMC_SEMAPHORE_GIVE();
}
return packetId;
}
Expand All @@ -170,16 +164,13 @@ uint16_t MqttClient::publish(const char* topic, uint8_t qos, bool retain, const
if (_state != CONNECTED) {
packetId = 0;
} else {
Packet packet(topic, payload, length, qos, retain, packetId);
if (packet.size() > 0) {
EMC_SEMAPHORE_TAKE();
_outbox.add(std::move(packet));
EMC_SEMAPHORE_GIVE();
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(true, topic, payload, length, qos, retain, packetId)) {
emc_log_e("Could not create PUBLISH packet");
_onError(packetId, Error::OUT_OF_MEMORY);
packetId = 0;
}
EMC_SEMAPHORE_GIVE();
}
return packetId;
}
Expand Down Expand Up @@ -215,19 +206,20 @@ void MqttClient::loop() {
_disconnectReason = DisconnectReason::TCP_DISCONNECTED;
}
break;
case DISCONNECTINGMQTT:
case DISCONNECTINGMQTT1:
EMC_SEMAPHORE_TAKE();
if (_outbox.empty()) {
Packet packet(PacketType.DISCONNECT);
if (packet.size() > 0) {
_outbox.add(std::move(packet));
} else {
if (!_addPacket(true, PacketType.DISCONNECT)) {
EMC_SEMAPHORE_GIVE();
emc_log_e("Could not create DISCONNECT packet");
_onError(0, Error::OUT_OF_MEMORY);
} else {
_state = DISCONNECTINGMQTT2;
}
}
EMC_SEMAPHORE_GIVE();
// fall through to CONNECTED to send out DISCONN packet
case DISCONNECTINGMQTT2:
case CONNECTINGMQTT:
// receipt of CONNACK packet will set state to CONNECTED
// client however is allowed to send packets before CONNACK is received
Expand All @@ -251,7 +243,7 @@ void MqttClient::loop() {
_transport->stop(0);
#endif
_state = DISCONNECTED;
_onDisconnect();
if (_onDisconnectCallback) _onDisconnectCallback(_disconnectReason);
break;
// all cases covered, no default case
}
Expand Down Expand Up @@ -340,7 +332,7 @@ void MqttClient::_checkIncoming() {
}
break;
case PacketType.PUBLISH:
if (_state == DISCONNECTINGMQTT) break; // stop processing incoming once user has called disconnect
if (_state == DISCONNECTINGMQTT1 || _state == DISCONNECTINGMQTT2) break; // stop processing incoming once user has called disconnect
_onPublish();
break;
case PacketType.PUBACK:
Expand Down Expand Up @@ -391,12 +383,7 @@ void MqttClient::_checkPing() {
// send ping when client was inactive for 0.7 times the keepalive time
if (millis() - _lastClientActivity > 700 * _keepAlive) {
emc_log_i("Near keepalive, sending PING");
Packet packet(PacketType.PINGREQ);
if (packet.size() > 0) {
EMC_SEMAPHORE_TAKE();
_outbox.add(std::move(packet));
EMC_SEMAPHORE_GIVE();
} else {
if (!_addPacket(true, PacketType.PINGREQ)) {
emc_log_e("Could not create PING packet");
}
}
Expand Down Expand Up @@ -427,14 +414,11 @@ void MqttClient::_onPublish() {
bool callback = true;
if (qos == 1) {
if (p.payload.index + p.payload.length == p.payload.total) {
Packet packet(PacketType.PUBACK, packetId);
if (packet.size() > 0) {
EMC_SEMAPHORE_TAKE();
_outbox.add(std::move(packet));
EMC_SEMAPHORE_GIVE();
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(true, PacketType.PUBACK, packetId)) {
emc_log_e("Could not create PUBACK packet");
}
EMC_SEMAPHORE_GIVE();
}
} else if (qos == 2) {
EMC_SEMAPHORE_TAKE();
Expand All @@ -448,12 +432,11 @@ void MqttClient::_onPublish() {
++it;
}
if (p.payload.index + p.payload.length == p.payload.total) {
Packet packet(PacketType.PUBREC, packetId);
if (packet.size() > 0) {
_outbox.add(std::move(packet));
} else {
EMC_SEMAPHORE_TAKE();
if (!_addPacket(true, PacketType.PUBREC, packetId)) {
emc_log_e("Could not create PUBREC packet");
}
EMC_SEMAPHORE_GIVE();
}
EMC_SEMAPHORE_GIVE();
}
Expand Down Expand Up @@ -502,10 +485,7 @@ void MqttClient::_onPubrec() {
// if it doesn't match the ID, return
if ((it.data()->data(0)[0] & 0xF0) == PacketType.PUBLISH) {
if (it.data()->packetId() == idToMatch) {
Packet packet(PacketType.PUBREL, idToMatch);
if (packet.size() > 0) {
_outbox.add(std::move(packet));
} else {
if (!_addPacket(true, PacketType.PUBREL, idToMatch)) {
emc_log_e("Could not create PUBREL packet");
}
_outbox.remove(it);
Expand All @@ -531,10 +511,7 @@ void MqttClient::_onPubrel() {
// if it doesn't match the ID, return
if ((it.data()->data(0)[0] & 0xF0) == PacketType.PUBREC) {
if (it.data()->packetId() == idToMatch) {
Packet packet(PacketType.PUBCOMP, idToMatch);
if (packet.size() > 0) {
_outbox.add(std::move(packet));
} else {
if (!_addPacket(true, PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
}
_outbox.remove(it);
Expand All @@ -560,10 +537,7 @@ void MqttClient::_onPubcomp() {
// if it doesn't match the ID, return
if ((it.data()->data(0)[0] & 0xF0) == PacketType.PUBREL) {
if (it.data()->packetId() == idToMatch) {
Packet packet(PacketType.PUBCOMP, idToMatch);
if (packet.size() > 0) {
_outbox.add(std::move(packet));
} else {
if (!_addPacket(true, PacketType.PUBCOMP, idToMatch)) {
emc_log_e("Could not create PUBCOMP packet");
}
callback = true;
Expand Down Expand Up @@ -625,12 +599,6 @@ void MqttClient::_onUnsuback() {
}
}

void MqttClient::_onDisconnect() {
if (_onDisconnectCallback) {
_onDisconnectCallback(_disconnectReason);
}
}

void MqttClient::_clearQueue(bool clearSession) {
EMC_SEMAPHORE_TAKE();
espMqttClientInternals::Outbox<espMqttClientInternals::Packet>::Iterator it = _outbox.front();
Expand Down
14 changes: 12 additions & 2 deletions src/MqttClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ class MqttClient {
CONNECTINGTCP,
CONNECTINGMQTT,
CONNECTED,
DISCONNECTINGMQTT,
DISCONNECTINGMQTT1,
DISCONNECTINGMQTT2,
DISCONNECTINGTCP
};
std::atomic<State> _state;
Expand All @@ -112,6 +113,16 @@ class MqttClient {

uint16_t _getNextPacketId();

template <typename... Args>
bool _addPacket(bool addBack, Args&&... args) {
if (addBack) {
if (_outbox.emplace(std::forward<Args>(args) ...)) return true;
} else {
if (_outbox.emplaceFront(std::forward<Args>(args) ...)) return true;
}
return false;
}

void _checkOutgoing();
void _checkIncoming();
void _checkPing();
Expand All @@ -124,7 +135,6 @@ class MqttClient {
void _onPubcomp();
void _onSuback();
void _onUnsuback();
void _onDisconnect();

void _clearQueue(bool clearSession);
void _onError(uint16_t packetId, espMqttClientTypes::Error error);
Expand Down
71 changes: 44 additions & 27 deletions src/Outbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ class Outbox {

struct Node {
public:
explicit Node(T item)
: item(std::move(item))
template <typename... Args>
explicit Node(Args&&... args)
: item(std::forward<Args>(args) ...)
, prev(nullptr)
, next(nullptr) {
// empty
Expand Down Expand Up @@ -69,35 +70,51 @@ class Outbox {
};

// add node to back, advance current to new if applicable
void add(T&& item) {
Node* node = new Node(std::move(item));
if (!_first) {
// queue is empty
_first = _last = node;
} else {
// queue has at least one item
node->prev = _last;
_last->next = node;
_last = node;
}
// advance current to newly created if applicable
if (!_current) {
_current = _last;
template <class... Args>
Iterator emplace(Args&&... args) {
Iterator it;
Node* node = new (std::nothrow) Node(std::forward<Args>(args) ...);
if (node != nullptr) {
if (!_first) {
// queue is empty
_first = _last = node;
} else {
// queue has at least one item
node->prev = _last;
_last->next = node;
_last = node;
}
// advance current to newly created if applicable
if (!_current) {
_current = _last;
}
it = front();
}
return it;
}

// add item to front, current points to newly created front
void addFront(T&& item) {
Node* node = new Node(std::move(item));
if (!_first) {
// queue is empty
_last = node;
} else {
// queue has at least one
node->next = _first;
_first->prev = node;
// add item to front, current points to newly created front.
template <class... Args>
Iterator emplaceFront(Args&&... args) {
Iterator it;
Node* node = new (std::nothrow) Node(std::forward<Args>(args) ...);
if (node != nullptr) {
if (!_first) {
// queue is empty
_first = _last = node;
} else {
// queue has at least one item
node->prev = _last;
_last->next = node;
_last = node;
}
// advance current to newly created if applicable
if (!_current) {
_current = _last;
}
it = back();
}
_current = _first = node;
return it;
}

// remove node at iterator, iterator points to next
Expand Down
Loading

0 comments on commit 72658a4

Please sign in to comment.