From 3f55e745b7c9e80bfac32fd75530940c6cd8370c Mon Sep 17 00:00:00 2001 From: Nicolas Dessart Date: Wed, 14 Dec 2022 15:35:19 +0100 Subject: [PATCH] [INTEGRATION] prepare 7.5.0 rc1 Change-Id: I4345c39a954269733e8a7015050e5749d7a2fda2 --- README.md | 13 ++- atom.mk | 19 ++++- pyproject.toml | 4 +- src/olympe/arsdkng/cmd_itf.py | 19 ++++- src/olympe/arsdkng/controller.py | 51 +++++++---- src/olympe/arsdkng/device.py | 2 +- src/olympe/arsdkng/discovery.py | 123 +++++++++++---------------- src/olympe/arsdkng/expectations.py | 86 ++++++++++++++----- src/olympe/arsdkng/messages.py | 98 +++++++++------------ src/olympe/arsdkng/proto.py | 4 +- src/olympe/concurrent/__init__.py | 58 +++++++++---- src/olympe/concurrent/_task.py | 5 +- src/olympe/concurrent/future.py | 14 +-- src/olympe/doc/examples/streaming.py | 2 +- src/olympe/doc/installation.rst | 4 +- src/olympe/expectations.py | 116 +++++++++++++++++++------ src/olympe/http.py | 14 ++- src/olympe/media.py | 82 ++++++++++++++++-- src/olympe/mixins/media.py | 4 + src/olympe/module_loader.py | 2 +- src/olympe/networking.py | 24 ++++-- src/olympe/subscriber.py | 12 +-- src/olympe/utils/__init__.py | 2 - src/olympe/video/frame.py | 4 +- 24 files changed, 503 insertions(+), 259 deletions(-) diff --git a/README.md b/README.md index 32cab8f..36898ee 100644 --- a/README.md +++ b/README.md @@ -7,28 +7,27 @@ simulator) but may also be used to connect to physical drones. Like GroundSDK-iO GroundSDK-Android, Olympe is based on arsdk-ng/arsdk-xml. Olympe is part of the [Parrot Ground SDK](https://developer.parrot.com/) which allows any developer -to create its own mobile or desktop application for ANAFI and ANAFI Thermal drones. +to create its own mobile or desktop application for ANAFI, ANAFI Thermal and ANAFI AI drones. 
-## [Olympe Documentation](https://developer.parrot.com/docs/olympe/) +## [Olympe Documentation](https://developer.parrot.com/docs/olympe/installation.html) * **[Olympe - Installation](https://developer.parrot.com/docs/olympe/installation.html)** * **[Olympe - User guide](https://developer.parrot.com/docs/olympe/userguide.html)** * **[Olympe - API Reference](https://developer.parrot.com/docs/olympe/olympeapi.html)** * **[Olympe - SDK Messages Reference](https://developer.parrot.com/docs/olympe/arsdkng.html)** -## [Sphinx Documentation](https://developer.parrot.com/docs/sphinx/) +## [Sphinx Documentation](https://developer.parrot.com/docs/sphinx/installation.html) -* **[Sphinx - System requirements](https://developer.parrot.com/docs/sphinx/system-requirements.html)** +* **[Sphinx - System requirements](https://developer.parrot.com/docs/sphinx/system_requirements.html)** * **[Sphinx - Installation](https://developer.parrot.com/docs/sphinx/installation.html)** -* **[Sphinx - Quick Start guide](https://developer.parrot.com/docs/sphinx/firststep.html)** +* **[Sphinx - Quick Start guide](https://developer.parrot.com/docs/sphinx/quickstart.html)** ## [Parrot developers forums](https://forum.developer.parrot.com/categories) -* **Olympe:** https://forum.developer.parrot.com/c/anafi/olympe -* **Sphinx:** https://forum.developer.parrot.com/c/sphinx * **Parrot Anafi:** https://forum.developer.parrot.com/c/anafi/ +* **Parrot Anafi Ai:** https://forum.developer.parrot.com/c/anafi-ai/ ## License diff --git a/atom.mk b/atom.mk index c663add..b2e9d4e 100644 --- a/atom.mk +++ b/atom.mk @@ -5,7 +5,8 @@ include $(CLEAR_VARS) LOCAL_MODULE := olympe-base LOCAL_CATEGORY_PATH := libs LOCAL_DESCRIPTION := Olympe pure python module -LOCAL_DEPENDS_MODULES := python arsdkparser +LOCAL_DEPENDS_MODULES := python arsdkparser parrot-protobuf-extensions-proto protobuf-base +LOCAL_LIBRARIES := arsdkparser parrot-protobuf-extensions-proto PRIVATE_OLYMPE_OUT_DIR=$(TARGET_OUT_STAGING)$(shell echo $${TARGET_DEPLOY_ROOT:-/usr}) @@ -24,6 +25,22 @@ LOCAL_COPY_FILES := \ $(__f):$(PRIVATE_OLYMPE_OUT_DIR)/lib/python/site-packages/$(strip $(patsubst src/%, %, $(__f))) \ ) +# Install .proto files in python site-packages/olympe_protobuf staging directory +PRIVATE_OLYMPE_PROTOBUF_SRC_DIRS := $(PRIVATE_OLYMPE_OUT_DIR)/share/protobuf:$(PRIVATE_OLYMPE_OUT_DIR)/lib/python/site-packages +PRIVATE_OLYMPE_PROTOBUF_DST_DIR := $(PRIVATE_OLYMPE_OUT_DIR)/lib/python/site-packages/olympe_protobuf +define LOCAL_CMD_POST_INSTALL + while read -d ':' src_dir; do \ + protobuf_src_files=$$(find $$src_dir -type f -name '*.proto'); \ + protobuf_dst_files=$$(echo $$protobuf_src_files | xargs -I{} -d' ' bash -c "echo \"{}\" | \ + sed 's#\s*$$src_dir#$(PRIVATE_OLYMPE_PROTOBUF_DST_DIR)#g'"); \ + while read -ra src <&3 && read -ra dst <&4; do \ + echo "$$src is in $$dst"; \ + install -Dp -m0660 $$src $$dst; \ + done 3<<<"$$protobuf_src_files" 4<<<"$$protobuf_dst_files"; \ + done <<< $(PRIVATE_OLYMPE_PROTOBUF_SRC_DIRS):; \ + echo $$protobuf_files; +endef + include $(BUILD_CUSTOM) include $(CLEAR_VARS) diff --git a/pyproject.toml b/pyproject.toml index 4a23c5c..cd9741d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dependencies = [ "importlib_metadata; python_version < '3.8'", "numpy", "typing-protocol; python_version < '3.8'", + "protobuf==3.19.4", "pycryptodomex", "pytz", "PyYAML", @@ -87,11 +88,10 @@ olympe = "olympe.app:main" top-levels = [ "olympe", "olympe_deps", + "olympe_protobuf", "arsdkparser.py", "arsdk", - "google", "logness", - 
"parrot", "ulog.py", "_ulog" ] diff --git a/src/olympe/arsdkng/cmd_itf.py b/src/olympe/arsdkng/cmd_itf.py index ec76591..d3ab609 100644 --- a/src/olympe/arsdkng/cmd_itf.py +++ b/src/olympe/arsdkng/cmd_itf.py @@ -167,6 +167,14 @@ def matched_events(self): def unmatched_events(self): self._disconnected.unmatched_events() + def cancel(self): + disconnected_cancel = self._disconnected.cancel() + return super().cancel() or disconnected_cancel + + def cancelled(self): + disconnected_cancelled = self._disconnected.cancelled() + return super().cancelled() and disconnected_cancelled + class Connect(Expectation): @@ -207,6 +215,14 @@ def matched_events(self): def unmatched_events(self): self._connected.unmatched_events() + def cancel(self): + connected_cancel = self._connected.cancel() + return super().cancel() or connected_cancel + + def cancelled(self): + connected_cancelled = self._connected.cancelled() + return super().cancelled() and connected_cancelled + class CommandInterfaceBase(LogMixin, AbstractScheduler): def __init__(self, *, name=None, drone_type=0, proto_v_min=1, proto_v_max=3, **kwds): @@ -244,7 +260,7 @@ def __init__(self, *, name=None, drone_type=0, proto_v_min=1, proto_v_max=3, **k self._disconnect_future = None self._declare_callbacks() self._thread_loop.register_cleanup(self.destroy) - self.subscribe( + self._drone_manager_subscriber = self.subscribe( self._on_connection_state_changed, drone_manager.connection_state() ) @@ -573,6 +589,7 @@ def destroy(self): if self._thread_loop is not None: self._thread_loop.unregister_cleanup(self.destroy, ignore_error=True) self._on_device_removed() + self._drone_manager_subscriber.unsubscribe() self._scheduler.destroy() if self._thread_loop is not None: self._thread_loop.stop() diff --git a/src/olympe/arsdkng/controller.py b/src/olympe/arsdkng/controller.py index 644e536..35bd1c8 100644 --- a/src/olympe/arsdkng/controller.py +++ b/src/olympe/arsdkng/controller.py @@ -54,8 +54,12 @@ class PilotingCommand: - def __init__(self): + def __init__(self, time_function=None): self.set_default_piloting_command() + if time_function: + self.time_function = time_function + else: + self.time_function = time.time def update_piloting_command(self, roll, pitch, yaw, gaz, piloting_time): self.roll = roll @@ -63,7 +67,7 @@ def update_piloting_command(self, roll, pitch, yaw, gaz, piloting_time): self.yaw = yaw self.gaz = gaz self.piloting_time = piloting_time - self.initial_time = datetime.datetime.now() + self.initial_time = self.time_function() def set_default_piloting_command(self): self.roll = 0 @@ -92,7 +96,11 @@ def __init__(self, self._ip_addr = ip_addr.encode('utf-8') self._is_skyctrl = is_skyctrl self._piloting = False - self._piloting_command = PilotingCommand() + self._time_function = time_function + + self._piloting_command = PilotingCommand( + time_function=self._time_function) + self._backend_type = backend if backend is BackendType.Net: self._backend_class = CtrlBackendNet @@ -108,8 +116,10 @@ def __init__(self, proto_v_min=1, proto_v_max=3, device_addr=self._ip_addr) - if time_function is not None: - self._scheduler.set_time_function(time_function) + + if self._time_function is not None: + self._scheduler.set_time_function(self._time_function) + self._connected_future = None self._last_disconnection_time = None # Setup piloting commands timer @@ -259,6 +269,8 @@ def _disconnection_impl(self): if not self.connected: return f.set_result(True) + if self._disconnect_future is not None and not self._disconnect_future.done(): + 
self._disconnect_future.cancel() self._disconnect_future = f res = od.arsdk_device_disconnect(self._device.arsdk_device) if res != 0: @@ -332,10 +344,10 @@ def _send_piloting_command(self): if self._piloting_command.piloting_time: # Check if piloting time since last pcmd order has been reached diff_time = ( - datetime.datetime.now() - + self._piloting_command.time_function() - self._piloting_command.initial_time ) - if diff_time.total_seconds() >= self._piloting_command.piloting_time: + if diff_time >= self._piloting_command.piloting_time: self._piloting_command.set_default_piloting_command() # Flag to activate movement on roll and pitch. 1 activate, 0 deactivate @@ -439,10 +451,11 @@ def piloting_pcmd(self, roll, pitch, yaw, gaz, piloting_time): ) return self.piloting(roll, pitch, yaw, gaz, piloting_time) - async def _async_discover_device(self): + async def _async_discover_device(self, deadline): # Try to identify the device type we are attempting to connect to... await self._backend.ready() - discovery = self._discovery_class(self._backend, ip_addr=self._ip_addr) + timeout = (deadline - time.time()) / 2 + discovery = self._discovery_class(self._backend, ip_addr=self._ip_addr, timeout=timeout) device = await discovery.async_get_device() if device is not None: return device, discovery @@ -451,16 +464,17 @@ async def _async_discover_device(self): self.logger.info(f"Net discovery failed for {self._ip_addr}") self.logger.info(f"Trying 'NetRaw' discovery for {self._ip_addr} ...") assert await discovery.async_stop() - discovery = DiscoveryNetRaw(self._backend, ip_addr=self._ip_addr) + timeout = (deadline - time.time()) / 4 + discovery = DiscoveryNetRaw(self._backend, ip_addr=self._ip_addr, timeout=timeout) device = await discovery.async_get_device() if device is None: await discovery.async_stop() return device, discovery - async def _async_get_device(self): + async def _async_get_device(self, deadline): if self._device is not None: return True - device, discovery = await self._async_discover_device() + device, discovery = await self._async_discover_device(deadline) if device is None: self.logger.info(f"Unable to discover the device: {self._ip_addr}") @@ -490,10 +504,12 @@ def _connect_impl(self, deadline): device_id = b"" device_conn_cfg = od.struct_arsdk_device_conn_cfg( - ctypes.create_string_buffer(b"arsdk-ng"), ctypes.create_string_buffer(b"desktop"), + ctypes.create_string_buffer(b"olympe"), ctypes.create_string_buffer(b"desktop"), ctypes.create_string_buffer(bytes(device_id)), ctypes.create_string_buffer(req)) # Send connection command + if self._connect_future is not None and not self._connect_future.done(): + self._connect_future.cancel() self._connect_future = Future(self._thread_loop) res = od.arsdk_device_connect( self._device.arsdk_device, @@ -528,8 +544,11 @@ async def _do_connect(self, timeout, retry): self.logger.error(f"'{self._ip_addr_str} connection timed out") return False self.logger.debug(f"Discovering device {self._ip_addr_str} ...") - if not await self._async_get_device(): + if not await self._async_get_device(deadline): self.logger.debug(f"Discovering device {self._ip_addr_str} failed") + if deadline < (time.time() + backoff): + self.logger.error(f"'{self._ip_addr_str} connection (would) have timed out") + return False await self._thread_loop.asleep(backoff) backoff *= 2. 
continue @@ -673,12 +692,12 @@ def _on_device_removed(self): self._stop_piloting_impl() self._disconnection_impl() self._last_disconnection_time = time.time() - self._piloting_command = PilotingCommand() + self._piloting_command = PilotingCommand(time_function=self._time_function) super()._on_device_removed() def _reset_instance(self): self._piloting = False - self._piloting_command = PilotingCommand() + self._piloting_command = PilotingCommand(time_function=self._time_function) self._device = None self._device_name = None self._discovery = None diff --git a/src/olympe/arsdkng/device.py b/src/olympe/arsdkng/device.py index 960dc27..7690e4f 100644 --- a/src/olympe/arsdkng/device.py +++ b/src/olympe/arsdkng/device.py @@ -320,7 +320,6 @@ def stop(self): if res < 0: self.logger.error(f"arsdk_backend_mux_stop_listen: {res}") ret = False - self._backend.destroy() self.connected = False self._listening = False return ret @@ -352,3 +351,4 @@ def destroy(self): """ self.stop() super().destroy() + self._backend.destroy() diff --git a/src/olympe/arsdkng/discovery.py b/src/olympe/arsdkng/discovery.py index 0461fa0..16de058 100644 --- a/src/olympe/arsdkng/discovery.py +++ b/src/olympe/arsdkng/discovery.py @@ -33,16 +33,15 @@ import logging import olympe_deps as od import queue -import socket import time import typing from . import DeviceInfo, DEVICE_TYPE_LIST from .backend import CtrlBackend, CtrlBackendNet, CtrlBackendMuxIp, DeviceHandler from olympe.concurrent import Future, Loop, TimeoutError, CancelledError +from olympe.networking import TcpClient from abc import ABC, abstractmethod from collections import OrderedDict -from contextlib import closing from olympe.utils import callback_decorator @@ -50,7 +49,7 @@ class Discovery(ABC, DeviceHandler): timeout = 3.0 - def __init__(self): + def __init__(self, *, timeout: typing.Optional[float] = None): self._backend: CtrlBackend self._thread_loop: Loop self.logger: logging.Logger @@ -59,6 +58,10 @@ def __init__(self): self.userdata = ctypes.c_void_p() self.discovery = None + if timeout is None: + timeout = Discovery.timeout + self.timeout: float = timeout + self.deadline: float = 0.0 @property def discovery_name(self) -> str: @@ -130,6 +133,7 @@ def stop(self) -> bool: self._device_queue = queue.Queue() def async_start(self) -> "Future": + self.deadline = time.time() + self.timeout return self._thread_loop.run_async(self._do_start) def async_stop(self) -> "Future": @@ -173,7 +177,9 @@ def _do_stop(self) -> bool: @callback_decorator() def _device_added_cb( - self, arsdk_device: "od.POINTER_T[od.struct_arsdk_device]", _user_data: "od.POINTER_T[None]" + self, + arsdk_device: "od.POINTER_T[od.struct_arsdk_device]", + _user_data: "od.POINTER_T[None]", ) -> None: """ Called when a new device is detected. 
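The discovery rework above replaces the per-call timeout arguments with a per-instance timeout and a deadline armed when the discovery starts. A minimal standalone sketch of that pattern, with assumed names and a blocking sleep standing in for the pomp loop's asleep():

import queue
import time


class DeadlinePoller:
    """Toy model of the Discovery timeout/deadline handling."""

    def __init__(self, timeout=3.0):
        self.timeout = timeout            # fixed per instance (Discovery.timeout by default)
        self.deadline = 0.0               # armed by start()
        self._device_queue = queue.Queue()

    def start(self):
        # async_start() above does the equivalent before running _do_start
        self.deadline = time.time() + self.timeout

    def devices(self):
        # async_devices() above: drain the queue until the deadline expires
        while self.deadline > time.time():
            try:
                yield self._device_queue.get_nowait()
            except queue.Empty:
                time.sleep(0.05)          # stand-in for `await self._thread_loop.asleep(...)`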
@@ -188,7 +194,9 @@ def _device_added_cb( @callback_decorator() def _device_removed_cb( - self, arsdk_device: "od.POINTER_T[od.struct_arsdk_device]", _user_data: "od.POINTER_T[None]" + self, + arsdk_device: "od.POINTER_T[od.struct_arsdk_device]", + _user_data: "od.POINTER_T[None]", ) -> None: """ Called when a device disappear from the discovery search @@ -220,15 +228,11 @@ def _destroy(self) -> None: def destroy(self) -> None: self._thread_loop.run_later(self._destroy) - async def async_devices( - self, timeout: float = None - ) -> typing.AsyncGenerator[DeviceInfo, None]: - if not self.start(): + async def async_devices(self) -> typing.AsyncGenerator[DeviceInfo, None]: + if not await self.async_start(): + self.logger.error("async_start false") return - if timeout is None: - timeout = self.timeout - deadline = time.time() + timeout - while deadline > time.time(): + while self.deadline > time.time(): try: yield self._device_queue.get_nowait() except queue.Empty: @@ -238,48 +242,34 @@ async def async_devices( await self.async_stop() raise - async def async_get_device_count( - self, max_count: int, timeout: float = None - ) -> typing.List[DeviceInfo]: + async def async_get_device_count(self, max_count: int) -> typing.List[DeviceInfo]: devices: typing.List[DeviceInfo] = [] - if timeout is None: - timeout = self.timeout if max_count <= 0: return devices count = 0 - async for device in self.async_devices(timeout=timeout): + async for device in self.async_devices(): devices.append(device) count += 1 if count == max_count: break return devices - async def async_get_device( - self, timeout: float = None - ) -> typing.Optional[DeviceInfo]: - if timeout is None: - timeout = self.timeout - async for device in self.async_devices(timeout=timeout): + async def async_get_device(self) -> typing.Optional[DeviceInfo]: + async for device in self.async_devices(): return device return None def get_device_count( - self, max_count: int, timeout: float = None + self, max_count: int ) -> typing.Optional[typing.List[DeviceInfo]]: - if timeout is None: - timeout = self.timeout - t = self._thread_loop.run_async( - self.async_get_device_count, max_count, timeout=timeout - ) + t = self._thread_loop.run_async(self.async_get_device_count, max_count) try: - return t.result_or_cancel(timeout=timeout) + return t.result_or_cancel(timeout=self.timeout) except concurrent.futures.TimeoutError: return None - def get_device(self, timeout: float = None) -> typing.Optional[DeviceInfo]: - if timeout is None: - timeout = self.timeout - devices = self.get_device_count(max_count=1, timeout=timeout) + def get_device(self) -> typing.Optional[DeviceInfo]: + devices = self.get_device_count(max_count=1) if not devices: return None else: @@ -293,13 +283,14 @@ def __init__( *, ip_addr: str, device_types: typing.Optional[typing.List[int]] = None, + timeout: typing.Optional[float] = None, **kwds, ): self._backend: CtrlBackendNet = backend self._thread_loop = self._backend._thread_loop self._thread_loop.register_cleanup(self._destroy) self.logger = self._backend.logger - super().__init__() + super().__init__(timeout=timeout) if device_types is None: device_types = DEVICE_TYPE_LIST self._device_types = device_types @@ -354,13 +345,14 @@ def __init__( *, ip_addr: str, check_port: typing.Optional[bool] = True, + timeout: typing.Optional[float] = None, **kwds, ): self._backend: CtrlBackendNet = backend self._thread_loop = self._backend._thread_loop self._thread_loop.register_cleanup(self._destroy) self.logger = self._backend.logger - 
super().__init__() + super().__init__(timeout=timeout) self._check_port = check_port devices = kwds.pop("devices", None) self._raw_devices = [] @@ -382,21 +374,18 @@ def __init__( if device.ip_addr == "192.168.53.1": device.type = od.ARSDK_DEVICE_TYPE_SKYCTRL_3 device.name = "Skycontroller 3" - elif ( - device.ip_addr - in ( - "192.168.42.1", - "192.168.43.1", - ) - or device.ip_addr.startswith("10.202.0.") - ): + elif device.ip_addr in ( + "192.168.42.1", + "192.168.43.1", + ) or device.ip_addr.startswith("10.202.0."): device.type = od.ARSDK_DEVICE_TYPE_ANAFI4K device.name = "ANAFI-{}".format(7 * "X") - def start(self) -> bool: - super().start() + async def _do_start(self) -> bool: + if not await super()._do_start(): + return False for device in self._raw_devices: - self._add_device(device) + await self._add_device(device) return True def _create_discovery(self) -> "od.POINTER_T[od.struct_arsdk_discovery]": @@ -423,32 +412,19 @@ def _stop_discovery(self) -> int: def _destroy_discovery(self) -> int: return od.arsdk_discovery_destroy(self.discovery) - def _add_device(self, device: DeviceInfo) -> None: + async def _add_device(self, device: DeviceInfo) -> None: if self._check_port: - # check that the device port is opened - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: - sock.settimeout(self.timeout) - try: - res = sock.connect_ex((device.ip_addr, device.port)) - except OSError: - self.logger.debug( - f"{self.discovery_name}: {device.ip_addr} is unreachable" - ) - return - if res != 0: - self.logger.debug( - f"{self.discovery_name}: {device.ip_addr}:{device.port} is" - " closed" - ) + timeout = self.deadline - time.time() + client = TcpClient(self._thread_loop) + try: + if not await client.aconnect( + device.ip_addr, device.port, timeout=timeout + ): return + finally: + await client.adestroy() # add this device to the "discovered" devices - f = self._thread_loop.run_async(self._do_add_device, device) - try: - f.result_or_cancel(timeout=self.timeout) - except concurrent.futures.TimeoutError: - self.logger.error( - f"{self.discovery_name}: timedout for {device.ip_addr}:{device.port}" - ) + self._do_add_device(device) @callback_decorator() def _do_add_device(self, device: DeviceInfo) -> None: @@ -471,13 +447,14 @@ def __init__( self, backend: CtrlBackendMuxIp, device_types: typing.Optional[typing.List[int]] = None, + timeout: typing.Optional[float] = None, **kwds, ): self._backend: CtrlBackendMuxIp = backend self._thread_loop = self._backend._thread_loop self._thread_loop.register_cleanup(self._destroy) self.logger = self._backend.logger - super().__init__() + super().__init__(timeout=timeout) if device_types is None: device_types = DEVICE_TYPE_LIST self._device_types = device_types diff --git a/src/olympe/arsdkng/expectations.py b/src/olympe/arsdkng/expectations.py index 6299a55..3b45c9e 100644 --- a/src/olympe/arsdkng/expectations.py +++ b/src/olympe/arsdkng/expectations.py @@ -30,7 +30,7 @@ import pprint from abc import abstractmethod -from concurrent.futures import Future +from olympe.concurrent import CancelledError, Future from collections import OrderedDict from collections.abc import Iterable, Mapping from olympe.utils import ( @@ -379,6 +379,13 @@ def timedout(self): self.set_timedout() return super().timedout() + def cancel(self): + if self._command_future is not None and not self._command_future.done(): + # Futures associated to running callbacks are non-cancellable + # We have to use Future.set_exception() here instead. 
+ self._command_future.set_exception(CancelledError()) + return super().cancel() + def cancelled(self): if super().cancelled(): return True @@ -407,14 +414,17 @@ def check(self, received_event, *args, **kwds): if self.success(): return self self._check_command_event(received_event) - if ( - self._command_future is None - or not self._command_future.done() - or (not self._command_future.result()) - ): - return self + command_sent = False + if self._command_future is not None and self._command_future.done(): + if self._command_future.cancelled(): + return self + elif self._command_future.exception(): + return self + elif self._command_future.result(): + command_sent = True if self._no_expect: - self.set_success() + if command_sent: + self.set_success() return self for expectation in self.expectations: if ( @@ -423,9 +433,27 @@ def check(self, received_event, *args, **kwds): self.matched_expectations.add(expectation) if len(self.expectations) == len(self.matched_expectations): - self.set_success() + if command_sent: + self.set_success() return self + def on_subexpectation_done(self, expectation): + if not expectation.success(): + return + self.matched_expectations.add(expectation) + + if self._command_future is None or not self._command_future.done(): + return + if self._command_future.cancelled(): + return + elif self._command_future.exception(): + return + + # the command has been sent, this command is successful if its + # expectations matched. + if len(self.expectations) == len(self.matched_expectations): + self.set_success() + def _fill_default_arguments(self, message, args): super()._fill_default_arguments(message, args) if self.command_message.id != message.id: @@ -449,6 +477,7 @@ def _schedule(self, scheduler): self._command_future = controller._send_command_raw( self.command_message, self.command_args ) + self._command_future.add_done_callback(lambda _: self.check(None)) super()._schedule(scheduler) for expectation in self.expectations: scheduler._schedule(expectation, monitor=expectation.always_monitor) @@ -485,6 +514,13 @@ def __init__(self, message, command_args=None, expectation=None): self._no_expect = False self.expected_event_type = self.command_message._event_type() + def cancel(self): + if self._command_future is not None and not self._command_future.done(): + # Futures associated to running callbacks are non-cancellable + # We have to use Future.set_exception() here instead. 
+ self._command_future.set_exception(CancelledError()) + return super().cancel() + def _check_command_event(self, received_event): if self._command_future is not None: return @@ -504,19 +540,28 @@ def check(self, received_event, *args, **kwds): if self.success(): return self self._check_command_event(received_event) - if self._command_future is None or ( - not self._command_future.done() or not self._command_future.result() - ): - return self if self.success(): return self - if self._no_expect: - self.set_success() - return self - if self.expectation is None or self.expectation.success(): - self.set_success() - elif self.expectation.check(received_event).success(): - self.set_success() + command_sent = False + if self._command_future is not None and self._command_future.done(): + if self._command_future.cancelled(): + return self + elif self._command_future.exception(): + return self + elif self._command_future.result(): + command_sent = True + if self._no_expect: + self.set_success() + elif self.expectation is None or self.expectation.success(): + self.set_success() + if ( + self.expectation is not None and ( + self.expectation.always_monitor or not + self.expectation.success() + ) and self.expectation.check(received_event).success() + ): + if command_sent: + self.set_success() return self def _fill_default_arguments(self, message, args): @@ -544,6 +589,7 @@ def _schedule(self, scheduler): self._command_future = controller._send_protobuf_command( self.command_message, self.command_args ) + self._command_future.add_done_callback(lambda _: self.check(None)) super()._schedule(scheduler) def no_expect(self, value): diff --git a/src/olympe/arsdkng/messages.py b/src/olympe/arsdkng/messages.py index fff80a7..e0d394f 100644 --- a/src/olympe/arsdkng/messages.py +++ b/src/olympe/arsdkng/messages.py @@ -336,6 +336,12 @@ def __new__(mcls, *args, **kwds): else argname for argname in cls.args_name + ["**kwds"] ) + cls.decoded_args = list( + map(lambda ctype: ctypes.pointer(ctype()), cls.decode_ctypes_args) + ) + cls.decoded_args_type = list( + map(lambda ctype: ctypes.POINTER(ctype), cls.decode_ctypes_args) + ) # docstring cls.doc_todos = "" @@ -841,14 +847,13 @@ def _argsmap_from_args(cls, *args, **kwds): ) # bitfield conversion - args = OrderedDict( - starmap( - lambda name, value: (name, cls.args_bitfield[name](value)) - if name in cls.args_bitfield - else (name, value), - args.items(), - ) - ) + for name, value in args.copy().items(): + if name in cls.args_bitfield: + try: + args[name] = cls.args_bitfield[name](value) + except ValueError: + # Bits values outside the bitfield mask are unspecified + pass args = OrderedDict(starmap(lambda k, v: (k, v), args.items())) return args @@ -1006,6 +1011,9 @@ def _expect(cls, *args, **kwds): args, send_command, policy, float_tol, no_expect, timeout = cls._expect_args( *args, **kwds ) + for arg_name in args: + if arg_name not in cls.args_name: + raise ValueError(f"'{cls.fullName}' message has no such '{arg_name}' parameter") if policy != ExpectPolicy.check: if not send_command and cls.message_type == ArsdkMessageType.CMD: expectations = ArsdkWhenAllExpectations( @@ -1165,57 +1173,35 @@ def _decode_args(cls, message_buffer): Decode a ctypes message buffer into a list of python typed arguments. This also perform the necessary enum, bitfield and unicode conversions. 
""" - decoded_args = list( - map(lambda ctype: ctypes.pointer(ctype()), cls.decode_ctypes_args) - ) - decoded_args_type = list( - map(lambda ctype: ctypes.POINTER(ctype), cls.decode_ctypes_args) - ) - od.arsdk_cmd_dec.argtypes = od.arsdk_cmd_dec.argtypes[:2] + decoded_args_type - - res = od.arsdk_cmd_dec(message_buffer, cls.arsdk_desc, *decoded_args) + od.arsdk_cmd_dec.argtypes = od.arsdk_cmd_dec.argtypes[:2] + cls.decoded_args_type - # ctypes -> python type conversion (exception: arsdk_binary -> c_char array) - decoded_args = list( - map( - lambda a: a.contents.value - if not isinstance(a.contents, od.struct_arsdk_binary) - else (ctypes.c_char * a.contents.len).from_address(a.contents.cdata), - decoded_args, - ) - ) - - # bytes utf-8 -> str conversion - decoded_args = list( - map(lambda a: str(a, "utf-8") if isinstance(a, bytes) else a, decoded_args) - ) - - # ctypes c_char array -> bytes - decoded_args = list( - map(lambda a: bytes(a) if isinstance(a, ctypes.Array) else a, decoded_args) - ) - - # enum conversion - decoded_args = list( - starmap( - lambda name, value: cls.args_enum[name](value) - if name in cls.args_enum - and value in cls.args_enum[name]._value2member_map_ - else value, - zip(cls.args_name, decoded_args), - ) - ) + res = od.arsdk_cmd_dec(message_buffer, cls.arsdk_desc, *cls.decoded_args) - # bitfield conversion - decoded_args = list( - map( - lambda t: cls.args_bitfield[t[0]](t[1]) - if t[0] in cls.args_bitfield - else t[1], - zip(cls.args_name, decoded_args), - ) - ) + decoded_args = cls.decoded_args[:] + for i, (name, arg) in enumerate(zip(cls.args_name, decoded_args)): + # ctypes -> python type conversion (exception: arsdk_binary -> c_char array) + if not isinstance(arg.contents, od.struct_arsdk_binary): + decoded_args[i] = arg = arg.contents.value + else: + decoded_args[i] = arg = (ctypes.c_char * arg.contents.len).from_address( + arg.contents.cdata) + # bytes utf-8 -> str conversion + if isinstance(arg, bytes): + decoded_args[i] = arg = str(arg, "utf-8") + # ctypes c_char array -> bytes + elif isinstance(arg, ctypes.Array): + decoded_args[i] = arg = bytes(arg) + if name in cls.args_enum: + # enum conversion + decoded_args[i] = arg = cls.args_enum[name](arg) + elif name in cls.args_bitfield: + # bitfield conversion + try: + decoded_args[i] = arg = cls.args_bitfield[name](arg) + except ValueError: + # Bits values outside the bitfield mask are unspecified + pass return (res, decoded_args) diff --git a/src/olympe/arsdkng/proto.py b/src/olympe/arsdkng/proto.py index be764da..043cf74 100644 --- a/src/olympe/arsdkng/proto.py +++ b/src/olympe/arsdkng/proto.py @@ -235,8 +235,8 @@ def __init__(self, root, parent=None): od_path = od_path.parent site_path = od_path.parent root_path = site_path.parent.parent.parent - self.arsdk_proto_path = site_path / "arsdk" - self.shared_proto_path = site_path + self.shared_proto_path = site_path / "olympe_protobuf" + self.arsdk_proto_path = self.shared_proto_path / "arsdk" self.parrot_proto_path = self.shared_proto_path / "parrot" self.google_proto_path = self.shared_proto_path / "google" self.proto_def_path = root_path / "include" diff --git a/src/olympe/concurrent/__init__.py b/src/olympe/concurrent/__init__.py index ddc9e0f..7a6b48c 100644 --- a/src/olympe/concurrent/__init__.py +++ b/src/olympe/concurrent/__init__.py @@ -118,7 +118,7 @@ def __init__(self, logger, name=None, parent=None, max_workers=None): self.c_evt_userdata = dict() self.pomp_fd_callbacks = dict() self.cleanup_functions = dict() - self.futures = [] + self.futures = 
set() self.async_cleanup_running = False self._watchdog_cb_imp = od.pomp_watchdog_cb_t(lambda *_: self._watchdog_cb()) self._watchdog_user_cb = None @@ -177,15 +177,25 @@ def _task_timer_cb(self, *_): return now = int(time.time() * 1000) deadline, task = self._scheduled_tasks.queue[0] - while deadline <= now: + seen = set() + delay = deadline - now + while delay <= 0: self.run_later(task.step) + seen.add(task) self._scheduled_tasks.get_nowait() if self._scheduled_tasks.empty(): - break + return deadline, task = self._scheduled_tasks.queue[0] - if self._scheduled_tasks.empty(): + delay = deadline - now + if task in seen: + if self.running: + self.run_later(self._task_timer_cb) + return + if not self.running: + while not self._scheduled_tasks.empty(): + _, task = self._scheduled_tasks.get_nowait() + task.set_exception(CancelledError()) return - delay = deadline - now self.set_timer(self._task_timer, delay, 0) def _reschedule(self, task, deadline=None): @@ -193,15 +203,20 @@ def _reschedule(self, task, deadline=None): if deadline is not None: deadline = int(deadline * 1000) if deadline is None or deadline < now: - self.run_async(task.step) + self.run_later(task.step) else: current_deadline = None if not self._scheduled_tasks.empty(): current_deadline = self._scheduled_tasks.queue[0].priority - self._scheduled_tasks.put_nowait(_TaskQueueItem(deadline, task)) + delay = deadline - now if current_deadline is None or current_deadline > deadline: - delay = deadline - now - self.set_timer(self._task_timer, delay, 0) + if delay > 0: + self._scheduled_tasks.put_nowait(_TaskQueueItem(deadline, task)) + self.set_timer(self._task_timer, delay, 0) + else: + self.run_later(task.step) + else: + self._scheduled_tasks.put_nowait(_TaskQueueItem(deadline, task)) def _ensure_from_sync_future(self, func, *args, **kwds): if not inspect.iscoroutinefunction(func) and not inspect.isasyncgenfunction( @@ -245,11 +260,15 @@ def run_async(self, func, *args, **kwds): self.async_pomp_task.append((future, func, args, kwds)) self._wake_up() else: + future.set_running_or_notify_cancel() try: ret = func(*args, **kwds) except Exception as e: self.logger.exception("Unhandled exception in async task function") future.set_exception(e) + except: # noqa + future.cancel() + self.running = False else: if future.done(): assert isinstance(future, _Task) @@ -280,6 +299,7 @@ async def wrapper(*args, **kwds): func(*args, **kwds) else: await func(*args, **kwds) + wrapper.__qualname__ = f"{func}" return wrapper def run_delayed(self, delay, func, *args, **kwds): @@ -334,8 +354,6 @@ async def _cancel_and_wait(self, fut): return fut.result() except concurrent.futures.CancelledError as exc: raise concurrent.futures.TimeoutError() from exc - else: - raise concurrent.futures.TimeoutError() def _release_waiter(self, waiter, fut): if not waiter.done(): @@ -372,8 +390,6 @@ async def await_for(self, timeout, fut, *args, **kwds): return fut.result() except concurrent.futures.CancelledError as exc: raise concurrent.futures.TimeoutError() from exc - else: - raise concurrent.futures.TimeoutError() def _wake_up_event_cb(self, pomp_evt, _userdata): """ @@ -666,6 +682,9 @@ def _destroy_pomp_loop_timers(self): self.destroy_timer(pomp_timer) def register_cleanup(self, fn): + if fn in self.cleanup_functions: + # Do not register the same cleanup functions twice + self.unregister_cleanup(fn) if inspect.iscoroutinefunction(fn) or inspect.isasyncgenfunction(fn): task = _Task(self, True, fn) self.cleanup_functions[fn] = task.step @@ -675,7 +694,7 @@ def 
register_cleanup(self, fn): def unregister_cleanup(self, fn, ignore_error=False): try: func = self.cleanup_functions.pop(fn) - # async cleanup functions need to be properly cancelled if they've not been runned + # async cleanup functions need to be properly cancelled if they've not been run obj = getattr(func, "__self__", None) if isinstance(obj, _Task) and obj not in current_tasks(self): obj.cancel() @@ -684,7 +703,7 @@ def unregister_cleanup(self, fn, ignore_error=False): self.logger.error(f"Failed to unregister cleanup function '{fn}'") def _collect_futures(self): - self.futures = list(filter(lambda f: f.running(), self.futures)) + self.futures = set(filter(lambda f: f.running(), self.futures)) def _cleanup(self): # Execute cleanup functions @@ -699,6 +718,9 @@ def _cleanup(self): self.logger.error(f"Exception caught: {e}") self._run_task_list(self.async_pomp_task) self._run_task_list(self.deferred_pomp_task) + for cleanup_fn in reversed(list(self.cleanup_functions.keys())): + # unregister self registering cleanup functions. + self.unregister_cleanup(cleanup_fn, ignore_error=True) self.cleanup_functions = dict() # Execute asynchronous cleanup actions @@ -723,16 +745,16 @@ def _cleanup(self): self.async_pomp_task = [] self.deferred_pomp_task = [] - self.futures = [] + self.futures = set() self.async_cleanup_running = False def _register_future(self, f): - self.futures.append(f) + self.futures.add(f) def _unregister_future(self, f, ignore_error=False): try: self.futures.remove(f) - except ValueError: + except KeyError: if not self.async_cleanup_running and not ignore_error: self.logger.error(f"Failed to unregister future '{f}'") diff --git a/src/olympe/concurrent/_task.py b/src/olympe/concurrent/_task.py index d3395fa..d76642e 100644 --- a/src/olympe/concurrent/_task.py +++ b/src/olympe/concurrent/_task.py @@ -47,6 +47,9 @@ def __init__(self, loop, from_sync, corofunc, *args, **kwds): self._fut_waiter = None self._must_cancel = False + def __repr__(self): + return super().__repr__() + f" <{self._coro}>" + def cancel(self): if self.done(): return False @@ -86,6 +89,7 @@ def _step_blocking_impl(self, blocking, result): result.add_done_callback(self._wakeup) self._fut_waiter = result if self._must_cancel: + self.cancel() if self._fut_waiter.cancel(): self._must_cancel = False else: @@ -114,7 +118,6 @@ def step(self, exc=None): if exc is None: # We use the `send` method directly, because coroutines # don't have `__iter__` and `__next__` methods. - self._loop result = self._coro.send(None) else: result = self._coro.throw(exc) diff --git a/src/olympe/concurrent/future.py b/src/olympe/concurrent/future.py index a18f7cb..e32bb5d 100644 --- a/src/olympe/concurrent/future.py +++ b/src/olympe/concurrent/future.py @@ -28,7 +28,7 @@ # SUCH DAMAGE. 
-import concurrent +import concurrent.futures import inspect import threading @@ -46,7 +46,10 @@ class Future(concurrent.futures.Future): def __init__(self, loop=None): super().__init__() self._loop = loop or _get_running_loop() + + def set_running_or_notify_cancel(self): self._register() + return super().set_running_or_notify_cancel() @property def loop(self): @@ -57,12 +60,13 @@ def loop(self, loop): if self._loop is not None: raise RuntimeError("Future is already attached to a loop") self._loop = loop - self._register() def _register(self): - if self._loop is not None: - self._loop._register_future(self) - self.add_done_callback(lambda _: self._loop._unregister_future(self)) + if not self._loop: + self._loop = _get_running_loop() + assert self._loop + self._loop._register_future(self) + self.add_done_callback(lambda _: self._loop._unregister_future(self)) def set_from(self, source): if self.done(): diff --git a/src/olympe/doc/examples/streaming.py b/src/olympe/doc/examples/streaming.py index 54ff08e..3bc31cd 100644 --- a/src/olympe/doc/examples/streaming.py +++ b/src/olympe/doc/examples/streaming.py @@ -45,7 +45,7 @@ def __init__(self): self.renderer = None def start(self): - # Connect the the drone + # Connect to drone assert self.drone.connect(retry=3) if DRONE_RTSP_PORT is not None: diff --git a/src/olympe/doc/installation.rst b/src/olympe/doc/installation.rst index 851ecba..c83f93d 100644 --- a/src/olympe/doc/installation.rst +++ b/src/olympe/doc/installation.rst @@ -217,7 +217,7 @@ What's a Python virtual environment A Python virtual environment is a Python environment isolated from the system-wide Python environment. A package installed in one virtual environment does not change anything in the system-wide environment (or any other virtual environment). Python virtual environment can be -created by any user without any specific priviledges. A "virtual env" resides in a directory +created by any user without any specific privileges. A "virtual env" resides in a directory chosen by the user and contains a "site-packages" where Python packages are installed. To use a specific virtual environment, the user usually has to source or execute a specific script that will activate the environment, set up environment variables and change the current shell prompt. Once a @@ -311,6 +311,6 @@ little virtual experience, we can safely remove this virtual environment from ou $ rm -rf my-virtual-env/ Note: On Debian-based distros, you have little to no use for the apt provided `pip` (the -`python3-pip` package). I personaly use it just to install `virtualenv +`python3-pip` package). I personally use it just to install `virtualenv `_ (a better/faster version of `venv `_). 
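The virtual-environment walkthrough above can also be scripted with nothing but the standard library. A minimal sketch, assuming the PyPI package name parrot-olympe (that name is not part of this patch):

import subprocess
import venv
from pathlib import Path

# Create an isolated environment; this touches nothing in the system-wide
# site-packages and needs no special privileges.
env_dir = Path("my-virtual-env")
venv.EnvBuilder(with_pip=True).create(env_dir)

# Install into the environment by invoking its own interpreter,
# which is the equivalent of activating it first and running pip.
python = env_dir / "bin" / "python"   # "Scripts/python.exe" on Windows
subprocess.run([str(python), "-m", "pip", "install", "parrot-olympe"], check=True)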
diff --git a/src/olympe/expectations.py b/src/olympe/expectations.py index 4ec686b..5efafde 100644 --- a/src/olympe/expectations.py +++ b/src/olympe/expectations.py @@ -52,8 +52,10 @@ class ExpectationBase(ABC): always_monitor = False _eventloop_future_blocking = False - def __init__(self): - self._future = Future() + def __init__(self, future=None): + if future is None: + future = Future() + self._future = future self._awaited = False self._scheduler = None self._success = False @@ -81,6 +83,8 @@ def _await(self, scheduler): ret = not self._awaited self._awaited = True self._scheduler = scheduler + if self._future.loop is None and ret: + self._future.loop = self._scheduler.expectation_loop return ret def success(self): @@ -136,8 +140,7 @@ def set_timedout(self): def cancel(self): if self._future.done(): return False - self._future.cancel() - return True + return self._future.cancel() def cancelled(self): return self._future.cancelled() @@ -159,7 +162,6 @@ def timedout(self): def base_copy(self, *args, **kwds): other = self.__class__(*args, **kwds) - ExpectationBase.__init__(other) other._timeout = self._timeout return other @@ -242,8 +244,7 @@ def explain(self): class FutureExpectation(ExpectationBase): def __init__(self, future, status_checker=lambda status: True): - super().__init__() - self._future = future + super().__init__(future) self._status_checker = status_checker self._future.add_done_callback(self._on_done) @@ -330,22 +331,23 @@ def _await(self, scheduler): if not ret: return False self._checked = self._check_expectation.success() - self._success = self._checked - if self._success: + if self._checked: + self._wait_expectation.cancel() self.set_success() return ret def _schedule(self, scheduler): super()._schedule(scheduler) - self._check_expectation._schedule(scheduler) + scheduler._schedule(self._check_expectation) self._checked = self._check_expectation.success() - self._success = self._checked - if not self._success: + if not self._checked: scheduler._schedule( self._wait_expectation, monitor=self._wait_expectation.always_monitor ) + self._check_expectation.cancel() else: self.set_success() + self._wait_expectation.cancel() def copy(self): other = super().base_copy( @@ -400,31 +402,48 @@ def timedout(self): self.set_timedout() return self._wait_expectation.timedout() + def cancel(self): + check_cancelled = self._check_expectation.cancel() + wait_cancelled = self._wait_expectation.cancel() + return super().cancel() or check_cancelled or wait_cancelled + def cancelled(self): - return self._wait_expectation.cancelled() + return self._check_expectation.cancelled() and self._wait_expectation.cancelled() class CheckWaitStateExpectation(CheckWaitStateExpectationMixin, Expectation): pass -class MultipleExpectationMixin: +class MultipleExpectationMixin(ABC): def __init__(self, expectations=None): super().__init__() if expectations is None: self.expectations = [] else: self.expectations = expectations - for expectation in self.expectations: - expectation.add_done_callback(self.check) self.matched_expectations = IndexedSet() + def _register_subexpectations(self, *expectations): + for expectation in expectations: + expectation.add_done_callback(self.on_subexpectation_done) + + @abstractmethod + def on_subexpectation_done(self, expectation): + pass + + def _schedule(self, scheduler): + super()._schedule(scheduler) + scheduler.expectation_loop.run_async(self._register_subexpectations, *self.expectations) + def _await(self, scheduler): ret = True if not super()._await(scheduler): ret = 
False if not all(list(map(lambda e: e._await(scheduler), self.expectations))): ret = False + + scheduler.expectation_loop.run_async(self._register_subexpectations, *self.expectations) return ret def copy(self): @@ -434,11 +453,15 @@ def copy(self): def append(self, expectation): if not isinstance(expectation, self.__class__): self.expectations.append(expectation) - expectation.add_done_callback(self.check) + + if self._scheduler is not None: + self._scheduler.expectation_loop.run_async( + self._register_subexpectations, expectation) else: self.expectations.extend(expectation.expectations) - for expectation in expectation.expectations: - expectation.add_done_callback(self.check) + if self._scheduler is not None: + self._scheduler.expectation_loop.run_async( + self._register_subexpectations, expectation.expectations) return self def expected_events(self): @@ -521,6 +544,13 @@ def as_completed(self, expected_count=None, timeout=None): timeout = end_time - time.monotonic() raise FutureTimeoutError() + def cancel(self): + cancelled = False + for expectation in self.expectations: + if expectation.cancel(): + cancelled = True + return super().cancel() or cancelled + class MultipleExpectation(MultipleExpectationMixin, Expectation): pass @@ -557,15 +587,35 @@ def cancelled(self): return False def check(self, *args, **kwds): + success = False for expectation in self.expectations: if ( expectation.always_monitor or not expectation.success() ) and expectation.check(*args, **kwds).success(): self.matched_expectations.add(expectation) + success = True self.set_success() - return self + break + + if success: + # Cancel every non successful expectations + for expectation in self.expectations: + if not expectation.success(): + expectation.cancel() return self + def on_subexpectation_done(self, expectation): + if not expectation.success(): + return + + self.matched_expectations.add(expectation) + self.set_success() + + # Cancel every non successful expectations + for expectation in self.expectations: + if not expectation.success(): + expectation.cancel() + def __or__(self, other): return self.append(other) @@ -617,6 +667,14 @@ def check(self, *args, **kwds): self.set_success() return self + def on_subexpectation_done(self, expectation): + if not expectation.success(): + return + + self.matched_expectations.add(expectation) + if len(self.expectations) == len(self.matched_expectations): + self.set_success() + def __and__(self, other): return self.append(other) @@ -649,20 +707,26 @@ def _do_schedule(self): # Schedule all available expectations in this sequence until we # encounter a pending asynchronous expectation while self._current_expectation() is not None: - if not self._current_expectation()._awaited: + current = self._current_expectation() + if not current._awaited: self._scheduler._schedule( - self._current_expectation(), - monitor=self._current_expectation().always_monitor, + current, + monitor=current.always_monitor, ) - if not self._current_expectation().success(): + if not current.success(): break - self.matched_expectations.add(self._current_expectation()) + self.matched_expectations.add(current) if len(self.expectations) == len(self.matched_expectations): self.set_success() elif any(expectation.cancelled() for expectation in self.expectations): self.cancel() + def on_subexpectation_done(self, expectation): + if not expectation.success(): + return + self._scheduler.expectation_loop.run_async(self._do_schedule) + def timedout(self): if super().timedout(): return True @@ -708,8 +772,6 @@ def 
check(self, *args, **kwds): ) and (self._current_expectation().check(*args, **kwds).success()) ): - # Consume the current expectation - self.matched_expectations.add(self._current_expectation()) # Schedule the next expectation(s), if any. # This may also consume one or more synchronous expectations # (i.e. events with policy="check"). diff --git a/src/olympe/http.py b/src/olympe/http.py index 8cdcccb..21c9c64 100644 --- a/src/olympe/http.py +++ b/src/olympe/http.py @@ -184,7 +184,7 @@ async def __aiter__(self): return while True: event = await self._connection._get_next_event() - if event == h11.EndOfMessage(): + if event == h11.EndOfMessage() or event == h11.ConnectionClosed(): break if not isinstance(event, h11.Data): self.logger.error(f"Unexpected event {event} for {self._request.url}") @@ -244,8 +244,12 @@ async def aclose(self): data = self._connection._conn.send(close_event) self._connection._events.put_nowait(close_event) self._connection._event_sem.release() - ws_closed = await self._connection.awrite(data) - return await self._connection.adisconnect() and ws_closed + ws_closed = False + try: + ws_closed = await self._connection.awrite(data) + finally: + ws_closed = await self._connection.adisconnect() and ws_closed + return ws_closed class ConnectionListener(olympe.networking.DataListener): @@ -477,6 +481,10 @@ async def adisconnect(self): def connected(self): return self._client.connected + @property + def fd(self): + return self._client.fd + class Session: def __init__(self, loop=None): diff --git a/src/olympe/media.py b/src/olympe/media.py index 71e46ca..81ebe6d 100644 --- a/src/olympe/media.py +++ b/src/olympe/media.py @@ -887,6 +887,63 @@ def __init__(self, _timeout=None): super().__init__("delete_all_media", "_delete_all_media", _timeout=_timeout) +class ResourceDownloadProgressEvent(Event): + def __init__(self, resource_id, download_percent, policy=None): + super().__init__(policy=policy) + self.resource_id = resource_id + self.download_percent = download_percent + + def copy(self): + return self.__class__( + self.resource_id, self.download_percent, policy=self._policy + ) + + def __str__(self): + return ("resource_downloaded_progress_event(" + f"resource_id={self.resource_id}, downloaded_percent={self.download_percent})") + + +class resource_download_progress(Expectation): + def __init__(self, resource_id=None, downloaded_percent=None): + super().__init__() + self.resource_id = resource_id + self.downloaded_percent = downloaded_percent + self._expected_event = ResourceDownloadProgressEvent( + self.resource_id, self.downloaded_percent) + + def copy(self): + return super().base_copy(self.resource_id, self.downloaded_percent) + + def check(self, resource_download_progress_event, *args, **kwds): + if not isinstance(resource_download_progress_event, ResourceDownloadProgressEvent): + return self + if self.resource_id is None: + self.set_success() + return self + if self.resource_id != resource_download_progress_event.resource_id: + return self + self.set_success() + return self + + def expected_events(self): + return EventContext([self._expected_event]) + + def received_events(self): + return self.matched_events() + + def matched_events(self): + if self._success: + return EventContext([self._expected_event]) + else: + return EventContext() + + def unmatched_events(self): + if not self._success: + return EventContext([self._expected_event]) + else: + return EventContext() + + class _download_resource(Expectation): always_monitor = True @@ -922,11 +979,6 @@ def __init__( 
self._downloaded_size = 0 self._downloaded_percent = 0 self._write_tasks = deque() - self.add_done_callback(self._on_done) - - def _on_done(self, _): - if self._resource_file is not None: - self._resource_file.close() def copy(self): return super().base_copy( @@ -1108,6 +1160,12 @@ async def _write_chunk(self, chunk): self._downloaded_size += len(chunk) percent = int(100 * (self._downloaded_size / self._resource_size)) if percent > self._downloaded_percent: + await self._media._process_event( + ResourceDownloadProgressEvent( + self._resource.resource_id, + self._downloaded_percent + ) + ) self._downloaded_percent = percent self._media.logger.info( "Downloading {} {} {}%".format( @@ -1216,6 +1274,14 @@ def check(self, *args, **kwds): self.set_success() return self + def on_subexpectation_done(self, expectation): + if not expectation.success(): + return + + self.matched_expectations.add(expectation) + if len(self.expectations) == len(self.matched_expectations): + self.set_success() + def wait(self, _timeout=None): if self._scheduler is None: return self @@ -1454,7 +1520,7 @@ def wait_for_pending_downloads(self, timeout=None): deadline = self._scheduler.time() + timeout super().stream_join(timeout=timeout) while True: - download = resource_downloaded() + download = resource_downloaded() | resource_download_progress() self.schedule(download) download.wait(_timeout=2.0) if not download: @@ -1633,6 +1699,8 @@ async def _update_media_state(self, media_event): """ Update the internal media state from a websocket media event """ + if not isinstance(media_event, MediaEvent): + return if media_event.name == "media_created": media_id, media = _make_media(media_event.data["media"]) if not media_id: @@ -1852,7 +1920,7 @@ async def aresource_info( raise ValueError("resource_info: missing media_id or resource_id") if self._media_state is None: raise RuntimeError( - "resource_info: not currently connected the the drone media API" + "resource_info: not currently connected to the drone media API" ) try: if media_id is not None: diff --git a/src/olympe/mixins/media.py b/src/olympe/mixins/media.py index 8d85486..7530e09 100644 --- a/src/olympe/mixins/media.py +++ b/src/olympe/mixins/media.py @@ -45,6 +45,10 @@ def __init__(self, *args, media_autoconnect=True, media_port=80, **kwds): scheduler=self._scheduler ) + def destroy(self): + self._media.shutdown() + super().destroy() + @callback_decorator() def _connected_cb(self, *args): super()._connected_cb(*args) diff --git a/src/olympe/module_loader.py b/src/olympe/module_loader.py index 379397d..c62c9c8 100644 --- a/src/olympe/module_loader.py +++ b/src/olympe/module_loader.py @@ -42,7 +42,7 @@ from types import ModuleType # Preload olympe_deps bundled libprotobuf so that we don't rely on system installed libprotobuf. -olympe_deps._load_library("libprotobuf.so.18") # noqa +olympe_deps._load_library("libprotobuf.so.30") # noqa from .arsdkng.enums import ArsdkEnums, ArsdkEnum, ArsdkBitfield, ArsdkProtoEnum # noqa from .arsdkng.messages import ArsdkMessages, ArsdkMessageBase, ArsdkProtoMessage # noqa diff --git a/src/olympe/networking.py b/src/olympe/networking.py index 5a4f336..dc8bb85 100644 --- a/src/olympe/networking.py +++ b/src/olympe/networking.py @@ -44,6 +44,7 @@ import collections import concurrent.futures import ctypes +import errno import ipaddress import logging import olympe_deps as od @@ -560,13 +561,22 @@ async def adestroy(self): if self._ctx is None: return await self.astop() - await self._loop.asleep(1.) 
if self._ctx is None: return res = od.pomp_ctx_destroy(self._ctx) if res != 0: - self.logger.error(f"Failed to destroy pomp context: {os.strerror(-res)}") - self._ctx = None + if res != -errno.EBUSY: + self.logger.error(f"Failed to destroy pomp context: {os.strerror(-res)}") + else: + # Device or resource busy... The connection is still in use. + pass + # Destroying the pomp context is the only way to unregister internal pomp_timer fd, + # so we have to try harder to prevent unnecessary pomp_loop_destroy errors about + # unregisted fds. + self._destroying = False + self._loop.register_cleanup(self.adestroy) + else: + self._ctx = None def destroy(self): self._loop.run_later(self.adestroy) @@ -603,7 +613,7 @@ def _event_cb( elif event == od.POMP_EVENT_DISCONNECTED: self._on_disconnected_cb(conn) elif event == od.POMP_EVENT_MSG: - # pomp message API is not implented, so we should't get here + # pomp message API is not implemented, so we shouldn't get here self.logger.error("Unhandled pomp message event") else: self.logger.error(f"Unknown pomp event {event}") @@ -709,7 +719,7 @@ def _get_sockaddr(self, addr, port): addrlen = ctypes.sizeof(sockaddr_in6) data = sockaddr_in6 else: - raise ValueError(f"Unsupported address familly: {addr}") + raise ValueError(f"Unsupported address family: {addr}") return sockaddr, addrlen, data @@ -918,6 +928,10 @@ async def awrite(self, data): def connected(self): return self._connected and self._client_connection is not None + @property + def fd(self): + return self._client_connection.fileno if self._client_connection else -1 + class TlsClient(TcpClient): def __init__(self, *args, ssl_context=None, **kwds): diff --git a/src/olympe/subscriber.py b/src/olympe/subscriber.py index 859304c..5ebb0ec 100644 --- a/src/olympe/subscriber.py +++ b/src/olympe/subscriber.py @@ -55,15 +55,12 @@ def __init__( # elements as new elements are appended. self._event_queue = deque([], queue_size) - # Await the expectation, this prevent monitored command expectations - # from sending messages - if self._expectation is not None: - self._expectation._await(self._scheduler) - def __enter__(self): pass def __exit__(self, *args, **kwds): + if self._expectation is not None: + self._expectation.cancel() self._scheduler.unsubscribe(self) def _add_event(self, event): @@ -76,6 +73,9 @@ def notify(self, event): self._add_event(event) return True else: + # Await the expectation (this is a no-op if already done). 
+ # This prevent monitored command expectations from sending messages + self._expectation._await(self._scheduler) if self._expectation.success() or self._expectation.cancelled(): # reset already succeeded or cancelled expectations self._expectation = self._expectation.copy() @@ -100,4 +100,6 @@ def timeout(self): return self._timeout def unsubscribe(self): + if self._expectation is not None: + self._expectation.cancel() self._scheduler.unsubscribe(self) diff --git a/src/olympe/utils/__init__.py b/src/olympe/utils/__init__.py index c167316..642e63e 100644 --- a/src/olympe/utils/__init__.py +++ b/src/olympe/utils/__init__.py @@ -175,8 +175,6 @@ def update_mapping(mapping, updater): for k, v in updater.items(): if isinstance(v, Mapping): mapping[k] = update_mapping(mapping.get(k, type(v)()), v) - elif isinstance(v, (list, tuple)): - mapping[k] = mapping.get(k, type(v)()) + v else: mapping[k] = v return mapping diff --git a/src/olympe/video/frame.py b/src/olympe/video/frame.py index 4d534cb..91835a1 100644 --- a/src/olympe/video/frame.py +++ b/src/olympe/video/frame.py @@ -164,9 +164,7 @@ def _get_video_frame(self): res = self._mbuf.copy( self._mbuf_video_frame, self._packed_buffer, - False, # FIXME: remove_stride=True is buggy - # the ffmpeg decoder has no stride, so this is OK-ish - # for now + True, ctypes.byref(self._packed_video_frame), ) if res < 0:
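For reference, the post-patch merge semantics of olympe.utils.update_mapping shown above (nested mappings merge recursively, while every other value, lists included, now replaces the previous one instead of being concatenated) can be exercised standalone:

from collections.abc import Mapping


def update_mapping(mapping, updater):
    # Same body as the patched olympe.utils.update_mapping above.
    for k, v in updater.items():
        if isinstance(v, Mapping):
            mapping[k] = update_mapping(mapping.get(k, type(v)()), v)
        else:
            mapping[k] = v
    return mapping


merged = update_mapping({"a": {"x": 1}, "b": [1]}, {"a": {"y": 2}, "b": [2]})
assert merged == {"a": {"x": 1, "y": 2}, "b": [2]}   # "b" is replaced, not extended to [1, 2]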