From 4cf0747a027871b3fb6a5edb5f2c34a434ae836f Mon Sep 17 00:00:00 2001 From: Virgile Bello Date: Mon, 18 Mar 2024 21:46:55 +0900 Subject: [PATCH] echonetMessage: Make sure waiting[host] is decremented in all situations (including any inner exception) to avoid deadlocks --- pychonet/echonetapiclient.py | 121 +++++++++++++++++------------------ 1 file changed, 60 insertions(+), 61 deletions(-) diff --git a/pychonet/echonetapiclient.py b/pychonet/echonetapiclient.py index 83f4382..56ba2ad 100644 --- a/pychonet/echonetapiclient.py +++ b/pychonet/echonetapiclient.py @@ -282,68 +282,67 @@ async def echonetMessage(self, host, deojgc, deojcc, deojci, esv, opc): tx_tid = self._next_tx_tid message_array["TID"] = tx_tid try: - payload = buildEchonetMsg(message_array) - except TIDError: # Quashing the rollover bug hopefully once and for all... - self._next_tx_tid = 1 - tx_tid = self._next_tx_tid - message_array["TID"] = tx_tid - payload = buildEchonetMsg(message_array) - - opc_count = len(opc) - if no_res: - is_success = True - else: - is_success = False - tid_data = {} - for opc_data in opc: - if opc_data.get("EDT") is not None: - if isinstance(opc_data["EDT"], int): - tid_data[opc_data["EPC"]] = opc_data["EDT"].to_bytes( - opc_data["PDC"], "big" - ) - self._message_list[tx_tid] = tid_data - - self._server.send(payload, (host, ENL_PORT)) - - if not no_res: - not_timeout = False - for x in range(0, self._message_timeout): - # Wait up to 20(0.1*200) seconds depending on the Echonet specifications. - await asyncio.sleep(0.1) - # if tx_tid is not in message list then the message listener has received the message - if self._message_list.get(tx_tid) is None: - # Check OPC count in results - if not is_discover and tx_tid in self._opc_counts: - res_opc_count = self._opc_counts[tx_tid] - del self._opc_counts[tx_tid] - if self._debug_flag: - self._logger( - f"OPC count in results is {res_opc_count}/{opc_count} from IP {host}." + try: + payload = buildEchonetMsg(message_array) + except TIDError: # Quashing the rollover bug hopefully once and for all... + self._next_tx_tid = 1 + tx_tid = self._next_tx_tid + message_array["TID"] = tx_tid + payload = buildEchonetMsg(message_array) + + opc_count = len(opc) + if no_res: + is_success = True + else: + is_success = False + tid_data = {} + for opc_data in opc: + if opc_data.get("EDT") is not None: + if isinstance(opc_data["EDT"], int): + tid_data[opc_data["EPC"]] = opc_data["EDT"].to_bytes( + opc_data["PDC"], "big" ) - if res_opc_count < opc_count: - self._waiting[host] -= 1 - raise EchonetMaxOpcError(res_opc_count) - - # transaction sucessful remove from list - if self._failure_list.get(tx_tid, opc_count) < opc_count: - is_success = True - if tx_tid in self._failure_list: - del self._failure_list[tx_tid] - not_timeout = True - break - self._waiting[host] -= 1 - if not is_success: - if self._message_list.get(tx_tid) is not None: - del self._message_list[tx_tid] - if not is_discover and self._state[host]["available"] != not_timeout: - self._state[host]["available"] = not_timeout - # Call update callback functions - # key is f"{host}-{deojgc}-{deojcc}-{deojci}" - for key in self._update_callbacks: - if key.startswith(host): - for update_func in self._update_callbacks[key]: - await update_func(False) - else: + self._message_list[tx_tid] = tid_data + + self._server.send(payload, (host, ENL_PORT)) + + if not no_res: + not_timeout = False + for x in range(0, self._message_timeout): + # Wait up to 20(0.1*200) seconds depending on the Echonet specifications. + await asyncio.sleep(0.1) + # if tx_tid is not in message list then the message listener has received the message + if self._message_list.get(tx_tid) is None: + # Check OPC count in results + if not is_discover and tx_tid in self._opc_counts: + res_opc_count = self._opc_counts[tx_tid] + del self._opc_counts[tx_tid] + if self._debug_flag: + self._logger( + f"OPC count in results is {res_opc_count}/{opc_count} from IP {host}." + ) + if res_opc_count < opc_count: + raise EchonetMaxOpcError(res_opc_count) + + # transaction sucessful remove from list + if self._failure_list.get(tx_tid, opc_count) < opc_count: + is_success = True + if tx_tid in self._failure_list: + del self._failure_list[tx_tid] + not_timeout = True + break + if not is_success: + if self._message_list.get(tx_tid) is not None: + del self._message_list[tx_tid] + if not is_discover and self._state[host]["available"] != not_timeout: + self._state[host]["available"] = not_timeout + # Call update callback functions + # key is f"{host}-{deojgc}-{deojcc}-{deojci}" + for key in self._update_callbacks: + if key.startswith(host): + for update_func in self._update_callbacks[key]: + await update_func(False) + finally: self._waiting[host] -= 1 return is_success