Skip to content

Commit

Permalink
Improved handling of retries and edge cases
Browse files: browse the repository at this point in the history
  • Loading branch information
cdpuk committed Jan 26, 2024
1 parent 4607684 commit f5b03ff
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 38 deletions.
5 changes: 1 addition & 4 deletions custom_components/givenergy_local/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
host = entry.data.get(CONF_HOST)

coordinator = GivEnergyUpdateCoordinator(hass, host)
await coordinator.async_refresh()

if not coordinator.last_update_success:
raise ConfigEntryNotReady
await coordinator.async_config_entry_first_refresh()

hass.data.setdefault(DOMAIN, {})[entry.entry_id] = coordinator
await hass.config_entries.async_forward_entry_setups(entry, _PLATFORMS)
Expand Down
4 changes: 3 additions & 1 deletion custom_components/givenergy_local/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self, hass: HomeAssistant, host: str) -> None:
hass,
_LOGGER,
name="Inverter",
update_interval=timedelta(seconds=30),
update_interval=timedelta(seconds=10),
)

self.host = host
Expand Down Expand Up @@ -60,6 +60,8 @@ async def _async_update_data(self) -> Plant:
full_refresh=self.require_full_refresh, retries=2
)
except Exception as err:
_LOGGER.error("Disconnecting from inverter due to unexpected refresh error")
await self.client.close()
raise UpdateFailed(f"Error communicating with inverter: {err}") from err

# The connection sometimes returns what it claims is valid data, but many of the values
Expand Down
73 changes: 40 additions & 33 deletions custom_components/givenergy_local/givenergy_modbus/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async def connect(self) -> None:
)
# asyncio.create_task(self._task_dump_queues_to_files(), name='dump_queues_to_files'),
self.connected = True
_logger.info(f"Connection established to {self.host}:{self.port}")
_logger.info(f"Connection established to %s:%d", self.host, self.port)

async def close(self) -> None:
"""Disconnect from the remote host and clean up tasks and queues."""
Expand Down Expand Up @@ -156,10 +156,11 @@ async def _task_network_consumer(self):
frame = await self.reader.read(300)
# await self.debug_frames['all'].put(frame)
async for message in self.framer.decode(frame):
_logger.debug(f"Processing {message}")
_logger.debug("Processing %s", message)
if isinstance(message, ExceptionBase):
_logger.warning(
f"Expected response never arrived but resulted in exception: {message}"
"Expected response never arrived but resulted in exception: %s",
message,
)
continue
if isinstance(message, HeartbeatRequest):
Expand All @@ -170,16 +171,16 @@ async def _task_network_consumer(self):
continue
if not isinstance(message, TransparentResponse):
_logger.warning(
f"Received unexpected message type for a client: {message}"
"Received unexpected message type for a client: %s", message
)
continue
if isinstance(message, WriteHoldingRegisterResponse):
if message.error:
_logger.warning(f"{message}")
_logger.warning("%s", message)
else:
_logger.info(f"{message}")
_logger.info("%s", message)

future = self.expected_responses.get(message.shape_hash(), None)
future = self.expected_responses.get(message.shape_hash())

if future and not future.done():
future.set_result(message)
Expand All @@ -188,7 +189,10 @@ async def _task_network_consumer(self):
# except RegisterCacheUpdateFailed as e:
# # await self.debug_frames['error'].put(frame)
# _logger.debug(f'Ignoring {message}: {e}')
_logger.critical("network_consumer reader at EOF, cannot continue")
_logger.error(
"network_consumer reader at EOF, cannot continue, closing connection"
)
await self.close()

async def _task_network_producer(self, tx_message_wait: float = 0.25):
"""Producer loop to transmit queued frames with an appropriate delay."""
Expand All @@ -200,7 +204,10 @@ async def _task_network_producer(self, tx_message_wait: float = 0.25):
if future:
future.set_result(True)
await asyncio.sleep(tx_message_wait)
_logger.critical("network_producer writer is closing, cannot continue")
_logger.error(
"network_producer writer is closing, cannot continue, closing connection"
)
await self.close()

# async def _task_dump_queues_to_files(self):
# """Task to periodically dump debug message frames to disk for debugging."""
Expand Down Expand Up @@ -238,26 +245,26 @@ async def send_request_and_await_response(
self, request: TransparentRequest, timeout: float, retries: int
) -> TransparentResponse:
"""Send a request to the remote, await and return the response."""
raw_frame = request.encode()

# mark the expected response
expected_response = request.expected_response()
expected_shape_hash = expected_response.shape_hash()
existing_response_future = self.expected_responses.get(
expected_shape_hash, None
)
if existing_response_future and not existing_response_future.done():
_logger.debug(
"Cancelling existing in-flight request and replacing: %s", request
)
existing_response_future.cancel()
response_future: Future[
TransparentResponse
] = asyncio.get_event_loop().create_future()
self.expected_responses[expected_shape_hash] = response_future

raw_frame = request.encode()

tries = 0
while tries <= retries:
tries += 1
existing_response_future = self.expected_responses.get(expected_shape_hash)
if existing_response_future and not existing_response_future.done():
_logger.debug(
"Cancelling existing in-flight request and replacing: %s", request
)
existing_response_future.cancel()
response_future: Future[
TransparentResponse
] = asyncio.get_event_loop().create_future()
self.expected_responses[expected_shape_hash] = response_future

frame_sent = asyncio.get_event_loop().create_future()
await self.tx_queue.put((raw_frame, frame_sent))
await asyncio.wait_for(
Expand All @@ -268,23 +275,23 @@ async def send_request_and_await_response(
await asyncio.wait_for(response_future, timeout=timeout)
if response_future.done():
response = response_future.result()
if tries > 0:
_logger.debug("Received %s after %d tries", response, tries)
if tries > 1:
_logger.debug("Received %s after %d attempts", response, tries)
if response.error:
_logger.error("Received error response, retrying: %s", response)
else:
return response
except asyncio.TimeoutError:
pass

tries += 1
_logger.debug(
"Timeout awaiting %s (future: %s), attempting retry %d of %d",
expected_response,
response_future,
tries,
retries,
)
if tries <= retries:
_logger.debug(
"Timeout awaiting %s (future: %s), attempting retry %d of %d",
expected_response,
response_future,
tries,
retries,
)

_logger.warning(
"Timeout awaiting %s after %d tries at %ds, giving up",
Expand Down

0 comments on commit f5b03ff

Please sign in to comment.