Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish should not wait for confirmation #48

Open
0x0a11c0de opened this issue Mar 1, 2021 · 10 comments
Open

Publish should not wait for confirmation #48

0x0a11c0de opened this issue Mar 1, 2021 · 10 comments

Comments

@0x0a11c0de
Copy link

The publish() method forces the user to wait for the message to be published. It is perfectly reasonable to publish a message with QOS > 0 when the client is disconnected. For example, on unreliable links, a loop in a thread/task is used to reconnect when the connection goes down. The publisher can continue on publishing at QOS > 0 without knowing anything about the status of the connection. This is not possible with the current publish() implementation and puts a bigger implementation burden on the user of this library.

@frederikaalund
Copy link
Collaborator

Thanks for opening this issue. Let me have a look. :)

It is perfectly reasonable to publish a message with QOS > 0 when the client is disconnected. For example, on unreliable links, a loop in a thread/task is used to reconnect when the connection goes down. The publisher can continue on publishing at QOS > 0 without knowing anything about the status of the connection.

I never considered that use case. Thanks for bringing it to my attention. 👍 Your question in #46 makes a lot more sense to me now.

To support this, we need to make some API-breaking changes. I'm up for it, though.

Make Client work in "offline" mode

Old behaviour:

  • Client.publish only works while online (connected to the server)
  • Client can only connect once and only disconnect once. You have to create a new Client instance if you want to, e.g., reconnect to the server.

New behaviour:

  • Client.publish works while offline (not connected to the server)
  • Client can connect to/disconnect from the server multiple times. We use the existing paho.client.reconnect logic to support this.

An example of the new API that these changes bring with them (based on the "Advanced example"):

async def ensure_connection(client):
    reconnect_interval = 3  # [seconds]
    while True:
        try:
            # Connect to the server
            async with client:
                # We pass the (connected) client to the advanced example
                await advanced_example(client)
        except MqttError as error:
            print(f'Error "{error}". Reconnecting in {reconnect_interval} seconds.')
        finally:
            await asyncio.sleep(reconnect_interval)

async def post_to_topics(client, topics):
    while True:
        for topic in topics:
            message = randrange(100)
            print(f'[topic="{topic}"] Publishing message={message}')
            # Note that we no longer use await here! If the client isn't connected, paho-mqtt queues the messages
            # up internally.
            client.publish(topic, message, qos=1)
            await asyncio.sleep(2)

# There is only a single client instance. We reuse it again and again.
client = Client("test.mosquitto.org")

# Keep the client connected indefinitely
task = asyncio.create_task(ensure_connection(client))
tasks.add(task)

# Publish a random value to each of these topics
topics = (
    "floors/basement/humidity",
    "floors/rooftop/humidity",
    "floors/rooftop/illuminance",
    # 👉 Try to add more topics!
)
task = asyncio.create_task(post_to_topics(client, topics))
tasks.add(task)

Would something like the above example work for you?

@0x0a11c0de
Copy link
Author

As long as the publish can still be done outside of the context manager, i.e. the async with client, I think this design will work.

@frederikaalund
Copy link
Collaborator

This is part of the experimental anyio-based branch. See #44.

@robin-blanchard
Copy link

Regarding this statement,

Client can only connect once and only disconnect once. You have to create a new Client instance if you want to, e.g., reconnect to the server.

Has anything changed in asyncio-mqtt ?

I have no problem with the following code, where I re-use the same client and connect/disconnect every second.

class Publisher:

    def __init__(
        self,
        url,
        topics
    ) -> None:
        self.url = url
        if isinstance(topics, str):
            topics = [topics]
        self.topics = topics

    async def run(self):
        i = 0
        client = Client(self.url)
        while True:
            message = f"Message n{i}"
            async with client:
                for topic in self.topics:
                    await client.publish(topic, message.encode())
            print(client._disconnected.done())  # Returns True
            await asyncio.sleep(1)
            i += 1


if __name__ == "__main__":
    publisher = Publisher(MQTT_BROKER_ADDRESS, "some_topic")

    asyncio.get_event_loop().run_until_complete(publisher.run())

@frederikaalund
Copy link
Collaborator

Hi Robin, thanks for the comment here. :)

I have no problem with the following code, where I re-use the same client and connect/disconnect every second.

For now, asyncio_mqtt.Client is officially a single-use context manager.
We (the asyncio-mqtt maintainers) may at some point announce that asyncio_mqtt.Client is a reusable (but not reentrant) context manager.

It may be that asyncio_mqtt.Client already is a reusable context manager. We (the asyncio-mqtt maintainers) just don't guarantee it yet.

Robin, if you want to, you can make a PR with a suite of tests that proves that asyncio_mqtt.Client is reusable. With those tests in place, we can guarantee this property.

@YAGregor
Copy link
Contributor

YAGregor commented Jan 17, 2023

I don't think this is correct, QoS1 means could make sure that message was sent.It's not reliable to store message in memory, the only way to make sure messages sent is to store it in disk(which is too much work for a mqtt client), and send them one by one, or you MUST accept some message sending may be failed.

@spacemanspiff2007
Copy link
Contributor

Making publish() work outside the client context is very unexpected.
If the goal is to provide more convenience then e.g. a new function publish_with_buffer() would imho make way more sense.

@Lazy-Leopard
Copy link

Curiously asking, why is making it work outside the client context so unexpected, shouldn't a client work like that being connected to the broker, publishes and reads messages when they come, but shouldn't the connection be persistent?

@spacemanspiff2007
Copy link
Contributor

I might have misphrased, the unexpected part is the suggested behavior.:
E.g. I call publish on a disconnected client and after the client connects the queued up publish call goes through. It's not clear at all that publish will behave like that and I even think it's reasonable to expect that this behavior is (sometimes) even undesired.

@Lazy-Leopard
Copy link

Yes @spacemanspiff2007, got your point, makes good sense to me now what you say.
PS: It was misinterpretation on my end, thanks for taking out the time to re-explain.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants