diff --git a/src/paho/mqtt/client.py b/src/paho/mqtt/client.py index f5897328..c7bec50e 100644 --- a/src/paho/mqtt/client.py +++ b/src/paho/mqtt/client.py @@ -836,7 +836,6 @@ def __init__( self._bind_port = 0 self._proxy: Any = {} self._in_callback_mutex = threading.Lock() - self._callback_mutex = threading.RLock() self._msgtime_mutex = threading.Lock() self._out_message_mutex = threading.RLock() self._in_message_mutex = threading.Lock() @@ -1577,8 +1576,7 @@ def reconnect(self) -> MQTTErrorCode: # Put messages in progress in a valid state. self._messages_reconnect_reset() - with self._callback_mutex: - on_pre_connect = self.on_pre_connect + on_pre_connect = self.on_pre_connect if on_pre_connect: try: @@ -2421,8 +2419,7 @@ def on_pre_connect(self) -> CallbackOnPreConnect | None: @on_pre_connect.setter def on_pre_connect(self, func: CallbackOnPreConnect | None) -> None: - with self._callback_mutex: - self._on_pre_connect = func + self._on_pre_connect = func def pre_connect_callback( self, @@ -2487,8 +2484,7 @@ def on_connect(self) -> CallbackOnConnect | None: @on_connect.setter def on_connect(self, func: CallbackOnConnect | None) -> None: - with self._callback_mutex: - self._on_connect = func + self._on_connect = func def connect_callback( self, @@ -2517,8 +2513,7 @@ def on_connect_fail(self) -> CallbackOnConnectFail | None: @on_connect_fail.setter def on_connect_fail(self, func: CallbackOnConnectFail | None) -> None: - with self._callback_mutex: - self._on_connect_fail = func + self._on_connect_fail = func def connect_fail_callback( self, @@ -2567,8 +2562,7 @@ def on_subscribe(self) -> CallbackOnSubscribe | None: @on_subscribe.setter def on_subscribe(self, func: CallbackOnSubscribe | None) -> None: - with self._callback_mutex: - self._on_subscribe = func + self._on_subscribe = func def subscribe_callback( self, @@ -2601,8 +2595,7 @@ def on_message(self) -> CallbackOnMessage | None: @on_message.setter def on_message(self, func: CallbackOnMessage | None) -> None: - with self._callback_mutex: - self._on_message = func + self._on_message = func def message_callback( self, @@ -2657,8 +2650,7 @@ def on_publish(self) -> CallbackOnPublish | None: @on_publish.setter def on_publish(self, func: CallbackOnPublish | None) -> None: - with self._callback_mutex: - self._on_publish = func + self._on_publish = func def publish_callback( self, @@ -2708,8 +2700,7 @@ def on_unsubscribe(self) -> CallbackOnUnsubscribe | None: @on_unsubscribe.setter def on_unsubscribe(self, func: CallbackOnUnsubscribe | None) -> None: - with self._callback_mutex: - self._on_unsubscribe = func + self._on_unsubscribe = func def unsubscribe_callback( self, @@ -2761,8 +2752,7 @@ def on_disconnect(self) -> CallbackOnDisconnect | None: @on_disconnect.setter def on_disconnect(self, func: CallbackOnDisconnect | None) -> None: - with self._callback_mutex: - self._on_disconnect = func + self._on_disconnect = func def disconnect_callback( self, @@ -2793,8 +2783,7 @@ def on_socket_open(self) -> CallbackOnSocket | None: @on_socket_open.setter def on_socket_open(self, func: CallbackOnSocket | None) -> None: - with self._callback_mutex: - self._on_socket_open = func + self._on_socket_open = func def socket_open_callback( self, @@ -2806,8 +2795,7 @@ def decorator(func: CallbackOnSocket) -> CallbackOnSocket: def _call_socket_open(self, sock: SocketLike) -> None: """Call the socket_open callback with the just-opened socket""" - with self._callback_mutex: - on_socket_open = self.on_socket_open + on_socket_open = self.on_socket_open if on_socket_open: with self._in_callback_mutex: @@ -2840,8 +2828,7 @@ def on_socket_close(self) -> CallbackOnSocket | None: @on_socket_close.setter def on_socket_close(self, func: CallbackOnSocket | None) -> None: - with self._callback_mutex: - self._on_socket_close = func + self._on_socket_close = func def socket_close_callback( self, @@ -2853,8 +2840,7 @@ def decorator(func: CallbackOnSocket) -> CallbackOnSocket: def _call_socket_close(self, sock: SocketLike) -> None: """Call the socket_close callback with the about-to-be-closed socket""" - with self._callback_mutex: - on_socket_close = self.on_socket_close + on_socket_close = self.on_socket_close if on_socket_close: with self._in_callback_mutex: @@ -2887,8 +2873,7 @@ def on_socket_register_write(self) -> CallbackOnSocket | None: @on_socket_register_write.setter def on_socket_register_write(self, func: CallbackOnSocket | None) -> None: - with self._callback_mutex: - self._on_socket_register_write = func + self._on_socket_register_write = func def socket_register_write_callback( self, @@ -2903,8 +2888,7 @@ def _call_socket_register_write(self) -> None: if not self._sock or self._registered_write: return self._registered_write = True - with self._callback_mutex: - on_socket_register_write = self.on_socket_register_write + on_socket_register_write = self.on_socket_register_write if on_socket_register_write: try: @@ -2941,8 +2925,7 @@ def on_socket_unregister_write( def on_socket_unregister_write( self, func: CallbackOnSocket | None ) -> None: - with self._callback_mutex: - self._on_socket_unregister_write = func + self._on_socket_unregister_write = func def socket_unregister_write_callback( self, @@ -2963,8 +2946,7 @@ def _call_socket_unregister_write( return self._registered_write = False - with self._callback_mutex: - on_socket_unregister_write = self.on_socket_unregister_write + on_socket_unregister_write = self.on_socket_unregister_write if on_socket_unregister_write: try: @@ -3001,8 +2983,7 @@ def handle_mytopic(client, userdata, message): if callback is None or sub is None: raise ValueError("sub and callback must both be defined.") - with self._callback_mutex: - self._on_message_filtered[sub] = callback + self._on_message_filtered[sub] = callback def topic_callback( self, sub: str @@ -3018,11 +2999,10 @@ def message_callback_remove(self, sub: str) -> None: if sub is None: raise ValueError("sub must defined.") - with self._callback_mutex: - try: - del self._on_message_filtered[sub] - except KeyError: # no such subscription - pass + try: + del self._on_message_filtered[sub] + except KeyError: # no such subscription + pass # ============================================================ # Private functions @@ -3180,8 +3160,7 @@ def _packet_write(self) -> MQTTErrorCode: if packet['to_process'] == 0: if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0: - with self._callback_mutex: - on_publish = self.on_publish + on_publish = self.on_publish if on_publish: with self._in_callback_mutex: @@ -3897,8 +3876,7 @@ def _handle_connack(self) -> MQTTErrorCode: # it won't be the first successful connect any more self._mqttv5_first_connect = False - with self._callback_mutex: - on_connect = self.on_connect + on_connect = self.on_connect if on_connect: flags_dict = {} @@ -4048,8 +4026,7 @@ def _handle_suback(self) -> None: reasoncodes = [ReasonCode(SUBACK >> 4, identifier=c) for c in granted_qos] properties = Properties(SUBACK >> 4) - with self._callback_mutex: - on_subscribe = self.on_subscribe + on_subscribe = self.on_subscribe if on_subscribe: with self._in_callback_mutex: # Don't call loop_write after _send_publish() @@ -4294,8 +4271,7 @@ def _handle_unsuback(self) -> MQTTErrorCode: properties = Properties(UNSUBACK >> 4) self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: %d)", mid) - with self._callback_mutex: - on_unsubscribe = self.on_unsubscribe + on_unsubscribe = self.on_unsubscribe if on_unsubscribe: with self._in_callback_mutex: @@ -4344,8 +4320,7 @@ def _do_on_disconnect( reason: ReasonCode | None = None, properties: Properties | None = None, ) -> None: - with self._callback_mutex: - on_disconnect = self.on_disconnect + on_disconnect = self.on_disconnect if on_disconnect: with self._in_callback_mutex: @@ -4391,8 +4366,7 @@ def _do_on_disconnect( raise def _do_on_publish(self, mid: int, reason_code: ReasonCode, properties: Properties) -> MQTTErrorCode: - with self._callback_mutex: - on_publish = self.on_publish + on_publish = self.on_publish if on_publish: with self._in_callback_mutex: @@ -4467,14 +4441,13 @@ def _handle_on_message(self, message: MQTTMessage) -> None: topic = None on_message_callbacks = [] - with self._callback_mutex: - if topic is not None: - on_message_callbacks = list(self._on_message_filtered.iter_match(message.topic)) + if topic is not None: + on_message_callbacks = list(self._on_message_filtered.iter_match(message.topic)) - if len(on_message_callbacks) == 0: - on_message = self.on_message - else: - on_message = None + if len(on_message_callbacks) == 0: + on_message = self.on_message + else: + on_message = None for callback in on_message_callbacks: with self._in_callback_mutex: @@ -4502,8 +4475,7 @@ def _handle_on_message(self, message: MQTTMessage) -> None: def _handle_on_connect_fail(self) -> None: - with self._callback_mutex: - on_connect_fail = self.on_connect_fail + on_connect_fail = self.on_connect_fail if on_connect_fail: with self._in_callback_mutex: