diff --git a/src/olympe/arsdkng/proto.py b/src/olympe/arsdkng/proto.py index 043cf74..6bf5408 100644 --- a/src/olympe/arsdkng/proto.py +++ b/src/olympe/arsdkng/proto.py @@ -318,11 +318,13 @@ def message_type_from_field(self, feature_name, field_descriptor): for enum_type in message_type.enum_types: package = message_type.file.package path = enum_type.full_name[len(package) + 1 :] - feature_name = enum_type.full_name[ - : len(enum_type.full_name) - len(path) - 1 - ] enum_types.append(self._make_enum(feature_name, path, enum_type)) - self.features_package_map[message_type.file.package] = feature_name + if message_type.file.package not in self.features_package_map: + self.features_package_map[message_type.file.package] = feature_name + else: + assert ( + self.features_package_map[message_type.file.package] == feature_name + ) return message_type, enum_types def list_oneof_messages(self, feature_name, oneof_descriptor): @@ -648,7 +650,12 @@ def parse_proto( ) feature_name = filename if module.DESCRIPTOR.package: - self.features_package_map[module.DESCRIPTOR.package] = feature_name + if module.DESCRIPTOR.package not in self.features_package_map: + self.features_package_map[module.DESCRIPTOR.package] = feature_name + else: + assert ( + self.features_package_map[module.DESCRIPTOR.package] == feature_name + ) services = [] if hasattr(module, "Command"): services.append( diff --git a/src/olympe/concurrent/_task.py b/src/olympe/concurrent/_task.py index d76642e..ad48a05 100644 --- a/src/olympe/concurrent/_task.py +++ b/src/olympe/concurrent/_task.py @@ -18,6 +18,7 @@ import concurrent import inspect +import threading from collections import defaultdict from dataclasses import dataclass, field, fields from .future import Future @@ -67,12 +68,17 @@ def cancel(self): return True def _step_blocking_impl(self, blocking, result): + assert self._loop is not None + assert threading.current_thread() is self._loop # Yielded Future must come from Future.__iter__(). if isinstance(result, Future) and result._loop is not self._loop: - new_exc = RuntimeError( - f"Task {self!r} got Future {result!r} attached to a different loop" - ) - self._loop.run_later(self.step, new_exc) + if result._loop is None: + new_exc = RuntimeError( + f"Task {self!r} got Future {result!r} not attached to any loop" + ) + self._loop.run_later(self.step, new_exc) + else: + result.add_done_callback(self._wakeup) elif blocking: if result is self: new_exc = RuntimeError(f"Task cannot await on itself: {self!r}") diff --git a/src/olympe/controller.py b/src/olympe/controller.py index 1411667..fe4ba25 100644 --- a/src/olympe/controller.py +++ b/src/olympe/controller.py @@ -31,17 +31,22 @@ import olympe_deps as od from .arsdkng.cmd_itf import Connect, Disconnect, Connected, Disconnected # noqa from .arsdkng.controller import ControllerBase +from .arsdkng.backend import BackendType from .mixins.streaming import StreamingControllerMixin from .mixins.media import MediaControllerMixin from .mixins.mission import MissionControllerMixin +from .mixins.ipproxy import IpProxyMixin +from .mixins.cellular import CellularPairerMixin from .utils import callback_decorator class ControllerBase( - StreamingControllerMixin, - MissionControllerMixin, - MediaControllerMixin, - ControllerBase): + IpProxyMixin, + StreamingControllerMixin, + MissionControllerMixin, + MediaControllerMixin, + ControllerBase, +): pass @@ -49,23 +54,18 @@ class Drone(ControllerBase): pass -class SkyController(ControllerBase): - def __init__(self, *args, **kwds): - super().__init__(*args, is_skyctrl=True, **kwds) +class SkyController(CellularPairerMixin, ControllerBase): + def __init__(self, *args, backend: BackendType = BackendType.MuxIp, **kwds): + super().__init__(*args, is_skyctrl=True, backend=backend, **kwds) @callback_decorator() - def _link_status_cb( - self, - _arsdk_device, - _arsdk_device_info, - status, - _user_data): + def _link_status_cb(self, _arsdk_device, _arsdk_device_info, status, _user_data): """ - Notify link status. At connection completion, it is assumed to be - initially OK. If called with KO, user is responsible to take action. - It can either wait for link to become OK again or disconnect - immediately. In this case, call arsdk_device_disconnect and the - 'disconnected' callback will be called. + Notify link status. At connection completion, it is assumed to be + initially OK. If called with KO, user is responsible to take action. + It can either wait for link to become OK again or disconnect + immediately. In this case, call arsdk_device_disconnect and the + 'disconnected' callback will be called. """ self.logger.info(f"Link status: {status}") if status == od.ARSDK_LINK_STATUS_KO: diff --git a/src/olympe/doc/examples/cellular.py b/src/olympe/doc/examples/cellular.py new file mode 100644 index 0000000..4e8f1fd --- /dev/null +++ b/src/olympe/doc/examples/cellular.py @@ -0,0 +1,63 @@ +import os + +import olympe +from olympe.messages.drone_manager import connection_state +from olympe.messages import network +from olympe.enums.network import LinkType, LinkStatus + + +olympe.log.update_config({"loggers": {"olympe": {"level": "WARNING"}}}) + +SKYCTRL_IP = os.environ.get("SKYCTRL_IP", "192.168.53.1") + + +def test_cellular(): + """ + This script allows to pair in cellular a SkyController and a Drone previously paired together in wifi. + """ + + print("Test of cellular pairing") + + # Create a skycontroller + skyctrl = olympe.SkyController(SKYCTRL_IP) + # Connect to skycontroller + assert skyctrl.connect() + print("- SkyController connected") + + # Wait for the skycontroller and the drone to be connected + skyctrl(connection_state(state="connected")).wait() + print("- Drone connected") + + # Get the cellular link status before pairing + assert skyctrl(network.Command.GetState() >> network.Event.State()).wait().success() + links = skyctrl.get_state(network.Event.State)["links_status"]["links"] + cellular_link = next( + filter(lambda link: link["type"] == LinkType.LINK_TYPE_CELLULAR, links), None + ) + print(f" cellular link status: {cellular_link['status']}") + # Should be different from LinkStatus.running + + # Pair the SkyController and the Drone in cellular + print("- Cellular pairing of the SkyController and the Drone") + assert skyctrl.cellular.pair() + + print("- Waiting for cellular link to be running") + + # Wait for cellular link status pass to Link Status.running + while cellular_link["status"] != LinkStatus.running: + skyctrl(network.Event.State(_policy="wait")) + links = skyctrl.get_state(network.Event.State)["links_status"]["links"] + cellular_link = next( + filter(lambda link: link["type"] == LinkType.LINK_TYPE_CELLULAR, links), + None, + ) + + # Log cellular link status + print(f" cellular link status: {cellular_link['status']}") + + # Disconnect the skycontroller + skyctrl.disconnect() + print("- SkyController disconnected") + +if __name__ == "__main__": + test_cellular() diff --git a/src/olympe/doc/olympeapi.rst b/src/olympe/doc/olympeapi.rst index 5d705e9..2c80571 100644 --- a/src/olympe/doc/olympeapi.rst +++ b/src/olympe/doc/olympeapi.rst @@ -28,12 +28,14 @@ Olympe API Reference Documentation .. automethod:: __init__ .. automethod:: play .. automethod:: pause - .. automethod:: close + .. automethod:: resume + .. automethod:: stop .. automethod:: set_output_files .. automethod:: set_callbacks .. automethod:: get_session_metadata .. autoproperty:: state .. automethod:: wait + .. automethod:: close .. autoclass:: olympe.Media() diff --git a/src/olympe/doc/userguide.rst b/src/olympe/doc/userguide.rst index 2c7f7c8..edda42d 100644 --- a/src/olympe/doc/userguide.rst +++ b/src/olympe/doc/userguide.rst @@ -816,8 +816,8 @@ Bitfield example: Additional usage examples are available in the unit tests of `olympe.arsdkng.enums`. -Using Olympe exptectation eDSL -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +Using Olympe expectation eDSL +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Before continuing with this Olympe example, you might want to read the :ref:`Olympe eDSL ` section. diff --git a/src/olympe/http.py b/src/olympe/http.py index 21c9c64..4c85e19 100644 --- a/src/olympe/http.py +++ b/src/olympe/http.py @@ -32,15 +32,17 @@ import logging import queue import re -from collections.abc import Mapping +from collections.abc import Mapping as AbstractMapping import h11 import wsproto import wsproto.connection import wsproto.events import wsproto.utilities +from typing import cast, Dict, Generator, Iterable, Mapping, Optional, Tuple, Union from urllib3.exceptions import LocationParseError from urllib3.util import parse_url +from urllib.parse import urlencode import olympe.networking @@ -61,21 +63,30 @@ class HTTPError(Exception): class Request: - def __init__(self, url, method=None, params=None, headers=None, data=None): + def __init__( + self, + url: str, + method: Optional[str] = None, + params: Optional[Mapping[str, str]] = None, + headers: Optional[Union[Mapping[str, str], Iterable[Tuple[str, str]]]] = None, + data: Optional[bytes] = None, + ): url = url.lstrip() method = method or "GET" - self.method = method - self.url = url - self.params = params or "" + self.method: str = method + self.url: str = url + self.params: Optional[Mapping[str, str]] = params if headers: - if isinstance(headers, Mapping): - headers = {k.lower(): v for k, v in headers.items()} + if isinstance(headers, AbstractMapping): + headers = { + k.lower(): v for k, v in cast(Mapping[str, str], headers).items() + } else: headers = {k.lower(): v for (k, v) in headers} else: headers = _default_headers - self.headers = headers - self.data = data + self.headers: Dict[str, str] = headers + self.data: Optional[bytes] = data try: scheme, auth, host, port, path, query, fragment = parse_url(url) @@ -93,24 +104,37 @@ def __init__(self, url, method=None, params=None, headers=None, data=None): port = 80 elif scheme in ("https", "wss"): port = 443 + else: + port = 80 + else: + port = int(port) if path is None: path = "/" if not self.headers.get("host"): self.headers["host"] = host - self.scheme = scheme - self.auth = auth - self.host = host - self.port = port - self.path = path - self.query = query - self.fragment = fragment - - def _h11(self): - return h11.Request( - method=self.method, target=self.path, headers=list(self.headers.items()) + self.scheme: str = scheme + self.auth: Optional[str] = auth + self.host: str = host + self.port: int = port + self.path: str = path + self.query: Optional[str] = query + self.fragment: Optional[str] = fragment + + def _h11(self) -> Generator[h11.Event, None, None]: + if self.data and "content-length" not in self.headers: + self.headers["content-length"] = str(len(self.data)) + if self.params is None: + target = self.path + else: + target = f"{self.path}?{urlencode(self.params)}" + yield h11.Request( + method=self.method, target=target, headers=list(self.headers.items()) ) + if self.data: + yield h11.Data(data=self.data) + yield h11.EndOfMessage() def _wsproto(self): return wsproto.events.Request( @@ -171,6 +195,12 @@ async def text(self): async def json(self): return json.loads(await self.text()) + async def content(self): + data = b'' + async for chunk in self: + data += chunk + return data + def raise_for_status(self): if not self.ok: raise HTTPError( @@ -204,7 +234,7 @@ def __init__(self, connection, request): self._connection = connection self._request = request - async def aread(self): + async def aread(self) -> Union[str, bytes, None]: if ( self._connection._conn.state is wsproto.connection.ConnectionState.CLOSED and self._connection._events.empty() @@ -223,7 +253,7 @@ async def aread(self): ) return event.data - async def awrite(self, data): + async def awrite(self, data: bytes): if ( self._connection._conn.state is not wsproto.connection.ConnectionState.OPEN or not self._connection.connected @@ -283,9 +313,11 @@ def __init__(self, loop, session, scheme): self._resolver = session._resolver self._scheme = scheme if scheme in ("http", "https"): - self._conn = h11.Connection(our_role=h11.CLIENT) + self._conn_http = h11.Connection(our_role=h11.CLIENT) + self._conn = self._conn_http else: - self._conn = wsproto.WSConnection(wsproto.ConnectionType.CLIENT) + self._conn_ws = wsproto.WSConnection(wsproto.ConnectionType.CLIENT) + self._conn = self._conn_ws if scheme in ("http", "ws"): self._client = olympe.networking.TcpClient(self._loop) else: @@ -301,7 +333,10 @@ def __init__(self, loop, session, scheme): self._endpoint = None self._request = None - async def send(self, request: Request, timeout=None): + async def send( + self, request: Request, timeout=None + ) -> Union[Response, WebSocket, None]: + self._sending = True self._request = request try: @@ -309,36 +344,42 @@ async def send(self, request: Request, timeout=None): self._endpoint = (request.host, request.port) else: assert self._endpoint == (request.host, request.port) - for (_, sockaddr) in await self._resolver.resolve( - request.host, request.port - ): - addr, *_ = sockaddr - break + if self._client.connected: + connected = True else: - raise ConnectionError(f"Cannot resolve {request.host}:{request.port}") - if self._scheme in ("http", "ws"): - connected = await self._client.aconnect( - addr, request.port, timeout=timeout - ) - else: - connected = await self._client.aconnect( - addr, request.port, server_hostname=request.host, timeout=timeout - ) + for (_, sockaddr) in await self._resolver.resolve( + request.host, request.port + ): + addr, *_ = sockaddr + break + else: + raise ConnectionError(f"Cannot resolve {request.host}:{request.port}") + if self._scheme in ("http", "ws"): + connected = await self._client.aconnect( + addr, request.port, timeout=timeout + ) + else: + connected = await self._client.aconnect( + addr, request.port, server_hostname=request.host, timeout=timeout + ) if not connected: raise ConnectionError() return await self._do_send(request) finally: self._sending = False - async def _do_send(self, request: Request): + async def _do_send(self, request: Request) -> Union[Response, WebSocket, None]: if request.scheme in ("http", "https"): return await self._do_send_http(request) else: return await self._do_send_ws(request) async def _do_send_http(self, request: Request): - data = self._conn.send(request._h11()) - data += self._conn.send(h11.EndOfMessage()) + data = bytes() + for h11_event in request._h11(): + chunk = self._conn_http.send(h11_event) + if chunk is not None: + data += chunk self._reusing = False await self.awrite(data) event = await self._get_next_event() @@ -346,8 +387,8 @@ async def _do_send_http(self, request: Request): await self._aclosed_http() # Handle: keep-alive/connection: closed return Response(self, request, event) - async def _do_send_ws(self, request: Request): - data = self._conn.send(request._wsproto()) + async def _do_send_ws(self, request: Request) -> Optional[WebSocket]: + data = self._conn_ws.send(request._wsproto()) self._reusing = False await self.awrite(data) event = await self._get_next_event() @@ -357,7 +398,7 @@ async def _do_send_ws(self, request: Request): ws = WebSocket(self, request) return ws - async def awrite(self, data): + async def awrite(self, data: bytes): return await self._client.awrite(data) async def _get_next_event(self): @@ -368,36 +409,39 @@ def _feed_data(self, data: bytes): self._conn.receive_data(data) if self._scheme in ("http", "https"): while True: - event = self._conn.next_event() + event = self._conn_http.next_event() if event in (h11.NEED_DATA, h11.PAUSED): return if event == h11.ConnectionClosed(): - self._loop.logger.error(f"unexpected end of request for {self._request.url}") + assert self._request is not None + self._loop.logger.error( + f"unexpected end of request for {self._request.url}" + ) self._client.disconnect() self._client.destroy() return self._events.put_nowait(event) self._event_sem.release() if isinstance(event, h11.EndOfMessage): - if self._conn.our_state is h11.MUST_CLOSE: + if self._conn_http.our_state is h11.MUST_CLOSE: self._feed_eof() self._client.disconnect() self._client.destroy() return else: - for event in self._conn.events(): + for event in self._conn_ws.events(): if isinstance(event, wsproto.events.Ping): - pong = self._conn.send(event.response()) + pong = self._conn_ws.send(event.response()) self._client.write(pong) continue if isinstance(event, wsproto.events.Pong): continue if isinstance(event, wsproto.events.CloseConnection): - if self._conn.state not in ( + if self._conn_ws.state not in ( wsproto.connection.ConnectionState.CLOSED, wsproto.connection.ConnectionState.LOCAL_CLOSING, ): - close = self._conn.send(event.response()) + close = self._conn_ws.send(event.response()) self._client.write(close) self._client.disconnect() self._client.destroy() @@ -408,17 +452,17 @@ def _feed_data(self, data: bytes): def _feed_eof(self): if self._scheme in ("http", "https"): close_event = h11.ConnectionClosed() - self._conn.send(close_event) + self._conn_http.send(close_event) self._events.put_nowait(close_event) self._event_sem.release() else: close_event = wsproto.events.CloseConnection(0) - if self._conn.state not in ( + if self._conn_ws.state not in ( wsproto.connection.ConnectionState.CLOSED, wsproto.connection.ConnectionState.LOCAL_CLOSING, ): try: - data = self._conn.send(close_event) + data = self._conn_ws.send(close_event) if self._client.connected: self._client.write(data) except wsproto.utilities.LocalProtocolError: @@ -437,13 +481,19 @@ def reuse(self): return self._reuse_websocket() def _reuse_http(self): - if self._conn.our_state is h11.DONE and self._conn.their_state is h11.DONE: + if ( + self._conn_http.our_state is h11.DONE + and self._conn_http.their_state is h11.DONE + ): self._events = queue.Queue() self._event_sem = Semaphore(value=0) - self._conn.start_next_cycle() + self._conn_http.start_next_cycle() self._reusing = True return True - elif self._conn.our_state is h11.IDLE and self._conn.their_state is h11.IDLE: + elif ( + self._conn_http.our_state is h11.IDLE + and self._conn_http.their_state is h11.IDLE + ): self._events = queue.Queue() self._event_sem = Semaphore(value=0) self._reusing = True @@ -462,14 +512,14 @@ async def aclosed(self): return self._closed_websocket() async def _aclosed_http(self): - if self._conn.their_state is h11.CLOSED: + if self._conn_http.their_state is h11.CLOSED: self._feed_eof() await self._client.adisconnect() return True - return self._conn.our_state is h11.CLOSED + return self._conn_http.our_state is h11.CLOSED def _closed_websocket(self): - return self._conn.state is wsproto.connection.ConnectionState.CLOSED + return self._conn_ws.state is wsproto.connection.ConnectionState.CLOSED def disconnect(self): return self._client.disconnect() @@ -495,7 +545,7 @@ def __init__(self, loop=None): self._connection_pools = dict() self._loop.register_cleanup(self.astop) - async def _get_connection(self, scheme, host, port): + async def _get_connection(self, scheme: str, host: str, port: int): loop = get_running_loop() if (scheme, host, port) not in self._connection_pools: connection = Connection(loop, self, scheme) @@ -515,34 +565,40 @@ async def _get_connection(self, scheme, host, port): pool.append(connection) return connection - async def get(self, url, **kwds) -> Response: - return await self.request("GET", url, **kwds) + async def get(self, url: str, **kwds) -> Response: + return cast(Response, await self.request("GET", url, **kwds)) - async def head(self, url, **kwds) -> Response: - return await self.request("HEAD", url, **kwds) + async def head(self, url: str, **kwds) -> Response: + return cast(Response, await self.request("HEAD", url, **kwds)) - async def patch(self, url, **kwds) -> Response: - return await self.request("PATCH", url, **kwds) + async def patch(self, url: str, **kwds) -> Response: + return cast(Response, await self.request("PATCH", url, **kwds)) - async def post(self, url, **kwds) -> Response: - return await self.request("POST", url, **kwds) + async def post(self, url: str, **kwds) -> Response: + return cast(Response, await self.request("POST", url, **kwds)) - async def delete(self, url, **kwds) -> Response: - return await self.request("DELETE", url, **kwds) + async def delete(self, url: str, **kwds) -> Response: + return cast(Response, await self.request("DELETE", url, **kwds)) async def request( - self, method, url, params=None, data=None, headers=None, timeout=None - ) -> Response: + self, + method: str, + url: str, + params: Optional[str] = None, + data: Optional[bytes] = None, + headers: Optional[Union[Mapping[str, str], Iterable[Tuple[str, str]]]] = None, + timeout: Optional[float] = None, + ) -> Union[Response, WebSocket, None]: req = Request(url, method=method, params=params, data=data, headers=headers) assert req.scheme in ("http", "https") connection = await self._get_connection(req.scheme, req.host, req.port) return await connection.send(req, timeout=timeout) - async def websocket(self, url, timeout=None): + async def websocket(self, url, timeout=None) -> Optional[WebSocket]: req = Request(url, method="GET") assert req.scheme in ("ws", "wss") connection = await self._get_connection(req.scheme, req.host, req.port) - return await connection.send(req, timeout=timeout) + return cast(Optional[WebSocket], await connection.send(req, timeout=timeout)) def stop(self): return self._loop.run_async(self.astop) @@ -563,15 +619,26 @@ async def main(): print(await response.text()) ws = await session.websocket( "wss://demo.piesocket.com/v3/channel_1?" - "api_key=oCdCMcMPQpbvNjUIzqtvF1d2X2okWpDQj4AwARJuAgtjhzKxVEjQU6IdCjwm¬ify_self" + "api_key=VCXCEuvhGcBDP7XhiJJUDvR1e1D3eiVjgZ9VRiaV¬ify_self" ) - print(json.loads(await ws.aread())) + if ws is None: + print("Failed to establish websocket connection") + return + data = await ws.aread() + if data is None: + print("Failed to read data from websocket") + return + print(json.loads(data)) await ws.awrite( json.dumps( {"type": "event", "name": "test-event", "message": "cmd_ping"} ).encode() ) - print(json.loads(await ws.aread())) + data = await ws.aread() + if data is None: + print("Failed to read data from websocket") + return + print(json.loads(data)) loop.stop() diff --git a/src/olympe/media.py b/src/olympe/media.py index ad4930a..6c6a3c6 100644 --- a/src/olympe/media.py +++ b/src/olympe/media.py @@ -273,6 +273,7 @@ class ResourceFormat(MediaEnumBase): "Namedtuple class " + ResourceInfo.__doc__ + """ + - media_id (str): unique id of the media - resource_id (str): unique id of the resource - type ( :py:class:`~olympe.media.MediaType`): type of the resource diff --git a/src/olympe/mixins/cellular.py b/src/olympe/mixins/cellular.py new file mode 100644 index 0000000..5c08f00 --- /dev/null +++ b/src/olympe/mixins/cellular.py @@ -0,0 +1,426 @@ +# Copyright (C) 2023 Parrot Drones SAS +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of the Parrot Company nor the names +# of its contributors may be used to endorse or promote products +# derived from this software without specific prior written +# permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# PARROT COMPANY BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. + +from olympe.concurrent import Loop +from olympe.log import LogMixin +from olympe.arsdkng.controller import ControllerBase +from olympe.utils import callback_decorator +from olympe.enums import drone_manager as drone_manager_enums +from olympe.http import Session, HTTPError +from olympe.messages.security import Command +from olympe.controller import Disconnected + +from logging import getLogger +from typing import Dict, Optional + +import hashlib +import time + + +class HeaderField: + authorization = "Authorization" + contentType = "Content-Type" + xApiKey = "x-api-key" + callerId = "X-callerId" + userAgent = "User-Agent" + + +class HeaderValue: + appJson = "application/json" + + +class Cellular(LogMixin): + """ + Controller Cellular API class + Controller mixin providing the cellular pairing. + """ + + """Request timeout""" + _TIMEOUT = 60 + + """Parrot accounts URL.""" + _APC_API_BASE_URL = "https://accounts-api.parrot.com" + """Parrot Academy URL.""" + _ACADEMY_BASE_URL = "https://academy.parrot.com" + + """APC secret key.""" + _APC_SECRECT_KEY = "g%2SW+m,cc9|eDQBgK:qTS2l=;[O~f@W" + """Caller ID associated with the APC secret key.""" + _CALLER_ID = "OpenFlight" + """Academy secret key.""" + _ACADEMY_SECRECT_KEY = "cd7oG8K9h86oCya0u5C0H7mphOuu8LU91o1hBLiG" + """User agent.""" + _USER_AGENT = "Olympe" + """Drone WEB API port.""" + _DRONE_WEB_API_PORT = 80 + + def __init__( + self, controller: "ControllerBase", *args, autoconnect: bool = False, **kwds + ): + """ + Constructor + + :param controller: Controller owner of the Cellular instance + :param autoconnect: `True` to anable the automatic cellular connection, `False` otherwise. + (defaults to `False`) + """ + + super().__init__(controller._name, controller._device_name, "Cellular") + + if controller._device_name is not None: + self.logger = getLogger(f"olympe.cellular.{controller._device_name}") + else: + self.logger = getLogger("olympe.cellular") + + self._controller = controller + self._loop = Loop(self.logger) + self._session = Session(loop=self._loop) + self._loop.start() + self._proxy = None + self._drone_http_url = None + self._autoconnect = autoconnect + self._disconnect_subscriber = self._controller.subscribe( + self._on_disconnection, Disconnected() + ) + + @property + def autoconnect(self) -> bool: + """`True` if the automatic cellular connection is enabled, `False` otherwise.""" + return self._autoconnect + + def pair(self, timeout: Optional[float] = None) -> bool: + """ + Pairs in cellular the SkyController with the Drone currently connected in wifi. + + :return: `True` if the pairing successful, `False` otherwise. + """ + + return self.fpair().result_or_cancel(timeout=timeout) + + def fpair(self): + """ + Retrives a future of :py:func:`pair` + """ + + return self._loop.run_async(self._apair) + + def _destroy(self): + """ + Destructor + """ + self._disconnect_subscriber.unsubscribe() + + self._session.stop() + self._loop.stop() + + def _on_disconnection(self, *_): + """ + Called at the controller disconnection. + """ + if self._proxy is not None: + self._proxy.close() + self._proxy = None + + @classmethod + def _format_acp_query_items( + cls, apc_key: str, params: Dict[str, str] = {} + ) -> Dict[str, str]: + """ + Formats APC query items. + + :param apc_key: APC signature token. + :param params: query parameters. + + :return: formatted query item dictionary. + """ + ts = int(time.time()) + pre_str = "" + keys_lst = params.keys() + sorted(keys_lst) + for key in keys_lst: + pre_str += f"{params[key]}" + pre_str += f"{ts}" + pre_str += apc_key + + token = hashlib.md5(pre_str.encode()).hexdigest() + return {"ts": f"{ts}", "token": f"{token}"} + + async def _get_anonymous_token(self) -> str: + """ + Retrieves an anonymous APC token. + + :return: an anonymous APC token. + """ + + url = f"{Cellular._APC_API_BASE_URL}/V4/account/tmp/create" + + headers = { + HeaderField.callerId: Cellular._CALLER_ID, + HeaderField.contentType: HeaderValue.appJson, + } + + self.logger.info("get an anonymous token") + + apc_query_items = Cellular._format_acp_query_items(Cellular._APC_SECRECT_KEY) + + response = await self._session.post( + url, headers=headers, params=apc_query_items + ) + + response.raise_for_status() + data = await response.json() + + apc_token = data.get("apcToken") + return apc_token + + async def _get_association_challenge(self, apc_token: str) -> str: + """ + Retrieves a drone association challenge. + + :param apc_token: user authentication APC token. + + :return: a drone association challenge. + """ + + url = f"{Cellular._ACADEMY_BASE_URL}/apiv1/4g/pairing/challenge" + headers = { + HeaderField.authorization: f"Bearer {apc_token}", + HeaderField.contentType: HeaderValue.appJson, + HeaderField.xApiKey: Cellular._ACADEMY_SECRECT_KEY, + HeaderField.userAgent: Cellular._USER_AGENT, + } + + params = { + "operation": "associate", + } + + self.logger.info("get the challenge association") + + response = await self._session.get( + url, + headers=headers, + params=params, + timeout=Cellular._TIMEOUT, + ) + + response.raise_for_status() + challenge = await response.text() + + return challenge + + async def _sign_challenge_by_drone(self, challenge) -> bytes: + """ + Signs an association challenge by the connected drone. + + :param challenge: drone challenge association to sign. + + :return: a message containing the signed drone association challenge. + """ + + url = f"{self._drone_http_url}/api/v1/secure-element/sign_challenge" + queryItems = {"operation": "associate", "challenge": challenge} + + self.logger.info("sign the challenge by the drone") + + response = await self._session.get( + url, params=queryItems, timeout=Cellular._TIMEOUT + ) + response.raise_for_status() + drone_signed_challenge = await response.content() + + return drone_signed_challenge + + async def _associate_user_drone(self, apc_token, drone_signed_challenge: bytes): + """ + Associates a user and a drone. + + :param apc_token: authentication APC token of the user to associate with the drone. + :param drone_signed_challenge: message containing the association challenge signed by drone to associate with the drone. + """ + + url = f"{Cellular._ACADEMY_BASE_URL}/apiv1/4g/pairing" + headers = { + HeaderField.authorization: f"Bearer {apc_token}", + HeaderField.contentType: HeaderValue.appJson, + HeaderField.xApiKey: Cellular._ACADEMY_SECRECT_KEY, + HeaderField.userAgent: Cellular._USER_AGENT, + } + + self.logger.info("associate the user APC token and the drone") + + response = await self._session.post( + url, + timeout=Cellular._TIMEOUT, + headers=headers, + data=drone_signed_challenge, + ) + response.raise_for_status() + + async def _get_drone_list(self, apc_token): + """ + Retrieves the drone list paired with this an APC token. + + :param apc_token: user authentication APC token. + + :return: the drone list paired with this APC token. + """ + url = f"{Cellular._ACADEMY_BASE_URL}/apiv1/drone/list" + headers = { + HeaderField.authorization: f"Bearer {apc_token}", + HeaderField.contentType: HeaderValue.appJson, + HeaderField.xApiKey: Cellular._ACADEMY_SECRECT_KEY, + HeaderField.userAgent: Cellular._USER_AGENT, + } + + self.logger.info("get paired drone list") + response = await self._session.get( + url, + headers=headers, + timeout=Cellular._TIMEOUT, + ) + response.raise_for_status() + drone_list = await response.text() + + return drone_list + + async def _apair(self): + """ + Pairs in cellular the SkyController with the Drone currently connected in wifi. + + :return: `True` if the pairing successful, `False` otherwise. + """ + + try: + # get anonymous user apc token + token = await self._get_anonymous_token() + + # get challenge association + challenge = await self._get_association_challenge(token) + + # Sign the challenge association by the drone + drone_signed_challenge = await self._sign_challenge_by_drone(challenge) + + # associate the user and the drone. + await self._associate_user_drone(token, drone_signed_challenge) + except HTTPError as e: + self.logger.warning(f"{e}") + return False + + # send the user APC token to the skycontroller + self.logger.info("send the user APC token to the skycontroller") + if not await self._controller(Command.RegisterApcToken(token=token)): + return False + + try: + # get drone list + drone_list = await self._get_drone_list(token) + except HTTPError as e: + self.logger.warning(f"{e}") + return False + + # send drone list to the skycontroller + self.logger.info("send the drone list to the skycontroller") + if not await self._controller(Command.RegisterApcDroneList(list=drone_list)): + return False + + return True + + async def _create_proxy(self): + """ + Creates the proxy to access to the drone. + """ + self._proxy = await self._controller.fopen_tcp_proxy( + Cellular._DRONE_WEB_API_PORT + ) + + self._drone_http_url = f"http://{self._proxy.address}:{self._proxy.port}" + + # start the pairing if the automatic cellular connection is enabled + if self._autoconnect: + self.fpair() + + @callback_decorator() + def _on_drone_connection_state_change( + self, state: drone_manager_enums.connection_state + ): + """ + Called at the change of the drone connection state. + """ + if state == drone_manager_enums.connection_state.connected: + if self._proxy is not None: + self._proxy.close() + self._proxy = None + + self._controller._thread_loop.run_async(self._create_proxy) + elif self._proxy is not None: + self._proxy.close() + self._proxy = None + + +class CellularPairerMixin: + """ + Controller mixin providing the cellular API. + """ + + def __init__(self, *args, cellular_autoconnect: bool = False, **kwds): + """ + Constructor + + :param cellular_autoconnect: `True` to enable the automatic cellular connection, `False` otherwise. + """ + super().__init__(*args, **kwds) + self._cellular = Cellular(self, autoconnect=cellular_autoconnect) + + def destroy(self): + """ + Destructor + """ + self._cellular._destroy() + super().destroy() + + @property + def cellular(self) -> Cellular: + """Cellular API.""" + return self._cellular + + @property + def cellular_autoconnect(self) -> bool: + """`True` if the automatic cellular connection is enabled, `False` otherwise.""" + return self._cellular.autoconnect + + @callback_decorator() + def _on_connection_state_changed(self, message_event, _): + super()._on_connection_state_changed(message_event, _) + + # Handle drone connection_state events + self._cellular._on_drone_connection_state_change(message_event._args["state"]) + + def set_device_name(self, device_name): + super().set_device_name(device_name) + self._cellular.set_device_name(device_name) diff --git a/src/olympe/mixins/ipproxy.py b/src/olympe/mixins/ipproxy.py new file mode 100644 index 0000000..0d0a66f --- /dev/null +++ b/src/olympe/mixins/ipproxy.py @@ -0,0 +1,215 @@ +# Copyright (C) 2023 Parrot Drones SAS +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright +# notice, this list of conditions and the following disclaimer in +# the documentation and/or other materials provided with the +# distribution. +# * Neither the name of the Parrot Company nor the names +# of its contributors may be used to endorse or promote products +# derived from this software without specific prior written +# permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# PARROT COMPANY BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS +# OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED +# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT +# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +# SUCH DAMAGE. + +import ctypes +import olympe_deps as od +from olympe.utils import callback_decorator +from olympe.concurrent import Condition, Future +from olympe.arsdkng.controller import ControllerBase +from olympe.messages.drone_manager import connection_state +from olympe.controller import Connected +from typing import Optional + + +class IpProxy: + """Drone Ip Proxy""" + + def __init__(self, controller: ControllerBase, device_type: int, remote_port: int): + """ + Constructor + + :param controller: controller owning the proxy + :param device_type: type of the device to access + :param remote_port: port to access + """ + self._controller = controller + self._device_type = device_type + self._remote_port = remote_port + self._port = None + self._address = None + self._proxy = None + self._resolved = False + self._open_condition = Condition(self._controller._thread_loop) + + @property + def port(self): + """ + Port to connect + + :getter: Returns the port to connect + :type: int + """ + return self._port + + @property + def address(self): + """ + Address to connect + + :getter: Returns the address to connect + :type: string + """ + return self._address + + def close(self): + """Closes the proxy""" + self._controller._thread_loop.run_async(self._aclose) + + def _open(self): + """Opens the proxy""" + return self._controller._thread_loop.run_async(self._aopen) + + async def _aopen(self): + """Opens the proxy""" + self._proxy_cbs = od.struct_arsdk_device_tcp_proxy_cbs.bind( + { + "open": self._open_cb, + "close": self._close_cb, + } + ) + + proxy = od.POINTER_T(od.struct_arsdk_device_tcp_proxy)() + res = od.arsdk_device_create_tcp_proxy( + self._controller._device.arsdk_device, + self._device_type, + self._remote_port, + self._proxy_cbs, + ctypes.pointer(proxy), + ) + + if res != 0: + raise RuntimeError(f"Error while opening proxy: {res}.") + + self._proxy = proxy + self._controller._thread_loop.register_cleanup(self._cleanup) + + async with self._open_condition: + await self._open_condition.wait() + + async def _cleanup(self): + """ + Cleans the proxy + """ + if self._proxy is not None: + await self._aclose() + + async def _aclose(self): + """Closes the proxy""" + if self._proxy is None: + return + + res = od.arsdk_device_destroy_tcp_proxy(self._proxy) + if res != 0: + raise RuntimeError(f"Error while closing proxy: {res}") + + self._proxy = None + self._controller._thread_loop.unregister_cleanup(self._cleanup) + + @callback_decorator() + def _open_cb(self, proxy, localport, user_data): + """ + Called at the local socket opening. + """ + + address_native = od.arsdk_device_tcp_proxy_get_addr(self._proxy) + self._address = od.string_cast(address_native) + self._port = od.arsdk_device_tcp_proxy_get_port(self._proxy) + + self._controller._thread_loop.run_async(self._open_condition_notify) + + async def _open_condition_notify(self): + """Notifies the opening condition""" + async with self._open_condition: + self._open_condition.notify_all() + + @callback_decorator() + def _close_cb(self, proxy, userdata): + """ + Called at the local socket closing. + """ + + # Do nothing + # Either the resolution failed and the timeout will be triggered or + # the proxy has already been opened and the local socket will notify, itself, + # of its closure. + pass + + +class IpProxyMixin: + """ + Controller mixin providing the Ip proxy creation. + """ + + def __init__(self, *args, **kwds): + super().__init__(*args, **kwds) + + def open_tcp_proxy(self, port: int, timeout: Optional[float] = None) -> IpProxy: + """Opens a new drone tcp proxy + + :param port: port to access + :param timeout: the timeout in seconds or None for infinite timeout (the + default) + + :return: an :py:class:`IpProxy` object open to the drone + """ + + return self.fopen_tcp_proxy(port).result_or_cancel(timeout=timeout) + + def fopen_tcp_proxy(self, port: int) -> Future: + """ + Retrives a future of :py:func:`open_tcp_proxy` + + :param port: port to access + """ + + return self._thread_loop.run_async(self.aopen_tcp_proxy, port) + + async def aopen_tcp_proxy(self, port: int) -> IpProxy: + """Opens a new drone tcp proxy + + :param port: port to access + + :return: an :py:class:`IpProxy` object open to the drone + + Should run in the :py:class:`~olympe.arsdkng.controller.ControllerBase` backend loop + """ + + # Wait to be connected to a drone to get it model Id + if self._is_skyctrl: + await self(connection_state(state="connected")) + drone_model = self.get_state(connection_state)["model"] + else: + if not self.connected: + # The connected event is also triggered from the backend loop + await self(Connected()) + drone_model = self._device_type + + proxy = IpProxy(self, drone_model, port) + await proxy._open() + return proxy diff --git a/src/olympe/mixins/media.py b/src/olympe/mixins/media.py index 7530e09..cbf4d1e 100644 --- a/src/olympe/mixins/media.py +++ b/src/olympe/mixins/media.py @@ -29,21 +29,22 @@ from olympe.media import Media from olympe.utils import callback_decorator +from olympe.enums import drone_manager as drone_manager_enums class MediaControllerMixin: - def __init__(self, *args, media_autoconnect=True, media_port=80, **kwds): + def __init__( + self, *args, media_autoconnect: bool = True, media_port: int = 80, **kwds + ): self._media = None super().__init__(*args, **kwds) self._media_autoconnect = media_autoconnect self._media_port = media_port - media_hostname = self._ip_addr_str + f":{self._media_port}" self._media = Media( - name=self._name, - hostname=media_hostname, - device_name=self._device_name, - scheduler=self._scheduler + name=self._name, device_name=self._device_name, scheduler=self._scheduler ) + """Proxy through the skyctrl to access to the drone""" + self._proxy = None def destroy(self): self._media.shutdown() @@ -57,17 +58,42 @@ def _connected_cb(self, *args): self._media.set_hostname(media_hostname) self._media.async_disconnect().then(lambda f: self._media.async_connect()) + @callback_decorator() + def _disconnected_cb(self, *args): + super()._disconnected_cb(*args) + # If the direct device is disconnected, the proxy is no longer unusable + if self._proxy is not None: + self._proxy.close() + self._proxy = None + @callback_decorator() def _on_connection_state_changed(self, message_event, _): super()._on_connection_state_changed(message_event, _) # Handle drone connection_state events if self._is_skyctrl: - # The SkyController forwards port tcp/180 to the drone tcp/80 - # for the web API endpoints - if self._media_autoconnect: - media_hostname = self._ip_addr_str + ":180" - self._media.set_hostname(media_hostname) - self._media.async_disconnect().then(lambda f: self._media.async_connect()) + if ( + message_event._args["state"] + == drone_manager_enums.connection_state.connected + ): + if self._proxy is not None: + self._proxy.close() + + self._thread_loop.run_async(self._create_proxy) + elif self._proxy is not None: + self._proxy.close() + self._proxy = None + + async def _create_proxy(self): + """ + Creates the proxy to access to the drone + """ + self._proxy = await self.aopen_tcp_proxy(self._media_port) + + media_hostname = f"{self._proxy.address}:{self._proxy.port}" + self._media.set_hostname(media_hostname) + + if self._media_autoconnect: + self._media.async_disconnect().then(lambda f: self._media.async_connect()) def _reset_instance(self): """ @@ -78,9 +104,9 @@ def _reset_instance(self): super()._reset_instance() @property - def media(self): + def media(self) -> Media: return self._media @property - def media_autoconnect(self): + def media_autoconnect(self) -> bool: return self._media_autoconnect diff --git a/src/olympe/mixins/streaming.py b/src/olympe/mixins/streaming.py index 515923e..573e0e4 100644 --- a/src/olympe/mixins/streaming.py +++ b/src/olympe/mixins/streaming.py @@ -54,9 +54,9 @@ def _dispose_pdraw(self): def _create_pdraw_interface(self): return Pdraw( name=self._name, - server_addr=self._ip_addr.decode(), - device_name=self._device_name, buffer_queue_size=self._video_buffer_queue_size, + controller=self, + pdraw_thread_loop=self._thread_loop, ) def connect(self, **kwds): diff --git a/src/olympe/video/pdraw.py b/src/olympe/video/pdraw.py index 63f7650..5d476da 100644 --- a/src/olympe/video/pdraw.py +++ b/src/olympe/video/pdraw.py @@ -41,10 +41,17 @@ from olympe.utils import py_object_cast, callback_decorator from olympe.concurrent import Condition, Loop from olympe.log import LogMixin +from olympe.arsdkng.backend import CtrlBackendMuxIp from . import VMetaFrameType, PDRAW_LOCAL_STREAM_PORT, PDRAW_LOCAL_CONTROL_PORT # noqa from . import PDRAW_TIMESCALE from .mp4 import Mp4Mux from .frame import VideoFrame +from typing import Optional, TYPE_CHECKING +from warnings import warn + + +if TYPE_CHECKING: + from olympe.arsdkng.controller import ControllerBase class PdrawState(Enum): @@ -59,32 +66,31 @@ class PdrawState(Enum): Error = auto() -class H264Header(namedtuple('H264Header', ['sps', 'spslen', 'pps', 'ppslen'])): - +class H264Header(namedtuple("H264Header", ["sps", "spslen", "pps", "ppslen"])): def tofile(self, f): start = bytearray([0, 0, 0, 1]) if self.spslen > 0: f.write(start) - f.write(bytearray(self.sps[:self.spslen])) + f.write(bytearray(self.sps[: self.spslen])) if self.ppslen > 0: f.write(start) - f.write(bytearray(self.pps[:self.ppslen])) + f.write(bytearray(self.pps[: self.ppslen])) def StreamFactory(): return { - 'id': None, - 'id_userdata': ctypes.c_void_p(), - 'frame_type': od.VDEF_FRAME_TYPE_UNKNOWN, - 'h264_header': None, - 'track_id': None, - 'metadata_track_id': None, - 'video_sink': None, - 'video_sink_cbs': None, - 'video_sink_lock': threading.RLock(), - 'video_queue': None, - 'video_queue_event': od.POINTER_T(od.struct_pomp_evt)(), + "id": None, + "id_userdata": ctypes.c_void_p(), + "frame_type": od.VDEF_FRAME_TYPE_UNKNOWN, + "h264_header": None, + "track_id": None, + "metadata_track_id": None, + "video_sink": None, + "video_sink_cbs": None, + "video_sink_lock": threading.RLock(), + "video_queue": None, + "video_queue_event": od.POINTER_T(od.struct_pomp_evt)(), } @@ -123,25 +129,28 @@ def resync(*args, **kwds): class Pdraw(LogMixin): - - def __init__(self, - name=None, - device_name=None, - server_addr=None, - buffer_queue_size=8, - pdraw_thread_loop=None, - ): + def __init__( + self, + name: Optional[str] = None, + device_name: Optional[str] = None, + server_addr: Optional[str] = None, + buffer_queue_size: int = 8, + pdraw_thread_loop: Optional[Loop] = None, + controller: Optional["ControllerBase"] = None, + ): """ :param name: (optional) pdraw client name (used by Olympe logs) - :type name: str :param device_name: (optional) the drone device name (used by Olympe logs) - :type device_name: str :param buffer_queue_size: (optional) video buffer queue size (defaults to 8) - :type buffer_queue_size: int + :param pdraw_thread_loop: (optional) Thread where run pdraw + if 'None' a new thread is created + :param controller: (optional) Controller owner of the pdraw instance """ + if device_name is None and controller: + device_name = controller._device_name super().__init__(name, device_name, "pdraw") if pdraw_thread_loop is None: @@ -152,12 +161,12 @@ def __init__(self, self.own_pdraw_thread_loop = False self.pdraw_thread_loop = pdraw_thread_loop - self.callbacks_thread_loop = Loop( - self.logger, parent=self.pdraw_thread_loop) + self.callbacks_thread_loop = Loop(self.logger, parent=self.pdraw_thread_loop) self.callbacks_thread_loop.start() self.buffer_queue_size = buffer_queue_size self.pomp_loop = self.pdraw_thread_loop.pomp_loop + self._controller = controller self._open_condition = Condition(self.pdraw_thread_loop) self._close_condition = Condition(self.pdraw_thread_loop) self._play_condition = Condition(self.pdraw_thread_loop) @@ -174,14 +183,14 @@ def __init__(self, self.outfiles = { od.VDEF_FRAME_TYPE_CODED: { - 'data': None, - 'meta': None, - 'info': None, + "data": None, + "meta": None, + "info": None, }, od.VDEF_FRAME_TYPE_RAW: { - 'data': None, - 'meta': None, - 'info': None, + "data": None, + "meta": None, + "info": None, }, } @@ -198,29 +207,30 @@ def __init__(self, } self.url = None - if server_addr is None: - server_addr = "192.168.42.1" - self.server_addr = server_addr - self.resource_name = "live" + self.resource_name = None self.media_name = None - self.demuxer_cbs = od.struct_pdraw_demuxer_cbs.bind({ - "open_resp": self._open_resp, - "close_resp": self._close_resp, - "unrecoverable_error": self._unrecoverable_error, - "ready_to_play": self._ready_to_play, - "play_resp": self._play_resp, - "pause_resp": self._pause_resp, - "seek_resp": self._seek_resp, - "select_media": self._select_media, - "end_of_range": self._end_of_range, - }) - self.pdraw_cbs = od.struct_pdraw_cbs.bind({ - "socket_created": self._socket_created, - "media_added": self._media_added, - "media_removed": self._media_removed, - "stop_resp": self.stop_resp, - }) + self.demuxer_cbs = od.struct_pdraw_demuxer_cbs.bind( + { + "open_resp": self._open_resp, + "close_resp": self._close_resp, + "unrecoverable_error": self._unrecoverable_error, + "ready_to_play": self._ready_to_play, + "play_resp": self._play_resp, + "pause_resp": self._pause_resp, + "seek_resp": self._seek_resp, + "select_media": self._select_media, + "end_of_range": self._end_of_range, + } + ) + self.pdraw_cbs = od.struct_pdraw_cbs.bind( + { + "socket_created": self._socket_created, + "media_added": self._media_added, + "media_removed": self._media_removed, + "stop_resp": self.stop_resp, + } + ) self.video_sink_vt = { od.VDEF_FRAME_TYPE_CODED: _CodedVideoSink, @@ -230,7 +240,7 @@ def __init__(self, self.pdraw_thread_loop.register_cleanup(self._adispose) @property - def state(self): + def state(self) -> PdrawState: """ Return the current Pdraw state @@ -239,14 +249,14 @@ def state(self): return self._state @state.setter - def state(self, value): + def state(self, value: PdrawState): with self._state_lock: self._state = value for event in self._state_wait_events[self._state]: event.set() self._state_wait_events[self._state] = [] - def wait(self, state, timeout=None): + def wait(self, state: PdrawState, timeout: Optional[float] = None) -> bool: """ Wait for the provided Pdraw state @@ -259,10 +269,7 @@ def wait(self, state, timeout=None): This function may block indefinitely when called without a timeout value. - :type state: PdrawState :param timeout: the timeout duration in seconds or None (the default) - :type timeout: float - :rtype: bool """ with self._state_lock: if self._state == state: @@ -277,14 +284,13 @@ def dispose(self): def destroy(self): self.callbacks_thread_loop.stop() try: - self.dispose().result_or_cancel(timeout=2.) + self.dispose().result_or_cancel(timeout=2.0) except FutureTimeoutError: self.logger.error("Pdraw.destroy() timedout") self.pdraw_thread_loop.stop() async def _adispose(self): - self.pdraw_thread_loop.unregister_cleanup( - self._adispose, ignore_error=True) + self.pdraw_thread_loop.unregister_cleanup(self._adispose, ignore_error=True) await self.aclose() if not self._stop(): return False @@ -303,12 +309,12 @@ def _stop(self): # cleanup some FDs from the callbacks thread loop that might # have been lost for stream in self.streams.values(): - if stream['video_queue_event'] is not None: - self.logger.warning( - "cleanup leftover pdraw callbacks eventfds") + if stream["video_queue_event"] is not None: + self.logger.warning("cleanup leftover pdraw callbacks eventfds") self.callbacks_thread_loop.remove_event_from_loop( - stream['video_queue_event']) - stream['video_queue_event'] = None + stream["video_queue_event"] + ) + stream["video_queue_event"] = None if res != 0: self.logger.error(f"cannot stop pdraw session: {os.strerror(-res)}") return False @@ -338,8 +344,7 @@ def _destroy_pdraw(self): ret = True if self.pdraw_demuxer: if not self.pdraw: - self.logger.error( - "Cannot destroy pdraw demuxer: a NULL pdraw session") + self.logger.error("Cannot destroy pdraw demuxer: a NULL pdraw session") return False self.logger.info("destroying pdraw demuxer...") res = od.pdraw_demuxer_destroy(self.pdraw, self.pdraw_demuxer) @@ -367,22 +372,32 @@ def _open_url(self): if self.resource_name.startswith("replay/"): if self.media_name is None: self.logger.error( - "Error media_name should be provided in video stream " - "replay mode" + "Error media_name should be provided in video stream " "replay mode" ) return False - res = od.pdraw_demuxer_new_from_url( - self.pdraw, - self.url, - self.demuxer_cbs, - ctypes.cast( - ctypes.pointer(ctypes.py_object(self)), ctypes.c_void_p), - ctypes.byref(self.pdraw_demuxer) - ) + + if self._controller and isinstance(self._controller._backend, CtrlBackendMuxIp): + res = od.pdraw_demuxer_new_from_url_on_mux( + self.pdraw, + self.url, + self._controller._backend._info.mux_ctx, + self.demuxer_cbs, + ctypes.cast(ctypes.pointer(ctypes.py_object(self)), ctypes.c_void_p), + ctypes.byref(self.pdraw_demuxer), + ) + else: + res = od.pdraw_demuxer_new_from_url( + self.pdraw, + self.url, + self.demuxer_cbs, + ctypes.cast(ctypes.pointer(ctypes.py_object(self)), ctypes.c_void_p), + ctypes.byref(self.pdraw_demuxer), + ) if res != 0: self.logger.error( - f"Error while opening pdraw url: {self.url} {os.strerror(-res)}") + f"Error while opening pdraw url: {self.url} {os.strerror(-res)}" + ) return False else: self.logger.info(f"Opening pdraw url OK: {self.url}") @@ -393,15 +408,10 @@ def _open_stream(self): """ Opening pdraw stream using an rtsp url """ - if self.state not in ( - PdrawState.Error, - PdrawState.Stopped, - PdrawState.Closed, - PdrawState.Created): + if self.state is not PdrawState.Opening: self.logger.warning(f"Cannot open stream from {self.state}") return False - self.state = PdrawState.Opening if not self.pdraw and not self._pdraw_new(): return False @@ -413,7 +423,15 @@ def _open_stream(self): def close(self): """ Close a playing or paused video stream session + + **Deprecated function** + warning:: + this function is deprecated, please use Pdraw.stop instead """ + warn( + "Pdraw.close is deprecated, " "please use Pdraw.stop instead", + DeprecationWarning, + ) return self.pdraw_thread_loop.run_async(self.aclose).result_or_cancel(5.0) async def aclose(self): @@ -426,23 +444,28 @@ async def aclose(self): self.state = PdrawState.Closed return True if self.state in ( - PdrawState.Opened, - PdrawState.Paused, - PdrawState.Playing, - PdrawState.Error): + PdrawState.Opened, + PdrawState.Paused, + PdrawState.Playing, + PdrawState.Error, + ): self.logger.debug(f"pdraw closing from the {self.state} state") self.state = PdrawState.Closing if not self._close_stream(): return False elif self.state is not PdrawState.Closing: return False - self.pdraw_thread_loop.run_delayed(1., self._close_waiter) + self.pdraw_thread_loop.run_delayed(1.0, self._close_waiter) async with self._close_condition: await self._close_condition.wait() if self.state is not PdrawState.Closed: self.logger.warning("Closing pdraw session timedout") # FIXME: workaround TRS-1052 self.state = PdrawState.Closed + + if self._proxy is not None: + self._proxy.close() + self._proxy = None return self.state is PdrawState.Closed async def _close_waiter(self): @@ -520,9 +543,8 @@ def _pdraw_new(self): res = od.pdraw_new( self.pomp_loop, self.pdraw_cbs, - ctypes.cast( - ctypes.pointer(ctypes.py_object(self)), ctypes.c_void_p), - ctypes.byref(self.pdraw) + ctypes.cast(ctypes.pointer(ctypes.py_object(self)), ctypes.c_void_p), + ctypes.byref(self.pdraw), ) if res != 0: self.logger.error(f"Error while creating pdraw interface: {res}") @@ -612,13 +634,14 @@ def _select_media(self, pdraw, demuxer, medias, count, userdata): if bool(medias[idx].is_default): default_media_id = medias[idx].media_id default_media_idx = idx - if (self.media_name is not None and - self.media_name == od.string_cast(medias[idx].name)): + if self.media_name is not None and self.media_name == od.string_cast( + medias[idx].name + ): selected_media_id = medias[idx].media_id selected_media_idx = idx if ( - self.media_name is not None and - od.string_cast(medias[selected_media_idx].name) != self.media_name + self.media_name is not None + and od.string_cast(medias[selected_media_idx].name) != self.media_name ): default_media_name = od.string_cast(medias[default_media_idx].name) self.logger.warning( @@ -626,10 +649,12 @@ def _select_media(self, pdraw, demuxer, medias, count, userdata): f"Selecting the default media instead: {default_media_name}" ) self.session_metadata = od.struct_vmeta_session.as_dict( - medias[default_media_idx].session_meta) + medias[default_media_idx].session_meta + ) else: self.session_metadata = od.struct_vmeta_session.as_dict( - medias[selected_media_idx].session_meta) + medias[selected_media_idx].session_meta + ) if selected_media_id: return 1 << selected_media_id elif default_media_id: @@ -656,11 +681,13 @@ def _media_added(self, pdraw, media_info, userdata): # store the information if it is supported and requested media # otherwise exit - if (frame_type != od.VDEF_FRAME_TYPE_RAW and - frame_type != od.VDEF_FRAME_TYPE_CODED): + if ( + frame_type != od.VDEF_FRAME_TYPE_RAW + and frame_type != od.VDEF_FRAME_TYPE_CODED + ): self.logger.warning( - f"Ignoring unsupported media id {id_} " - f"(type {video_info.format})") + f"Ignoring unsupported media id {id_} " f"(type {video_info.format})" + ) return requested_media = False @@ -671,8 +698,8 @@ def _media_added(self, pdraw, media_info, userdata): if not requested_media: self.logger.info( - f"Skipping non-requested media id {id_} " - f"(type {video_info.format})") + f"Skipping non-requested media id {id_} " f"(type {video_info.format})" + ) return self.streams[id_]["media_type"] = media_type @@ -680,7 +707,8 @@ def _media_added(self, pdraw, media_info, userdata): self.streams[id_]["vdef_format"] = vdef_format if frame_type == od.VDEF_FRAME_TYPE_CODED and ( - od.VDEF_CODED_DATA_FORMAT_BYTE_STREAM): + od.VDEF_CODED_DATA_FORMAT_BYTE_STREAM + ): outfile = self.outfiles[frame_type]["data"] if outfile: header = video_info.pdraw_coded_video_info_0.h264 @@ -690,15 +718,15 @@ def _media_added(self, pdraw, media_info, userdata): bytearray(header.pps), int(header.ppslen), ) - self.streams[id_]['h264_header'] = header - self.streams[id_]['track_id'] = outfile.add_track( + self.streams[id_]["h264_header"] = header + self.streams[id_]["track_id"] = outfile.add_track( type=od.MP4_TRACK_TYPE_VIDEO, name="video", enabled=1, in_movie=1, in_preview=1, ) - self.streams[id_]['metadata_track_id'] = outfile.add_track( + self.streams[id_]["metadata_track_id"] = outfile.add_track( type=od.MP4_TRACK_TYPE_METADATA, name="metadata", enabled=0, @@ -707,55 +735,63 @@ def _media_added(self, pdraw, media_info, userdata): ) outfile.ref_to_track( - self.streams[id_]['metadata_track_id'], - self.streams[id_]['track_id'] + self.streams[id_]["metadata_track_id"], + self.streams[id_]["track_id"], ) # start a video sink attached to the new media - video_sink_params = od.struct_pdraw_video_sink_params.bind(dict( - # drop buffers when the queue is full (buffer_queue_size > 0) - queue_max_count=self.buffer_queue_size, # buffer queue size - )) - self.streams[id_]['id_userdata'] = ctypes.cast( - ctypes.pointer(ctypes.py_object(id_)), ctypes.c_void_p) - self.streams[id_]['id'] = id_ - self.streams[id_]['video_sink_cbs'] = self.video_sink_vt[ - frame_type].cbs.bind({"flush": self._video_sink_flush}) + video_sink_params = od.struct_pdraw_video_sink_params.bind( + dict( + # drop buffers when the queue is full (buffer_queue_size > 0) + queue_max_count=self.buffer_queue_size, # buffer queue size + ) + ) + self.streams[id_]["id_userdata"] = ctypes.cast( + ctypes.pointer(ctypes.py_object(id_)), ctypes.c_void_p + ) + self.streams[id_]["id"] = id_ + self.streams[id_]["video_sink_cbs"] = self.video_sink_vt[frame_type].cbs.bind( + {"flush": self._video_sink_flush} + ) self.streams[id_]["frame_type"] = frame_type - self.streams[id_]['video_sink'] = od.POINTER_T( - self.video_sink_vt[frame_type].video_sink_type)() + self.streams[id_]["video_sink"] = od.POINTER_T( + self.video_sink_vt[frame_type].video_sink_type + )() res = self.video_sink_vt[frame_type].new( pdraw, id_, video_sink_params, - self.streams[id_]['video_sink_cbs'], - self.streams[id_]['id_userdata'], - ctypes.byref(self.streams[id_]['video_sink']) + self.streams[id_]["video_sink_cbs"], + self.streams[id_]["id_userdata"], + ctypes.byref(self.streams[id_]["video_sink"]), ) - if res != 0 or not self.streams[id_]['video_sink']: + if res != 0 or not self.streams[id_]["video_sink"]: self.logger.error("Unable to start video sink") return # Retrieve the queue belonging to the sink queue = self.video_sink_vt[frame_type].get_queue( pdraw, - self.streams[id_]['video_sink'], + self.streams[id_]["video_sink"], ) - self.streams[id_]['video_queue'] = queue + self.streams[id_]["video_queue"] = queue # Retrieve event object and related file descriptor res = self.video_sink_vt[frame_type].queue_get_event( - queue, ctypes.byref(self.streams[id_]['video_queue_event'])) - if res < 0 or not self.streams[id_]['video_queue_event']: - self.logger.error(f"Unable to get video sink queue event: {os.strerror(-res)}") + queue, ctypes.byref(self.streams[id_]["video_queue_event"]) + ) + if res < 0 or not self.streams[id_]["video_queue_event"]: + self.logger.error( + f"Unable to get video sink queue event: {os.strerror(-res)}" + ) return # add the file description to our pomp loop self.callbacks_thread_loop.add_event_to_loop( - self.streams[id_]['video_queue_event'], + self.streams[id_]["video_queue_event"], lambda *args: self._video_sink_queue_event(*args), - id_ + id_, ) def _media_removed(self, pdraw, media_info, userdata): @@ -775,31 +811,29 @@ def _media_removed(self, pdraw, media_info, userdata): self._media_removed_impl(id_) def _media_removed_impl(self, id_): - frame_type = self.streams[id_]['frame_type'] - with self.streams[id_]['video_sink_lock']: - if self.streams[id_]['video_queue_event']: + frame_type = self.streams[id_]["frame_type"] + with self.streams[id_]["video_sink_lock"]: + if self.streams[id_]["video_queue_event"]: self.callbacks_thread_loop.remove_event_from_loop( - self.streams[id_]['video_queue_event']).result_or_cancel( - timeout=5.) - self.streams[id_]['video_queue_event'] = None + self.streams[id_]["video_queue_event"] + ).result_or_cancel(timeout=5.0) + self.streams[id_]["video_queue_event"] = None - if not self.streams[id_]['video_sink']: + if not self.streams[id_]["video_sink"]: self.logger.error( - f"pdraw_video_sink for media_id {id_} " - f"has already been stopped" + f"pdraw_video_sink for media_id {id_} " f"has already been stopped" ) return res = self.video_sink_vt[frame_type].destroy( - self.pdraw, self.streams[id_]['video_sink']) + self.pdraw, self.streams[id_]["video_sink"] + ) if res < 0: self.logger.error(f"pdraw_stop_video_sink() returned {res}") else: - self.logger.debug( - f"_media_removed video sink destroyed id : {id_}" - ) - self.streams[id_]['video_queue'] = None - self.streams[id_]['video_sink'] = None - self.streams[id_]['video_sink_cbs'] = None + self.logger.debug(f"_media_removed video sink destroyed id : {id_}") + self.streams[id_]["video_queue"] = None + self.streams[id_]["video_sink"] = None + self.streams[id_]["video_sink_cbs"] = None def _end_of_range(self, pdraw, demuxer, timestamp, userdata): self.logger.info("_end_of_range") @@ -817,51 +851,56 @@ def _video_sink_flush_impl(self, id_): # FIXME: Workaround video_sink_flush called with destroyed media if not self.pdraw: self.logger.error( - f"_video_sink_flush called with a destroyed pdraw id : {id_}") + f"_video_sink_flush called with a destroyed pdraw id : {id_}" + ) return -errno.EINVAL # FIXME: Workaround video_sink_flush called with destroyed video queue - if not self.streams[id_]['video_queue']: + if not self.streams[id_]["video_queue"]: self.logger.error( - f"_video_sink_flush called with a destroyed queue id : {id_}") + f"_video_sink_flush called with a destroyed queue id : {id_}" + ) return -errno.EINVAL - with self.streams[id_]['video_sink_lock']: + with self.streams[id_]["video_sink_lock"]: self.logger.debug(f"flush_callback {id_}") - flush_callback = self.flush_callbacks[self.streams[id_]['frame_type']] + flush_callback = self.flush_callbacks[self.streams[id_]["frame_type"]] if flush_callback is not None: flushed = self.callbacks_thread_loop.run_async( - flush_callback, self.streams[id_]) + flush_callback, self.streams[id_] + ) try: - if not flushed.result_or_cancel(timeout=5.): + if not flushed.result_or_cancel(timeout=5.0): self.logger.error(f"video sink flush id {id_} error") except FutureTimeoutError: self.logger.error(f"video sink flush id {id_} timeout") # NOTE: If the user failed to flush its buffer at this point, # bad things WILL happen we're acknowledging the buffer flush # in all cases... - frame_type = self.streams[id_]['frame_type'] + frame_type = self.streams[id_]["frame_type"] res = self.video_sink_vt[frame_type].queue_flush( - self.streams[id_]['video_queue']) + self.streams[id_]["video_queue"] + ) if res < 0: self.logger.error( - f"mbuf_coded/raw_video_frame_queue_flush(): {os.strerror(-res)}") + f"mbuf_coded/raw_video_frame_queue_flush(): {os.strerror(-res)}" + ) else: self.logger.info( - f"mbuf_coded/raw_video_frame_queue_flush(): {os.strerror(-res)}") + f"mbuf_coded/raw_video_frame_queue_flush(): {os.strerror(-res)}" + ) res = self.video_sink_vt[frame_type].queue_flushed( - self.pdraw, self.streams[id_]['video_sink']) + self.pdraw, self.streams[id_]["video_sink"] + ) if res < 0: self.logger.error( - f"pdraw_coded/raw_video_sink_queue_flushed() " - f"returned {res}" + f"pdraw_coded/raw_video_sink_queue_flushed() " f"returned {res}" ) else: self.logger.debug( - f"pdraw_coded/raw_video_sink_queue_flushed() " - f"returned {res}" + f"pdraw_coded/raw_video_sink_queue_flushed() " f"returned {res}" ) return 0 @@ -875,34 +914,39 @@ def _video_sink_queue_event(self, pomp_evt, userdata): return # acknowledge event - res = od.pomp_evt_clear(self.streams[id_]['video_queue_event']) + res = od.pomp_evt_clear(self.streams[id_]["video_queue_event"]) if res != 0: - self.logger.error(f"Unable to clear frame received event: {os.strerror(-res)}") + self.logger.error( + f"Unable to clear frame received event: {os.strerror(-res)}" + ) if not self._is_ready_to_play: self.logger.debug("The stream is no longer ready: drop one frame") return # process all available buffers in the queue - with self.streams[id_]['video_sink_lock']: + with self.streams[id_]["video_sink_lock"]: while self._process_stream(id_): pass def _pop_stream_buffer(self, id_): - frame_type = self.streams[id_]['frame_type'] + frame_type = self.streams[id_]["frame_type"] mbuf_video_frame = od.POINTER_T( - self.video_sink_vt[frame_type].mbuf_video_frame_type)() + self.video_sink_vt[frame_type].mbuf_video_frame_type + )() res = self.video_sink_vt[frame_type].queue_pop( - self.streams[id_]['video_queue'], ctypes.byref(mbuf_video_frame) + self.streams[id_]["video_queue"], ctypes.byref(mbuf_video_frame) ) if res < 0: if res not in (-errno.EAGAIN, -errno.ENOENT): self.logger.error( - f"mbuf_coded_video_frame_queue_pop returned error: {os.strerror(-res)}") + f"mbuf_coded_video_frame_queue_pop returned error: {os.strerror(-res)}" + ) mbuf_video_frame = od.POINTER_T( - self.video_sink_vt[frame_type].mbuf_video_frame_type)() + self.video_sink_vt[frame_type].mbuf_video_frame_type + )() elif not mbuf_video_frame: - self.logger.error('mbuf_coded_video_frame_queue_pop returned NULL') + self.logger.error("mbuf_coded_video_frame_queue_pop returned NULL") return mbuf_video_frame def _process_stream(self, id_): @@ -915,7 +959,7 @@ def _process_stream(self, id_): mbuf_video_frame, id_, self.streams[id_], - self.get_session_metadata() + self.get_session_metadata(), ) try: self._process_stream_buffer(id_, video_frame) @@ -930,24 +974,24 @@ def _process_stream(self, id_): def _process_stream_buffer(self, id_, video_frame): stream = self.streams[id_] - frame_type = stream['frame_type'] + frame_type = stream["frame_type"] media_type = stream["media_type"] # write and/or send data over the requested channels # handle output files files = self.outfiles[frame_type] - f = files['meta'] + f = files["meta"] if f and not f.closed: vmeta_type, vmeta = video_frame.vmeta() - files['meta'].write(json.dumps({str(vmeta_type): vmeta}) + '\n') + files["meta"].write(json.dumps({str(vmeta_type): vmeta}) + "\n") - f = files['info'] + f = files["info"] if f and not f.closed: info = video_frame.info() - files['info'].write(json.dumps(info) + '\n') + files["info"].write(json.dumps(info) + "\n") - f = files['data'] + f = files["data"] if f and not f.closed: if frame_type == od.VDEF_FRAME_TYPE_CODED: @@ -958,46 +1002,40 @@ def _process_stream_buffer(self, id_, video_frame): if f.tell(track_id) == 0: now = time.time() f.set_decoder_config( - track_id, - h264_header, - video_frame.width, - video_frame.height + track_id, h264_header, video_frame.width, video_frame.height ) f.set_metadata_mime_type( metadata_track_id, od.VMETA_FRAME_PROTO_CONTENT_ENCODING, - od.VMETA_FRAME_PROTO_MIME_TYPE + od.VMETA_FRAME_PROTO_MIME_TYPE, ) f.add_track_metadata( track_id, "com.parrot.olympe.first_timestamp", - str(now * PDRAW_TIMESCALE) + str(now * PDRAW_TIMESCALE), ) f.add_track_metadata( track_id, "com.parrot.olympe.resolution", f"{video_frame.width}x{video_frame.height}", ) - f.add_coded_frame( - track_id, metadata_track_id, video_frame) + f.add_coded_frame(track_id, metadata_track_id, video_frame) else: frame_array = video_frame.as_ndarray() if frame_array is not None: - f.write(ctypes.string_at( - frame_array.ctypes.data_as( - ctypes.POINTER(ctypes.c_ubyte)), - frame_array.size, - )) + f.write( + ctypes.string_at( + frame_array.ctypes.data_as(ctypes.POINTER(ctypes.c_ubyte)), + frame_array.size, + ) + ) # call callbacks when existing cb = self.frame_callbacks[media_type] if cb is not None: cb(video_frame) - def set_output_files(self, - video=None, - metadata=None, - info=None): + def set_output_files(self, video=None, metadata=None, info=None): """ Records the video stream session to the disk @@ -1012,12 +1050,14 @@ def set_output_files(self, """ if self.state is PdrawState.Playing: raise RuntimeError( - 'Cannot set video streaming files while streaming is on.') + "Cannot set video streaming files while streaming is on." + ) for frame_type, data_type, filepath, attrib in ( - (od.VDEF_FRAME_TYPE_CODED, 'data', video, 'wb'), - (od.VDEF_FRAME_TYPE_CODED, 'meta', metadata, 'w'), - (od.VDEF_FRAME_TYPE_CODED, 'info', info, 'w')): + (od.VDEF_FRAME_TYPE_CODED, "data", video, "wb"), + (od.VDEF_FRAME_TYPE_CODED, "meta", metadata, "w"), + (od.VDEF_FRAME_TYPE_CODED, "info", info, "w"), + ): if self.outfiles[frame_type][data_type]: self.outfiles[frame_type][data_type].close() self.outfiles[frame_type][data_type] = None @@ -1029,15 +1069,17 @@ def set_output_files(self, self.outfiles[frame_type][data_type] = open(filepath, attrib) self.outfiles[frame_type][data_type].close() - def set_callbacks(self, - h264_cb=None, - h264_avcc_cb=None, - h264_bytestream_cb=None, - raw_cb=None, - start_cb=None, - end_cb=None, - flush_h264_cb=None, - flush_raw_cb=None): + def set_callbacks( + self, + h264_cb=None, + h264_avcc_cb=None, + h264_bytestream_cb=None, + raw_cb=None, + start_cb=None, + end_cb=None, + flush_h264_cb=None, + flush_raw_cb=None, + ): """ Set the callback functions that will be called when a new video stream frame is available, when the video stream starts/ends or when the video @@ -1087,34 +1129,38 @@ def set_callbacks(self, h264_avcc_cb = h264_avcc_cb or h264_cb for media_type, cb in ( - ((od.VDEF_FRAME_TYPE_CODED, od.VDEF_CODED_DATA_FORMAT_AVCC), - h264_avcc_cb), - ((od.VDEF_FRAME_TYPE_CODED, od.VDEF_CODED_DATA_FORMAT_BYTE_STREAM), - h264_bytestream_cb), - ((od.VDEF_FRAME_TYPE_RAW, None), - raw_cb)): - self.frame_callbacks[media_type] = callback_decorator( - logger=self.logger)(cb) - for frame_type, cb in ((od.VDEF_FRAME_TYPE_CODED, flush_h264_cb), - (od.VDEF_FRAME_TYPE_RAW, flush_raw_cb)): - self.flush_callbacks[frame_type] = callback_decorator( - logger=self.logger)(cb) + ((od.VDEF_FRAME_TYPE_CODED, od.VDEF_CODED_DATA_FORMAT_AVCC), h264_avcc_cb), + ( + (od.VDEF_FRAME_TYPE_CODED, od.VDEF_CODED_DATA_FORMAT_BYTE_STREAM), + h264_bytestream_cb, + ), + ((od.VDEF_FRAME_TYPE_RAW, None), raw_cb), + ): + self.frame_callbacks[media_type] = callback_decorator(logger=self.logger)( + cb + ) + for frame_type, cb in ( + (od.VDEF_FRAME_TYPE_CODED, flush_h264_cb), + (od.VDEF_FRAME_TYPE_RAW, flush_raw_cb), + ): + self.flush_callbacks[frame_type] = callback_decorator(logger=self.logger)( + cb + ) self.start_callback = callback_decorator(logger=self.logger)(start_cb) self.end_callback = callback_decorator(logger=self.logger)(end_cb) def _open_output_files(self): - self.logger.debug('opening video output files') + self.logger.debug("opening video output files") for frame_type, data in self.outfiles.items(): for data_type, f in data.items(): if f and f.closed: - if data_type == "data" and ( - frame_type == od.VDEF_FRAME_TYPE_CODED): + if data_type == "data" and (frame_type == od.VDEF_FRAME_TYPE_CODED): self.outfiles[frame_type][data_type] = Mp4Mux(f.name) else: self.outfiles[frame_type][data_type] = open(f.name, f.mode) def _close_output_files(self): - self.logger.debug('closing video output files') + self.logger.debug("closing video output files") for files in self.outfiles.values(): for f in files.values(): if f: @@ -1130,9 +1176,9 @@ def stop(self, timeout=5): """ Stops the video stream """ - return self.pdraw_thread_loop.run_async( - self.astop - ).result_or_cancel(timeout=timeout) + return self.pdraw_thread_loop.run_async(self.astop).result_or_cancel( + timeout=timeout + ) async def astop(self): """ @@ -1143,11 +1189,12 @@ async def astop(self): return await self.aclose() def play( - self, - url=None, - media_name="DefaultVideo", - resource_name="live", - timeout=5): + self, + url: Optional[str] = None, + media_name: str = "DefaultVideo", + resource_name: str = "live", + timeout: Optional[float] = 5, + ): """ Play a video @@ -1164,11 +1211,9 @@ def play( `Pdraw.play(url="file://~/Videos/100000010001.MP4")`. :param url: rtsp or local file video URL - :type url: str :param media_name: name of the media/track (defaults to "DefaultVideo"). If the provided media name is not available from the requested video stream, the default media is selected instead. - :type media_name: str """ return self.pdraw_thread_loop.run_async( @@ -1176,55 +1221,62 @@ def play( url=url, media_name=media_name, resource_name=resource_name, - timeout=timeout + timeout=timeout, ).result_or_cancel(timeout=timeout) + async def _format_url(self, url): + if url is None: + self._proxy = await self._controller.fopen_tcp_proxy(554) + self.url = b"rtsp://%s:%d/%s" % ( + self._proxy.address.encode(), + self._proxy.port, + self.resource_name.encode(), + ) + else: + if isinstance(url, bytes): + url = url.decode("utf-8") + if url.startswith("file://"): + url = url[7:] + if url.startswith("~/"): + url = os.path.expanduser(url) + url = os.path.expandvars(url) + url = url.encode("utf-8") + self.url = url + async def aplay( - self, - url=None, - media_name="DefaultVideo", - resource_name="live", - timeout=5): + self, url=None, media_name="DefaultVideo", resource_name="live", timeout=5 + ): if self.pdraw is None: self.logger.error("Error Pdraw interface seems to be destroyed") return False - if self.state in (PdrawState.Opening, PdrawState.Closing): - self.logger.error( - f"Cannot play stream from the {self.state} state") + if self.state not in (PdrawState.Closed, PdrawState.Created): + self.logger.error(f"Cannot play stream from the {self.state} state") return False + self.state = PdrawState.Opening self.resource_name = resource_name self.media_name = media_name - - if url is None: - self.url = b"rtsp://%s/%s" % ( - self.server_addr.encode(), self.resource_name.encode()) - else: - if isinstance(url, bytes): - url = url.decode('utf-8') - if url.startswith('file://'): - url = url[7:] - if url.startswith('~/'): - url = os.path.expanduser(url) - url = os.path.expandvars(url) - url = url.encode('utf-8') - self.url = url + # format url + await self._format_url(url) # reset session metadata from any previous session self.session_metadata = {} self.streams = defaultdict(StreamFactory) self._open_output_files() - if self.state in (PdrawState.Created, PdrawState.Closed): - self.pdraw_thread_loop.run_delayed(timeout, self._open_waiter) - if not self._open_stream(): - return False - async with self._open_condition: - await self._open_condition.wait() - if self.state is not PdrawState.Opened: - return False + + # open + self.pdraw_thread_loop.run_delayed(timeout, self._open_waiter) + if not self._open_stream(): + return False + async with self._open_condition: + await self._open_condition.wait() + if self.state is not PdrawState.Opened: + return False + + # play self.pdraw_thread_loop.run_delayed(timeout, self._play_waiter) async with self._play_condition: await self._play_condition.wait() @@ -1289,6 +1341,23 @@ def _pause_impl(self): return False return True + def resume(self, timeout=5): + """ + Resumes the currently paused video + """ + + return self.pdraw_thread_loop.run_async(self.aresume, timeout=timeout) + + async def aresume(self, timeout=5): + """ + Resumes the currently paused video + """ + + self.pdraw_thread_loop.run_delayed(timeout, self._play_waiter) + async with self._play_condition: + await self._play_condition.wait() + return self.state is PdrawState.Playing + def get_session_metadata(self): """ Returns a dictionary of video stream session metadata diff --git a/src/olympe/video/renderer.py b/src/olympe/video/renderer.py index 678ebe5..d105755 100644 --- a/src/olympe/video/renderer.py +++ b/src/olympe/video/renderer.py @@ -65,9 +65,7 @@ def _async_init(self): sdl2.SDL_WINDOWPOS_UNDEFINED, self.width, self.height, - sdl2.SDL_WINDOW_OPENGL - | sdl2.SDL_WINDOW_RESIZABLE - | sdl2.SDL_WINDOW_UTILITY, + sdl2.SDL_WINDOW_OPENGL, ) # Create an OpenGL context @@ -218,6 +216,7 @@ def init(self, *, pdraw, media_id=0): self._render_zone = od.struct_pdraw_rect(0, 0, self.width, self.height) self._renderer_params = od.struct_pdraw_video_renderer_params.bind( { + "scheduling_mode": od.PDRAW_VIDEO_RENDERER_SCHEDULING_MODE_ADAPTIVE, "fill_mode": od.PDRAW_VIDEO_RENDERER_FILL_MODE_FIT_PAD_BLUR_EXTEND, "enable_transition_flags": od.PDRAW_VIDEO_RENDERER_TRANSITION_FLAG_ALL, "enable_hmd_distortion_correction": 0,