Skip to content

Commit

Permalink
Use semaphore to prevent concurrent sessions (#30)
Browse files Browse the repository at this point in the history
Instead of just preventing multiple commands from being sent at once,
prevent multiple sessions from being used at the same time.

This should prevent the coordinator from polling new values while a
command is being sent.

Fixes #28
  • Loading branch information
tonyroberts authored Apr 15, 2024
1 parent bfba598 commit b7de28f
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 42 deletions.
2 changes: 1 addition & 1 deletion custom_components/wundasmart/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async def _async_update_data(self):
while attempts < max_attempts:
attempts += 1

async with get_session() as session:
async with get_session(self._wunda_ip) as session:
result = await get_devices(
session,
self._wunda_ip,
Expand Down
10 changes: 5 additions & 5 deletions custom_components/wundasmart/climate.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ async def async_added_to_hass(self) -> None:

async def async_set_temperature(self, temperature, **kwargs):
# Set the new target temperature
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand All @@ -307,7 +307,7 @@ async def async_set_temperature(self, temperature, **kwargs):
async def async_set_hvac_mode(self, hvac_mode: HVACMode):
if hvac_mode == HVACMode.AUTO:
# Set to programmed mode
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand All @@ -323,7 +323,7 @@ async def async_set_hvac_mode(self, hvac_mode: HVACMode):
})
elif hvac_mode == HVACMode.HEAT:
# Set the target temperature to the t_hi preset temp
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand All @@ -339,7 +339,7 @@ async def async_set_hvac_mode(self, hvac_mode: HVACMode):
})
elif hvac_mode == HVACMode.OFF:
# Set the target temperature to zero
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand Down Expand Up @@ -367,7 +367,7 @@ async def async_set_preset_mode(self, preset_mode) -> None:

t_preset = float(self.__state[state_key])

async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand Down
2 changes: 1 addition & 1 deletion custom_components/wundasmart/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self, hass, wunda_ip, wunda_user, wunda_pass):

async def authenticate(self):
"""Wundasmart Hub class authenticate."""
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
return await get_devices(
session,
self._wunda_ip,
Expand Down
27 changes: 7 additions & 20 deletions custom_components/wundasmart/pywundasmart.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,6 @@

DEVICE_DEFS = {'device_sn', 'prod_sn', 'device_name', 'device_type', 'eth_mac', 'name', 'id', 'i'}

_semaphores = {}

def _get_semaphore(wunda_ip):
"""Return a semaphore object to restrict making concurrent requests to the Wundasmart hub switch."""
semaphore = _semaphores.get(wunda_ip)
if semaphore is None:
semaphore = asyncio.Semaphore(value=1)
_semaphores[wunda_ip] = semaphore
return semaphore


def get_device_id_ranges(hw_version: float):
id_ranges = DEVICE_ID_RANGES.get(int(math.floor(hw_version)))
Expand Down Expand Up @@ -137,10 +127,9 @@ async def get_devices(httpsession: aiohttp.ClientSession, wunda_ip, wunda_user,
# Query the syncvalues API, which returns a list of all sensor values for all devices. Data is formatted as semicolon-separated k;v pairs
wunda_url = f"http://{wunda_ip}/syncvalues.cgi"
try:
async with _get_semaphore(wunda_ip), \
httpsession.get(wunda_url,
auth=aiohttp.BasicAuth(wunda_user, wunda_pass),
timeout=timeout) as resp:
async with httpsession.get(wunda_url,
auth=aiohttp.BasicAuth(wunda_user, wunda_pass),
timeout=timeout) as resp:
status = resp.status
if status == 200:
data = await resp.text()
Expand Down Expand Up @@ -169,12 +158,10 @@ async def send_command(session: aiohttp.ClientSession,
attempts = 0
while attempts < retries:
attempts += 1

async with _get_semaphore(wunda_ip), \
session.get(wunda_url,
auth=aiohttp.BasicAuth(wunda_user, wunda_pass),
params=params,
timeout=timeout) as resp:
async with session.get(wunda_url,
auth=aiohttp.BasicAuth(wunda_user, wunda_pass),
params=params,
timeout=timeout) as resp:
status = resp.status
if status == 200:
return json.loads(await resp.text())
Expand Down
32 changes: 22 additions & 10 deletions custom_components/wundasmart/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@
import weakref
import socket

# Limit the number of sessions that can be in use at any one time
_semaphores = {}

def _get_semaphore(wunda_ip):
"""Return a semaphore object to restrict making concurrent requests to the Wundasmart hub switch."""
semaphore = _semaphores.get(wunda_ip)
if semaphore is None:
semaphore = asyncio.Semaphore(value=1)
_semaphores[wunda_ip] = semaphore
return semaphore


class ResponseHandler(aiohttp.client_proto.ResponseHandler):
"""Patched ResponseHandler that calls socket.shutdown
Expand Down Expand Up @@ -49,13 +60,14 @@ def __init__(self, *args, **kwargs):


@asynccontextmanager
async def get_session():
connector = TCPConnector(force_close=True, limit=1)
try:
async with aiohttp.ClientSession(connector=connector) as session:
yield session
finally:
await connector.close()
# Zero-sleep to allow underlying connections to close
# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
await asyncio.sleep(0)
async def get_session(wunda_ip=None):
async with _get_semaphore(wunda_ip):
connector = TCPConnector(force_close=True, limit=1)
try:
async with aiohttp.ClientSession(connector=connector) as session:
yield session
finally:
await connector.close()
# Zero-sleep to allow underlying connections to close
# https://docs.aiohttp.org/en/stable/client_advanced.html#graceful-shutdown
await asyncio.sleep(0)
10 changes: 5 additions & 5 deletions custom_components/wundasmart/water_heater.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ async def async_set_operation_mode(self, operation_mode: str) -> None:
if operation_mode:
if operation_mode in HW_OFF_OPERATIONS:
_, duration = _split_operation(operation_mode)
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand All @@ -242,7 +242,7 @@ async def async_set_operation_mode(self, operation_mode: str) -> None:
})
elif operation_mode in HW_BOOST_OPERATIONS:
_, duration = _split_operation(operation_mode)
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand All @@ -254,7 +254,7 @@ async def async_set_operation_mode(self, operation_mode: str) -> None:
"hw_boost_time": duration
})
elif operation_mode == OPERATION_AUTO:
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand All @@ -278,7 +278,7 @@ async def async_set_operation_mode(self, operation_mode: str) -> None:
async def async_set_boost(self, duration: timedelta):
seconds = int((duration.days * 24 * 3600) + math.ceil(duration.seconds))
if seconds > 0:
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand All @@ -296,7 +296,7 @@ async def async_set_boost(self, duration: timedelta):
async def async_set_off(self, duration: timedelta):
seconds = int((duration.days * 24 * 3600) + math.ceil(duration.seconds))
if seconds > 0:
async with get_session() as session:
async with get_session(self._wunda_ip) as session:
await send_command(
session,
self._wunda_ip,
Expand Down

0 comments on commit b7de28f

Please sign in to comment.