Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove superfluous lock #825

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 35 additions & 63 deletions src/paho/mqtt/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,6 @@ def __init__(
self._bind_port = 0
self._proxy: Any = {}
self._in_callback_mutex = threading.Lock()
self._callback_mutex = threading.RLock()
self._msgtime_mutex = threading.Lock()
self._out_message_mutex = threading.RLock()
self._in_message_mutex = threading.Lock()
Expand Down Expand Up @@ -1577,8 +1576,7 @@ def reconnect(self) -> MQTTErrorCode:
# Put messages in progress in a valid state.
self._messages_reconnect_reset()

with self._callback_mutex:
on_pre_connect = self.on_pre_connect
on_pre_connect = self.on_pre_connect

if on_pre_connect:
try:
Expand Down Expand Up @@ -2421,8 +2419,7 @@ def on_pre_connect(self) -> CallbackOnPreConnect | None:

@on_pre_connect.setter
def on_pre_connect(self, func: CallbackOnPreConnect | None) -> None:
with self._callback_mutex:
self._on_pre_connect = func
self._on_pre_connect = func

def pre_connect_callback(
self,
Expand Down Expand Up @@ -2487,8 +2484,7 @@ def on_connect(self) -> CallbackOnConnect | None:

@on_connect.setter
def on_connect(self, func: CallbackOnConnect | None) -> None:
with self._callback_mutex:
self._on_connect = func
self._on_connect = func

def connect_callback(
self,
Expand Down Expand Up @@ -2517,8 +2513,7 @@ def on_connect_fail(self) -> CallbackOnConnectFail | None:

@on_connect_fail.setter
def on_connect_fail(self, func: CallbackOnConnectFail | None) -> None:
with self._callback_mutex:
self._on_connect_fail = func
self._on_connect_fail = func

def connect_fail_callback(
self,
Expand Down Expand Up @@ -2567,8 +2562,7 @@ def on_subscribe(self) -> CallbackOnSubscribe | None:

@on_subscribe.setter
def on_subscribe(self, func: CallbackOnSubscribe | None) -> None:
with self._callback_mutex:
self._on_subscribe = func
self._on_subscribe = func

def subscribe_callback(
self,
Expand Down Expand Up @@ -2601,8 +2595,7 @@ def on_message(self) -> CallbackOnMessage | None:

@on_message.setter
def on_message(self, func: CallbackOnMessage | None) -> None:
with self._callback_mutex:
self._on_message = func
self._on_message = func

def message_callback(
self,
Expand Down Expand Up @@ -2657,8 +2650,7 @@ def on_publish(self) -> CallbackOnPublish | None:

@on_publish.setter
def on_publish(self, func: CallbackOnPublish | None) -> None:
with self._callback_mutex:
self._on_publish = func
self._on_publish = func

def publish_callback(
self,
Expand Down Expand Up @@ -2708,8 +2700,7 @@ def on_unsubscribe(self) -> CallbackOnUnsubscribe | None:

@on_unsubscribe.setter
def on_unsubscribe(self, func: CallbackOnUnsubscribe | None) -> None:
with self._callback_mutex:
self._on_unsubscribe = func
self._on_unsubscribe = func

def unsubscribe_callback(
self,
Expand Down Expand Up @@ -2761,8 +2752,7 @@ def on_disconnect(self) -> CallbackOnDisconnect | None:

@on_disconnect.setter
def on_disconnect(self, func: CallbackOnDisconnect | None) -> None:
with self._callback_mutex:
self._on_disconnect = func
self._on_disconnect = func

def disconnect_callback(
self,
Expand Down Expand Up @@ -2793,8 +2783,7 @@ def on_socket_open(self) -> CallbackOnSocket | None:

@on_socket_open.setter
def on_socket_open(self, func: CallbackOnSocket | None) -> None:
with self._callback_mutex:
self._on_socket_open = func
self._on_socket_open = func

def socket_open_callback(
self,
Expand All @@ -2806,8 +2795,7 @@ def decorator(func: CallbackOnSocket) -> CallbackOnSocket:

def _call_socket_open(self, sock: SocketLike) -> None:
"""Call the socket_open callback with the just-opened socket"""
with self._callback_mutex:
on_socket_open = self.on_socket_open
on_socket_open = self.on_socket_open

if on_socket_open:
with self._in_callback_mutex:
Expand Down Expand Up @@ -2840,8 +2828,7 @@ def on_socket_close(self) -> CallbackOnSocket | None:

@on_socket_close.setter
def on_socket_close(self, func: CallbackOnSocket | None) -> None:
with self._callback_mutex:
self._on_socket_close = func
self._on_socket_close = func

def socket_close_callback(
self,
Expand All @@ -2853,8 +2840,7 @@ def decorator(func: CallbackOnSocket) -> CallbackOnSocket:

def _call_socket_close(self, sock: SocketLike) -> None:
"""Call the socket_close callback with the about-to-be-closed socket"""
with self._callback_mutex:
on_socket_close = self.on_socket_close
on_socket_close = self.on_socket_close

if on_socket_close:
with self._in_callback_mutex:
Expand Down Expand Up @@ -2887,8 +2873,7 @@ def on_socket_register_write(self) -> CallbackOnSocket | None:

@on_socket_register_write.setter
def on_socket_register_write(self, func: CallbackOnSocket | None) -> None:
with self._callback_mutex:
self._on_socket_register_write = func
self._on_socket_register_write = func

def socket_register_write_callback(
self,
Expand All @@ -2903,8 +2888,7 @@ def _call_socket_register_write(self) -> None:
if not self._sock or self._registered_write:
return
self._registered_write = True
with self._callback_mutex:
on_socket_register_write = self.on_socket_register_write
on_socket_register_write = self.on_socket_register_write

if on_socket_register_write:
try:
Expand Down Expand Up @@ -2941,8 +2925,7 @@ def on_socket_unregister_write(
def on_socket_unregister_write(
self, func: CallbackOnSocket | None
) -> None:
with self._callback_mutex:
self._on_socket_unregister_write = func
self._on_socket_unregister_write = func

def socket_unregister_write_callback(
self,
Expand All @@ -2963,8 +2946,7 @@ def _call_socket_unregister_write(
return
self._registered_write = False

with self._callback_mutex:
on_socket_unregister_write = self.on_socket_unregister_write
on_socket_unregister_write = self.on_socket_unregister_write

if on_socket_unregister_write:
try:
Expand Down Expand Up @@ -3001,8 +2983,7 @@ def handle_mytopic(client, userdata, message):
if callback is None or sub is None:
raise ValueError("sub and callback must both be defined.")

with self._callback_mutex:
self._on_message_filtered[sub] = callback
self._on_message_filtered[sub] = callback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't a simple assignment. I'm unsure whether this is safe to remove the lock.


def topic_callback(
self, sub: str
Expand All @@ -3018,11 +2999,10 @@ def message_callback_remove(self, sub: str) -> None:
if sub is None:
raise ValueError("sub must defined.")

with self._callback_mutex:
try:
del self._on_message_filtered[sub]
except KeyError: # no such subscription
pass
try:
del self._on_message_filtered[sub]
except KeyError: # no such subscription
pass

# ============================================================
# Private functions
Expand Down Expand Up @@ -3180,8 +3160,7 @@ def _packet_write(self) -> MQTTErrorCode:

if packet['to_process'] == 0:
if (packet['command'] & 0xF0) == PUBLISH and packet['qos'] == 0:
with self._callback_mutex:
on_publish = self.on_publish
on_publish = self.on_publish

if on_publish:
with self._in_callback_mutex:
Expand Down Expand Up @@ -3897,8 +3876,7 @@ def _handle_connack(self) -> MQTTErrorCode:
# it won't be the first successful connect any more
self._mqttv5_first_connect = False

with self._callback_mutex:
on_connect = self.on_connect
on_connect = self.on_connect

if on_connect:
flags_dict = {}
Expand Down Expand Up @@ -4048,8 +4026,7 @@ def _handle_suback(self) -> None:
reasoncodes = [ReasonCode(SUBACK >> 4, identifier=c) for c in granted_qos]
properties = Properties(SUBACK >> 4)

with self._callback_mutex:
on_subscribe = self.on_subscribe
on_subscribe = self.on_subscribe

if on_subscribe:
with self._in_callback_mutex: # Don't call loop_write after _send_publish()
Expand Down Expand Up @@ -4294,8 +4271,7 @@ def _handle_unsuback(self) -> MQTTErrorCode:
properties = Properties(UNSUBACK >> 4)

self._easy_log(MQTT_LOG_DEBUG, "Received UNSUBACK (Mid: %d)", mid)
with self._callback_mutex:
on_unsubscribe = self.on_unsubscribe
on_unsubscribe = self.on_unsubscribe

if on_unsubscribe:
with self._in_callback_mutex:
Expand Down Expand Up @@ -4344,8 +4320,7 @@ def _do_on_disconnect(
reason: ReasonCode | None = None,
properties: Properties | None = None,
) -> None:
with self._callback_mutex:
on_disconnect = self.on_disconnect
on_disconnect = self.on_disconnect

if on_disconnect:
with self._in_callback_mutex:
Expand Down Expand Up @@ -4391,8 +4366,7 @@ def _do_on_disconnect(
raise

def _do_on_publish(self, mid: int, reason_code: ReasonCode, properties: Properties) -> MQTTErrorCode:
with self._callback_mutex:
on_publish = self.on_publish
on_publish = self.on_publish

if on_publish:
with self._in_callback_mutex:
Expand Down Expand Up @@ -4467,14 +4441,13 @@ def _handle_on_message(self, message: MQTTMessage) -> None:
topic = None

on_message_callbacks = []
with self._callback_mutex:
if topic is not None:
on_message_callbacks = list(self._on_message_filtered.iter_match(message.topic))
if topic is not None:
on_message_callbacks = list(self._on_message_filtered.iter_match(message.topic))

if len(on_message_callbacks) == 0:
on_message = self.on_message
else:
on_message = None
if len(on_message_callbacks) == 0:
on_message = self.on_message
else:
on_message = None

for callback in on_message_callbacks:
with self._in_callback_mutex:
Expand Down Expand Up @@ -4502,8 +4475,7 @@ def _handle_on_message(self, message: MQTTMessage) -> None:


def _handle_on_connect_fail(self) -> None:
with self._callback_mutex:
on_connect_fail = self.on_connect_fail
on_connect_fail = self.on_connect_fail

if on_connect_fail:
with self._in_callback_mutex:
Expand Down