From 77c4e445ea2bb13c787ad7c1ded61ecbcf00ee87 Mon Sep 17 00:00:00 2001 From: Nick O'Leary Date: Wed, 20 May 2020 01:18:02 +0100 Subject: [PATCH] Add setKeepAlive function and tests --- src/PubSubClient.cpp | 24 +++++++++++++++++++++--- src/PubSubClient.h | 2 ++ tests/src/connect_spec.cpp | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/PubSubClient.cpp b/src/PubSubClient.cpp index 7efe9b0f..d92922c8 100755 --- a/src/PubSubClient.cpp +++ b/src/PubSubClient.cpp @@ -15,6 +15,7 @@ PubSubClient::PubSubClient() { setCallback(NULL); this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(Client& client) { @@ -23,6 +24,7 @@ PubSubClient::PubSubClient(Client& client) { this->stream = NULL; this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { @@ -32,6 +34,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client) { this->stream = NULL; this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -40,6 +43,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, Client& client, Stream setStream(stream); this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -49,6 +53,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR this->stream = NULL; this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -58,6 +63,7 @@ PubSubClient::PubSubClient(IPAddress addr, uint16_t port, MQTT_CALLBACK_SIGNATUR setStream(stream); this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { @@ -67,6 +73,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client) { this->stream = NULL; this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -75,6 +82,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, Client& client, Stream& s setStream(stream); this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -84,6 +92,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, this->stream = NULL; this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -93,6 +102,7 @@ PubSubClient::PubSubClient(uint8_t *ip, uint16_t port, MQTT_CALLBACK_SIGNATURE, setStream(stream); this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { @@ -102,6 +112,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client) { this->stream = NULL; this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -110,6 +121,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, Client& client, St setStream(stream); this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client) { this->_state = MQTT_DISCONNECTED; @@ -119,6 +131,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN this->stream = NULL; this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGNATURE, Client& client, Stream& stream) { this->_state = MQTT_DISCONNECTED; @@ -128,6 +141,7 @@ PubSubClient::PubSubClient(const char* domain, uint16_t port, MQTT_CALLBACK_SIGN setStream(stream); this->bufferSize = 0; setBufferSize(MQTT_MAX_PACKET_SIZE); + setKeepAlive(MQTT_KEEPALIVE); } PubSubClient::~PubSubClient() { @@ -201,8 +215,8 @@ boolean PubSubClient::connect(const char *id, const char *user, const char *pass } this->buffer[length++] = v; - this->buffer[length++] = ((MQTT_KEEPALIVE) >> 8); - this->buffer[length++] = ((MQTT_KEEPALIVE) & 0xFF); + this->buffer[length++] = ((this->keepAlive) >> 8); + this->buffer[length++] = ((this->keepAlive) & 0xFF); CHECK_STRING_LENGTH(length,id) length = writeString(id,this->buffer,length); @@ -342,7 +356,7 @@ uint32_t PubSubClient::readPacket(uint8_t* lengthLength) { boolean PubSubClient::loop() { if (connected()) { unsigned long t = millis(); - if ((t - lastInActivity > MQTT_KEEPALIVE*1000UL) || (t - lastOutActivity > MQTT_KEEPALIVE*1000UL)) { + if ((t - lastInActivity > this->keepAlive*1000UL) || (t - lastOutActivity > this->keepAlive*1000UL)) { if (pingOutstanding) { this->_state = MQTT_CONNECTION_TIMEOUT; _client->stop(); @@ -731,3 +745,7 @@ boolean PubSubClient::setBufferSize(uint16_t size) { uint16_t PubSubClient::getBufferSize() { return this->bufferSize; } +PubSubClient& PubSubClient::setKeepAlive(uint16_t keepAlive) { + this->keepAlive = keepAlive; + return *this; +} diff --git a/src/PubSubClient.h b/src/PubSubClient.h index bd44f2a9..831eadbc 100755 --- a/src/PubSubClient.h +++ b/src/PubSubClient.h @@ -90,6 +90,7 @@ class PubSubClient : public Print { Client* _client; uint8_t* buffer; uint16_t bufferSize; + uint16_t keepAlive; uint16_t nextMsgId; unsigned long lastOutActivity; unsigned long lastInActivity; @@ -134,6 +135,7 @@ class PubSubClient : public Print { PubSubClient& setCallback(MQTT_CALLBACK_SIGNATURE); PubSubClient& setClient(Client& client); PubSubClient& setStream(Stream& stream); + PubSubClient& setKeepAlive(uint16_t keepAlive); boolean setBufferSize(uint16_t size); uint16_t getBufferSize(); diff --git a/tests/src/connect_spec.cpp b/tests/src/connect_spec.cpp index e27a1f59..e8545c49 100644 --- a/tests/src/connect_spec.cpp +++ b/tests/src/connect_spec.cpp @@ -280,6 +280,38 @@ int test_connect_disconnect_connect() { END_IT } +int test_connect_custom_keepalive() { + IT("sends a properly formatted connect packet with custom keepalive value"); + ShimClient shimClient; + + shimClient.setAllowConnect(true); + byte expectServer[] = { 172, 16, 0, 2 }; + shimClient.expectConnect(expectServer,1883); + + // Set keepalive to 300secs == 0x01 0x2c + byte connect[] = {0x10,0x18,0x0,0x4,0x4d,0x51,0x54,0x54,0x4,0x2,0x01,0x2c,0x0,0xc,0x63,0x6c,0x69,0x65,0x6e,0x74,0x5f,0x74,0x65,0x73,0x74,0x31}; + byte connack[] = { 0x20, 0x02, 0x00, 0x00 }; + + shimClient.expect(connect,26); + shimClient.respond(connack,4); + + PubSubClient client(server, 1883, callback, shimClient); + int state = client.state(); + IS_TRUE(state == MQTT_DISCONNECTED); + + client.setKeepAlive(300); + + int rc = client.connect((char*)"client_test1"); + IS_TRUE(rc); + IS_FALSE(shimClient.error()); + + state = client.state(); + IS_TRUE(state == MQTT_CONNECTED); + + END_IT +} + + int main() { SUITE("Connect"); @@ -298,5 +330,7 @@ int main() test_connect_with_will(); test_connect_with_will_username_password(); test_connect_disconnect_connect(); + + test_connect_custom_keepalive(); FINISH }