diff --git a/tests/test_client.py b/tests/test_client.py index d6457fe7..1fd6e656 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -8,7 +8,7 @@ import tests.paho_test as paho_test # Import test fixture -from tests.testsupport.broker import fake_broker # noqa: F401 +from tests.testsupport.broker import FakeBroker, fake_broker # noqa: F401 @pytest.mark.parametrize("proto_ver", [ @@ -137,6 +137,61 @@ def on_connect(mqttc, obj, flags, reason, properties): mqttc.loop_stop() +class TestPublish: + def test_publish_before_connect(self, fake_broker: FakeBroker) -> None: + mqttc = client.Client( + "test_publish_before_connect", + ) + + def on_connect(mqttc, obj, flags, rc): + assert rc == 0 + + mqttc.on_connect = on_connect + + mqttc.loop_start() + mqttc.connect("localhost", fake_broker.port) + mqttc.enable_logger() + + try: + mi = mqttc.publish("test", "testing") + + fake_broker.start() + + packet_in = fake_broker.receive_packet(1) + assert not packet_in # Check connection is closed + # re-call fake_broker.start() to take the 2nd connection done by client + # ... this is probably a bug, when using loop_start/loop_forever + # and doing a connect() before, the TCP connection is opened twice. + fake_broker.start() + + connect_packet = paho_test.gen_connect( + "test_publish_before_connect", keepalive=60, + proto_ver=client.MQTTv311) + packet_in = fake_broker.receive_packet(1000) + assert packet_in # Check connection was not closed + assert packet_in == connect_packet + + connack_packet = paho_test.gen_connack(rc=0) + count = fake_broker.send_packet(connack_packet) + assert count # Check connection was not closed + assert count == len(connack_packet) + + with pytest.raises(RuntimeError): + mi.wait_for_publish(1) + + mqttc.disconnect() + + disconnect_packet = paho_test.gen_disconnect() + packet_in = fake_broker.receive_packet(1000) + assert packet_in # Check connection was not closed + assert packet_in == disconnect_packet + + finally: + mqttc.loop_stop() + + packet_in = fake_broker.receive_packet(1) + assert not packet_in # Check connection is closed + class TestPublishBroker2Client: def test_invalid_utf8_topic(self, fake_broker):