Skip to content

Commit

Permalink
Add a websocket proxy to support JSMPEG on the Frigate Lovelace card (#…
Browse files Browse the repository at this point in the history
…144)

* Add a websocket proxy (for JSMPEG at least).

* Take some inspiration from ingress websocket proxy.
  • Loading branch information
dermotduffy authored Sep 12, 2021
1 parent 609fe87 commit 2bcf2a4
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 6 deletions.
4 changes: 3 additions & 1 deletion custom_components/frigate/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
STARTUP_MESSAGE,
)
from .views import (
JSMPEGProxyView,
NotificationsProxyView,
SnapshotsProxyView,
VodProxyView,
Expand Down Expand Up @@ -127,8 +128,9 @@ async def async_setup(hass: HomeAssistant, config: Config) -> bool:
hass.data.setdefault(DOMAIN, {})

session = async_get_clientsession(hass)
hass.http.register_view(SnapshotsProxyView(session))
hass.http.register_view(JSMPEGProxyView(session))
hass.http.register_view(NotificationsProxyView(session))
hass.http.register_view(SnapshotsProxyView(session))
hass.http.register_view(VodProxyView(session))
hass.http.register_view(VodSegmentProxyView(session))
return True
Expand Down
97 changes: 97 additions & 0 deletions custom_components/frigate/views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Frigate HTTP views."""
from __future__ import annotations

import asyncio
from ipaddress import ip_address
import logging
from typing import Any, Optional, cast
Expand Down Expand Up @@ -80,6 +81,10 @@ def get_frigate_instance_id_for_config_entry(
return get_frigate_instance_id(config) if config else None


# These proxies are inspired by:
# - https://github.com/home-assistant/supervisor/blob/main/supervisor/api/ingress.py


class ProxyView(HomeAssistantView): # type: ignore[misc]
"""HomeAssistant view."""

Expand Down Expand Up @@ -281,6 +286,98 @@ async def get(
return await super().get(request, **kwargs)


class WebsocketProxyView(ProxyView):
"""A simple proxy for websockets."""

async def _proxy_msgs(
self,
ws_in: aiohttp.ClientWebSocketResponse | web.WebSocketResponse,
ws_out: aiohttp.ClientWebSocketResponse | web.WebSocketResponse,
) -> None:

async for msg in ws_in:
try:
if msg.type == aiohttp.WSMsgType.TEXT:
await ws_out.send_str(msg.data)
elif msg.type == aiohttp.WSMsgType.BINARY:
await ws_out.send_bytes(msg.data)
elif msg.type == aiohttp.WSMsgType.PING:
await ws_out.ping()
elif msg.type == aiohttp.WSMsgType.PONG:
await ws_out.pong()
except ConnectionResetError:
return

async def _handle_request(
self,
request: web.Request,
path: str,
frigate_instance_id: str | None = None,
**kwargs: Any,
) -> web.Response | web.StreamResponse:
"""Handle route for request."""

config_entry = self._get_config_entry_for_request(request, frigate_instance_id)
if not config_entry:
return web.Response(status=HTTP_BAD_REQUEST)

if not self._permit_request(request, config_entry):
return web.Response(status=HTTP_FORBIDDEN)

full_path = self._create_path(path=path, **kwargs)
if not full_path:
return web.Response(status=HTTP_NOT_FOUND)

req_protocols = []
if hdrs.SEC_WEBSOCKET_PROTOCOL in request.headers:
req_protocols = [
str(proto.strip())
for proto in request.headers[hdrs.SEC_WEBSOCKET_PROTOCOL].split(",")
]

ws_to_user = web.WebSocketResponse(
protocols=req_protocols, autoclose=False, autoping=False
)
await ws_to_user.prepare(request)

# Preparing
url = str(URL(config_entry.data[CONF_URL]) / full_path)
source_header = _init_header(request)

# Support GET query
if request.query_string:
url = f"{url}?{request.query_string}"

async with self._websession.ws_connect(
url,
headers=source_header,
protocols=req_protocols,
autoclose=False,
autoping=False,
) as ws_to_frigate:
await asyncio.wait(
[
self._proxy_msgs(ws_to_frigate, ws_to_user),
self._proxy_msgs(ws_to_user, ws_to_frigate),
],
return_when=asyncio.tasks.FIRST_COMPLETED,
)
return ws_to_user


class JSMPEGProxyView(WebsocketProxyView):
"""A proxy for JSMPEG websocket."""

url = "/api/frigate/{frigate_instance_id:.+}/jsmpeg/{path:.+}"
extra_urls = ["/api/frigate/jsmpeg/{path:.+}"]

name = "api:frigate:jsmpeg"

def _create_path(self, path: str, **kwargs: Any) -> str | None:
"""Create path."""
return f"live/{path}"


def _init_header(request: web.Request) -> CIMultiDict | dict[str, str]:
"""Create initial header."""
headers = {}
Expand Down
196 changes: 191 additions & 5 deletions tests/test_views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test the frigate binary sensor."""
from __future__ import annotations

import asyncio
import copy
from datetime import timedelta
import logging
Expand All @@ -12,6 +13,7 @@
from aiohttp.web_exceptions import HTTPUnauthorized
import pytest

from custom_components.frigate import views
from custom_components.frigate.const import (
ATTR_CLIENT_ID,
ATTR_MQTT,
Expand Down Expand Up @@ -98,23 +100,50 @@ async def hass_client_local_frigate(
) -> Any:
"""Point the integration at a local fake Frigate server."""

async def handler(request: web.Request) -> web.Response:
def _assert_expected_headers(request: web.Request, allow_ws: bool = False) -> None:
for header in (
hdrs.CONTENT_LENGTH,
hdrs.CONTENT_ENCODING,
hdrs.SEC_WEBSOCKET_EXTENSIONS,
hdrs.SEC_WEBSOCKET_PROTOCOL,
hdrs.SEC_WEBSOCKET_VERSION,
hdrs.SEC_WEBSOCKET_KEY,
):
assert header not in request.headers

if not allow_ws:
for header in (
hdrs.SEC_WEBSOCKET_EXTENSIONS,
hdrs.SEC_WEBSOCKET_PROTOCOL,
hdrs.SEC_WEBSOCKET_VERSION,
hdrs.SEC_WEBSOCKET_KEY,
):
assert header not in request.headers

for header in (
hdrs.X_FORWARDED_HOST,
hdrs.X_FORWARDED_PROTO,
hdrs.X_FORWARDED_FOR,
):
assert header in request.headers

async def ws_qs_echo_handler(request: web.Request) -> web.WebSocketResponse:
"""Verify the query string and act as echo handler."""
assert request.query["key"] == "value"
return await ws_echo_handler(request)

async def ws_echo_handler(request: web.Request) -> web.WebSocketResponse:
"""Act as echo handler."""
_assert_expected_headers(request, allow_ws=True)

ws = web.WebSocketResponse()
await ws.prepare(request)

async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
await ws.send_str(msg.data)
elif msg.type == aiohttp.WSMsgType.BINARY:
await ws.send_bytes(msg.data)
return ws

async def handler(request: web.Request) -> web.Response:
_assert_expected_headers(request)
return web.json_response({})

server = await start_frigate_server(
Expand All @@ -126,6 +155,8 @@ async def handler(request: web.Request) -> web.Response:
web.get("/api/events/event_id/thumbnail.jpg", handler),
web.get("/api/events/event_id/snapshot.jpg", handler),
web.get("/api/events/event_id/clip.mp4", handler),
web.get("/live/front_door", ws_echo_handler),
web.get("/live/querystring", ws_qs_echo_handler),
],
)

Expand Down Expand Up @@ -493,3 +524,158 @@ async def test_notifications_with_disabled_option(
"/api/frigate/private_id/notifications/event_id/snapshot.jpg"
)
assert resp.status == HTTP_FORBIDDEN


async def test_jsmpeg_text_binary(
hass_client_local_frigate: Any,
hass: Any,
) -> None:
"""Test JSMPEG proxying text/binary data."""
async with hass_client_local_frigate.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door"
) as ws:
# Test sending text data.
result = await asyncio.gather(
ws.send_str("hello!"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.TEXT
assert result[1].data == "hello!"

# Test sending binary data.
result = await asyncio.gather(
ws.send_bytes(b"\x00\x01"),
ws.receive(),
)

assert result[1].type == aiohttp.WSMsgType.BINARY
assert result[1].data == b"\x00\x01"


async def test_jsmpeg_frame_type_ping_pong(
hass_client_local_frigate: Any,
) -> None:
"""Test JSMPEG proxying handles ping-pong."""

async with hass_client_local_frigate.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door"
) as ws:
await ws.ping()

# Push some data through after the ping.
result = await asyncio.gather(
ws.send_bytes(b"\x00\x01"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.BINARY
assert result[1].data == b"\x00\x01"


async def test_ws_proxy_specify_protocol(
hass_client_local_frigate: Any,
) -> None:
"""Test websocket proxy handles the SEC_WEBSOCKET_PROTOCOL header."""

ws = await hass_client_local_frigate.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door",
headers={hdrs.SEC_WEBSOCKET_PROTOCOL: "foo,bar"},
)
assert ws
await ws.close()


async def test_ws_proxy_query_string(
hass_client_local_frigate: Any,
) -> None:
"""Test websocket proxy passes on the querystring."""

async with hass_client_local_frigate.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/querystring?key=value",
) as ws:
result = await asyncio.gather(
ws.send_str("hello!"),
ws.receive(),
)
assert result[1].type == aiohttp.WSMsgType.TEXT
assert result[1].data == "hello!"


async def test_jsmpeg_connection_reset(
hass_client_local_frigate: Any,
) -> None:
"""Test JSMPEG proxying handles connection resets."""

# Tricky: This test is intended to test a ConnectionResetError to the
# Frigate server, which is the _second_ call to send*. The first call (from
# this test) needs to succeed.
real_send_str = views.aiohttp.web.WebSocketResponse.send_str

called_once = False

async def send_str(*args: Any, **kwargs: Any) -> None:
nonlocal called_once
if called_once:
raise ConnectionResetError
else:
called_once = True
return await real_send_str(*args, **kwargs)

with patch(
"custom_components.frigate.views.aiohttp.ClientWebSocketResponse.send_str",
new=send_str,
):
async with hass_client_local_frigate.ws_connect(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door"
) as ws:
await ws.send_str("data")


async def test_ws_proxy_bad_instance_id(
hass_client_local_frigate: Any,
) -> None:
"""Test websocket proxy handles bad instance id."""

resp = await hass_client_local_frigate.get(
"/api/frigate/NOT_A_REAL_ID/jsmpeg/front_door"
)
assert resp.status == HTTP_BAD_REQUEST


async def test_ws_proxy_forbidden(
hass_client_local_frigate: Any,
hass: Any,
) -> None:
"""Test websocket proxy handles forbidden paths."""

# Note: The ability to forbid websocket proxy calls is currently not used,
# but included for completeness and feature combatability with the other
# proxies. As such, there's no 'genuine' way to test this other than mocking
# out the call to _permit_request.

with patch(
"custom_components.frigate.views.WebsocketProxyView._permit_request",
return_value=False,
):
resp = await hass_client_local_frigate.get(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door"
)
assert resp.status == HTTP_FORBIDDEN


async def test_ws_proxy_missing_path(
hass_client_local_frigate: Any,
hass: Any,
) -> None:
"""Test websocket proxy handles missing/invalid paths."""

# Note: With current uses of the WebsocketProxy it's not possible to have a
# bad/missing path. Since that may not be the case in future for other views
# that inherit from WebsocketProxyView, this is tested here.
with patch(
"custom_components.frigate.views.JSMPEGProxyView._create_path",
return_value=None,
):
resp = await hass_client_local_frigate.get(
f"/api/frigate/{TEST_FRIGATE_INSTANCE_ID}/jsmpeg/front_door"
)
assert resp.status == HTTP_NOT_FOUND

0 comments on commit 2bcf2a4

Please sign in to comment.