Skip to content

Commit

Permalink
Revert to aiohttp WebSocket listener
Browse files Browse the repository at this point in the history
  • Loading branch information
mj23000 committed Mar 26, 2024
1 parent 02037e9 commit c11b98c
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 79 deletions.
2 changes: 1 addition & 1 deletion docs/mozart-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3866,7 +3866,7 @@ info:
- [Beosound Level](https://www.bang-olufsen.com/en/dk/speakers/beosound-level)\n\
- [Beosound Theatre](https://www.bang-olufsen.com/en/dk/soundbars/beosound-theatre)\n\
\n\nThis API documentation has been generated for version 3.4.1.8 of the Mozart\
\ platform and API generation version 3.4.1.8.3.\n\n## Python API\n\nThe Python\
\ platform and API generation version 3.4.1.8.4.\n\n## Python API\n\nThe Python\
\ package has been generated using the [OpenAPI Generator](https:/openapi-generator.tech/)\
\ version 7.4.0. On top of the generated API, a helper file, mozart_client.py,\
\ has been made that makes the API more pythonic. We recommend using this.\n\
Expand Down
2 changes: 1 addition & 1 deletion python_client/mozart_api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
""" # noqa: E501


__version__ = "3.4.1.8.3"
__version__ = "3.4.1.8.4"

# import apis into sdk package
from mozart_api.api.beolink_api import BeolinkApi
Expand Down
2 changes: 1 addition & 1 deletion python_client/mozart_api/api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def __init__(
self.default_headers[header_name] = header_value
self.cookie = cookie
# Set default User-Agent.
self.user_agent = "OpenAPI-Generator/3.4.1.8.3/python"
self.user_agent = "OpenAPI-Generator/3.4.1.8.4/python"
self.client_side_validation = configuration.client_side_validation

async def __aenter__(self):
Expand Down
2 changes: 1 addition & 1 deletion python_client/mozart_api/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def to_debug_report(self):
"OS: {env}\n"
"Python Version: {pyversion}\n"
"Version of the API: 0.2.0\n"
"SDK Package Version: 3.4.1.8.3".format(
"SDK Package Version: 3.4.1.8.4".format(
env=sys.platform, pyversion=sys.version
)
)
Expand Down
162 changes: 88 additions & 74 deletions python_client/mozart_api/mozart_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,22 @@
import logging
import re
from collections import defaultdict
from dataclasses import dataclass
from datetime import time
from typing import Callable

import websockets
from aiohttp.client_exceptions import ClientConnectorError
from typing import Callable, Literal

from aiohttp import ClientSession
from aiohttp.client_exceptions import (
ClientConnectorError,
ClientOSError,
ServerTimeoutError,
)
from inflection import underscore
from mozart_api.api.mozart_api import MozartApi
from mozart_api.api_client import ApiClient
from mozart_api.configuration import Configuration
from mozart_api.exceptions import ApiException
from mozart_api.models import Art, PlaybackContentMetadata
from websockets.exceptions import ConnectionClosed, InvalidHandshake, InvalidURI

WEBSOCKET_TIMEOUT = 5.0

Expand Down Expand Up @@ -161,36 +165,24 @@ async def __aexit__(self, exc_type, exc_value, traceback):

async def _check_websocket_connection(
self,
) -> (
bool
| InvalidHandshake
| OSError
| asyncio.TimeoutError
| InvalidURI
| ConnectionClosed
| ConnectionError
):
) -> Literal[True] | ClientConnectorError | ClientOSError | ServerTimeoutError:
"""Check if a connection can be made to the device's WebSocket notification channel."""
try:
async with websockets.connect(
uri=f"ws://{self.host}:9339/", open_timeout=WEBSOCKET_TIMEOUT
):
pass
except (
InvalidHandshake,
OSError,
asyncio.TimeoutError,
InvalidURI,
ConnectionClosed,
ConnectionError,
) as error:
async with ClientSession(conn_timeout=WEBSOCKET_TIMEOUT) as session:
async with session.ws_connect(
f"ws://{self.host}:9339/",
timeout=WEBSOCKET_TIMEOUT,
receive_timeout=WEBSOCKET_TIMEOUT,
) as websocket:
if await websocket.receive():
return True

except (ClientConnectorError, ClientOSError, ServerTimeoutError) as error:
return error

return True

async def _check_api_connection(
self,
) -> bool | ApiException | ClientConnectorError | TimeoutError:
) -> Literal[True] | ApiException | ClientConnectorError | TimeoutError:
"""Check if a connection can be made to the device's REST API."""
try:
await self.get_battery_state(_request_timeout=3)
Expand All @@ -202,7 +194,17 @@ async def _check_api_connection(
async def check_device_connection(self, raise_error=False) -> bool:
"""Check API and WebSocket connection."""
# Don't use a taskgroup as both tasks should always be checked
tasks: tuple[asyncio.Task] = (
tasks: tuple[
asyncio.Task[
Literal[True]
| ClientConnectorError
| ClientOSError
| ServerTimeoutError
],
asyncio.Task[
Literal[True] | ApiException | ClientConnectorError | TimeoutError
],
] = (
asyncio.create_task(self._check_websocket_connection(), name="websocket"),
asyncio.create_task(self._check_api_connection(), name="REST API"),
)
Expand All @@ -212,17 +214,23 @@ async def check_device_connection(self, raise_error=False) -> bool:
while not task.done():
await asyncio.sleep(0)

connection_available = True
errors = []
errors: list[
ClientConnectorError
| ClientOSError
| ServerTimeoutError
| ApiException
| TimeoutError
] = []

# Check status
for task in tasks:
if task.result() is not True:
connection_available = False
errors.append(task.result())
if (result := task.result()) is not True:
errors.append(result)

connection_available = bool(len(errors) == 0)

# Raise any exceptions
if raise_error and not connection_available:
if not connection_available and raise_error:
raise ExceptionGroup(f"Can't connect to {self.host}", tuple(errors))

return connection_available
Expand All @@ -232,7 +240,6 @@ async def connect_notifications(
) -> None:
"""Start the WebSocket listener task(s)."""
self.websocket_reconnect = reconnect
self.websocket_reconnect = reconnect

# Always add main WebSocket listener
if not self._websocket_listener_active:
Expand Down Expand Up @@ -272,55 +279,62 @@ def disconnect_notifications(self) -> None:

async def _websocket_listener(self, host: str) -> None:
"""WebSocket listener."""
# Ensure automatic reconnect if needed
async for websocket in websockets.connect(
uri=host,
logger=logger,
ping_interval=WEBSOCKET_TIMEOUT,
ping_timeout=WEBSOCKET_TIMEOUT,
):
while True:
try:
self.websocket_connected = True

if self._on_connection:
await self._trigger_callback(self._on_connection)

# While listener is active
while self._websocket_listeners_active:
with contextlib.suppress(asyncio.TimeoutError):
notification = await asyncio.wait_for(
websocket.recv(),
timeout=WEBSOCKET_TIMEOUT,
)
await self._on_message(json.loads(notification))

# Disconnect
self.websocket_connected = False
await websocket.close()
return
async with ClientSession(conn_timeout=WEBSOCKET_TIMEOUT) as session:
async with session.ws_connect(
url=host, heartbeat=WEBSOCKET_TIMEOUT
) as websocket:
self.websocket_connected = True

if self._on_connection:
await self._trigger_callback(self._on_connection)

while self._websocket_listeners_active:
with contextlib.suppress(asyncio.TimeoutError):
# Receive JSON in order to get the Websocket notification name for deserialization
notification = await asyncio.wait_for(
websocket.receive_json(),
timeout=WEBSOCKET_TIMEOUT,
)

# Ensure that any notifications received after the disconnect command has been executed are not processed
if not self._websocket_listeners_active:
break

await self._on_message(notification)

# Disconnect
self.websocket_connected = False
await websocket.close()
return

except (
InvalidHandshake,
OSError,
asyncio.TimeoutError,
InvalidURI,
ConnectionClosed,
ConnectionError,
ClientConnectorError,
ClientOSError,
TypeError,
ServerTimeoutError,
) as error:
if self.websocket_connected:
self.websocket_connected = False
logger.debug("%s : %s - %s", host, type(error), error)
self.websocket_connected = False

if self._on_connection_lost:
await self._trigger_callback(self._on_connection_lost)

if not self.websocket_reconnect:
logger.error("%s : %s - %s", host, type(error), error)
self.disconnect_notifications()
return
if not self.websocket_reconnect:
logger.error("%s : %s - %s", host, type(error), error)
self.disconnect_notifications()
return

await asyncio.sleep(WEBSOCKET_TIMEOUT)

@dataclass
class _ResponseWrapper:
"""Wrapper class for deserializing WebSocket response."""

data: str

async def _on_message(self, notification) -> None:
"""Handle WebSocket notifications."""

Expand All @@ -329,7 +343,7 @@ async def _on_message(self, notification) -> None:
notification_type = notification["eventType"]

deserialized_data = self.api_client.deserialize(
json.dumps(notification), notification_type
self._ResponseWrapper(json.dumps(notification)), notification_type
).event_data

except (ValueError, AttributeError) as error:
Expand Down
2 changes: 1 addition & 1 deletion python_client/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "mozart_api"
version = "3.4.1.8.3"
version = "3.4.1.8.4"
description = "Mozart platform API"
authors = [
"BangOlufsen <support@bang-olufsen.dk>",
Expand Down

0 comments on commit c11b98c

Please sign in to comment.