diff --git a/deebot_client/map.py b/deebot_client/map.py index 04d315d9..1a957846 100644 --- a/deebot_client/map.py +++ b/deebot_client/map.py @@ -2,19 +2,20 @@ import ast import asyncio import base64 -from collections.abc import Callable, Coroutine +from collections.abc import Callable, Coroutine, Sequence import dataclasses from datetime import UTC, datetime from io import BytesIO +import itertools import lzma -import math import struct from typing import Any, Final import zlib from numpy import float64, reshape, zeros from numpy.typing import NDArray -from PIL import Image, ImageDraw, ImageOps +from PIL import Image, ImageDraw +import svg from deebot_client.events.map import MapChangedEvent @@ -40,10 +41,12 @@ _LOGGER = get_logger(__name__) _PIXEL_WIDTH = 50 -_POSITION_PNG = { - PositionType.DEEBOT: "iVBORw0KGgoAAAANSUhEUgAAAAYAAAAGCAIAAABvrngfAAAACXBIWXMAAAsTAAALEwEAmpwYAAAF0WlUWHRYTUw6Y29tLmFkb2JlLnhtcAAAAAAAPD94cGFja2V0IGJlZ2luPSLvu78iIGlkPSJXNU0wTXBDZWhpSHpyZVN6TlRjemtjOWQiPz4gPHg6eG1wbWV0YSB4bWxuczp4PSJhZG9iZTpuczptZXRhLyIgeDp4bXB0az0iQWRvYmUgWE1QIENvcmUgNS42LWMxNDUgNzkuMTYzNDk5LCAyMDE4LzA4LzEzLTE2OjQwOjIyICAgICAgICAiPiA8cmRmOlJERiB4bWxuczpyZGY9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkvMDIvMjItcmRmLXN5bnRheC1ucyMiPiA8cmRmOkRlc2NyaXB0aW9uIHJkZjphYm91dD0iIiB4bWxuczp4bXA9Imh0dHA6Ly9ucy5hZG9iZS5jb20veGFwLzEuMC8iIHhtbG5zOnhtcE1NPSJodHRwOi8vbnMuYWRvYmUuY29tL3hhcC8xLjAvbW0vIiB4bWxuczpzdEV2dD0iaHR0cDovL25zLmFkb2JlLmNvbS94YXAvMS4wL3NUeXBlL1Jlc291cmNlRXZlbnQjIiB4bWxuczpkYz0iaHR0cDovL3B1cmwub3JnL2RjL2VsZW1lbnRzLzEuMS8iIHhtbG5zOnBob3Rvc2hvcD0iaHR0cDovL25zLmFkb2JlLmNvbS9waG90b3Nob3AvMS4wLyIgeG1wOkNyZWF0b3JUb29sPSJBZG9iZSBQaG90b3Nob3AgQ0MgMjAxOSAoV2luZG93cykiIHhtcDpDcmVhdGVEYXRlPSIyMDIwLTA1LTI0VDEyOjAzOjE2KzAyOjAwIiB4bXA6TWV0YWRhdGFEYXRlPSIyMDIwLTA1LTI0VDEyOjAzOjE2KzAyOjAwIiB4bXA6TW9kaWZ5RGF0ZT0iMjAyMC0wNS0yNFQxMjowMzoxNiswMjowMCIgeG1wTU06SW5zdGFuY2VJRD0ieG1wLmlpZDo0YWM4NWY5MC1hNWMwLTE2NDktYTQ0MC0xMWM0NWY5OGQ1MDYiIHhtcE1NOkRvY3VtZW50SUQ9ImFkb2JlOmRvY2lkOnBob3Rvc2hvcDo3Zjk3MTZjMi1kZDM1LWJiNDItYjMzZS1hYjYwY2Y4ZTZlZDYiIHhtcE1NOk9yaWdpbmFsRG9jdW1lbnRJRD0ieG1wLmRpZDpiMzhiNGZlMS1lOGNkLTJjNDctYmQwZC1lNmZiNzRhMjFkMDciIGRjOmZvcm1hdD0iaW1hZ2UvcG5nIiBwaG90b3Nob3A6Q29sb3JNb2RlPSIzIj4gPHhtcE1NOkhpc3Rvcnk+IDxyZGY6U2VxPiA8cmRmOmxpIHN0RXZ0OmFjdGlvbj0iY3JlYXRlZCIgc3RFdnQ6aW5zdGFuY2VJRD0ieG1wLmlpZDpiMzhiNGZlMS1lOGNkLTJjNDctYmQwZC1lNmZiNzRhMjFkMDciIHN0RXZ0OndoZW49IjIwMjAtMDUtMjRUMTI6MDM6MTYrMDI6MDAiIHN0RXZ0OnNvZnR3YXJlQWdlbnQ9IkFkb2JlIFBob3Rvc2hvcCBDQyAyMDE5IChXaW5kb3dzKSIvPiA8cmRmOmxpIHN0RXZ0OmFjdGlvbj0ic2F2ZWQiIHN0RXZ0Omluc3RhbmNlSUQ9InhtcC5paWQ6NGFjODVmOTAtYTVjMC0xNjQ5LWE0NDAtMTFjNDVmOThkNTA2IiBzdEV2dDp3aGVuPSIyMDIwLTA1LTI0VDEyOjAzOjE2KzAyOjAwIiBzdEV2dDpzb2Z0d2FyZUFnZW50PSJBZG9iZSBQaG90b3Nob3AgQ0MgMjAxOSAoV2luZG93cykiIHN0RXZ0OmNoYW5nZWQ9Ii8iLz4gPC9yZGY6U2VxPiA8L3htcE1NOkhpc3Rvcnk+IDwvcmRmOkRlc2NyaXB0aW9uPiA8L3JkZjpSREY+IDwveDp4bXBtZXRhPiA8P3hwYWNrZXQgZW5kPSJyIj8+AP7+NwAAAFpJREFUCJllzEEKgzAQhtFvMkSsEKj30oUXrYserELA1obhd+nCd4BnksZ53X4Cnr193ov59Iq+o2SA2vz4p/iKkgkRouTYlbhJ/jBqww03avPBTNI4rdtx9ScfWyYCg52e0gAAAABJRU5ErkJggg==", # nopep8 - PositionType.CHARGER: "iVBORw0KGgoAAAANSUhEUgAAAAoAAAAOCAYAAAAWo42rAAAAdUlEQVQoU2NkQAP/nzD8BwkxyjAwIkuhcEASRCmEKYKZhGwq3ER0ReiKSVOIyzRkU8EmwhUyKzAwSNyHyL9QZGD4+wDMBLmVEasimFHIiuEKpcHBhwmeQryBMJFohcjuw2s1SBKHZ8BWo/gauyshvobJEYoZAEOSPXnhzwZnAAAAAElFTkSuQmCC", # nopep8 + +_POSITIONS_SVG_ORDER = { + PositionType.DEEBOT: 0, + PositionType.CHARGER: 1, } + _OFFSET = 400 _TRACE_MAP = "trace_map" _COLORS = { @@ -55,6 +58,51 @@ MapSetType.NO_MOP_ZONES: "#FFA500", } +# SVG definitions referred by map elements +_SVG_DEFS = svg.Defs( + elements=[ + # Gradient used by Bot icon + svg.RadialGradient( + id="device_bg", + cx=svg.Length(50, "%"), + cy=svg.Length(50, "%"), + r=svg.Length(50, "%"), + fx=svg.Length(50, "%"), + fy=svg.Length(50, "%"), + elements=[ + svg.Stop(offset=svg.Length(70, "%"), style="stop-color:#0000FF;"), + svg.Stop(offset=svg.Length(97, "%"), style="stop-color:#0000FF00;"), + ], + ), + # Bot circular icon + svg.G( + id=f"position_{PositionType.DEEBOT}", + elements=[ + svg.Circle(r=5, fill="url(#device_bg)"), + svg.Circle(r=3.5, stroke="white", fill="blue", stroke_width=0.5), + ], + ), + # Charger pin icon (pre-flipped vertically) + svg.G( + id=f"position_{PositionType.CHARGER}", + elements=[ + svg.Path( + fill="#ffe605", + d=[ + svg.M(4, 6.4), + svg.C(4, 4.2, 0, 0, 0, 0), + svg.C(0, 0, -4, 4.2, -4, 6.4), + svg.C(-4, 8.6, -2.2, 10.4, 0, 10.4), + svg.C(2.2, 10.4, 4, 8.6, 4, 6.4), + svg.Z(), + ], + ), + svg.Circle(fill="white", r=2.8, cy=6.4, cx=0), + ], + ), + ] +) + def _decompress_7z_base64_data(data: str) -> bytes: _LOGGER.debug("[decompress7zBase64Data] Begin") @@ -77,10 +125,11 @@ def _decompress_7z_base64_data(data: str) -> bytes: return decompressed_data -def _calc_value(value: int, min_value: int, max_value: int) -> int: +def _calc_value(value: float, min_value: float, max_value: float) -> float: try: if value is not None: - new_value = int((int(value) / _PIXEL_WIDTH) + _OFFSET) + # SVG allows sub-pixel precision, so we use floating point coordinates for better placement. + new_value = (float(value) / _PIXEL_WIDTH) + _OFFSET # return value inside min and max return min(max_value, max(min_value, new_value)) @@ -91,8 +140,8 @@ def _calc_value(value: int, min_value: int, max_value: int) -> int: def _calc_point( - x: int, y: int, image_box: tuple[int, int, int, int] | None -) -> tuple[int, int]: + x: float, y: float, image_box: tuple[float, float, float, float] | None +) -> tuple[float, float]: if image_box is None: image_box = (0, 0, x, y) @@ -102,43 +151,76 @@ def _calc_point( ) -def _draw_positions( +def _points_to_svg_path( + points: Sequence[tuple[float, float]] | Sequence[tuple[float, float, bool]], +) -> list[svg.PathData]: + # Convert a set of simple point (x, y), or trace points (x, y, connected, type) to + # SVG path instructions. + path_data: list[svg.PathData] = [] + + # First instruction: move to the starting point using absolute coordinates + first_p = points[0] + path_data.append(svg.MoveTo(first_p[0], first_p[1])) + + for prev_p, p in itertools.pairwise(points): + if p != prev_p: # Skip repeated points + if len(p) == 2 or p[2]: + path_data.append(svg.LineToRel(p[0] - prev_p[0], p[1] - prev_p[1])) + else: + path_data.append(svg.MoveToRel(p[0] - prev_p[0], p[1] - prev_p[1])) + + return path_data + + +def _get_svg_positions( positions: list[Position], - image: Image.Image, image_box: tuple[int, int, int, int] | None, -) -> None: - for position in positions: - icon = Image.open(BytesIO(base64.b64decode(_POSITION_PNG[position.type]))) - image.paste( - icon, - _calc_point(position.x, position.y, image_box), - icon.convert("RGBA"), +) -> list[svg.Element]: + svg_positions: list[svg.Element] = [] + for position in sorted(positions, key=lambda x: _POSITIONS_SVG_ORDER[x.type]): + pos = _calc_point(position.x, position.y, image_box) + svg_positions.append( + svg.Use(href=f"#position_{position.type}", x=pos[0], y=pos[1]) ) + return svg_positions -def _draw_subset( + +def _get_svg_subset( subset: MapSubsetEvent, - draw: "DashedImageDraw", image_box: tuple[int, int, int, int] | None, -) -> None: - coordinates_ = ast.literal_eval(subset.coordinates) - points: list[tuple[int, int]] = [ - _calc_point(coordinates_[i], coordinates_[i + 1], image_box) - for i in range(0, len(coordinates_), 2) +) -> svg.Path | svg.Polygon: + subset_coordinates: list[int] = ast.literal_eval(subset.coordinates) + + points = [ + _calc_point(subset_coordinates[i], subset_coordinates[i + 1], image_box) + for i in range(0, len(subset_coordinates), 2) ] - if len(points) == 4: - # close rectangle - points.append(points[0]) + if len(points) == 2: + # Only 2 point, use a path + return svg.Path( + stroke=_COLORS[subset.type], + stroke_width=1.5, + stroke_dasharray=[4], + vector_effect="non-scaling-stroke", + d=_points_to_svg_path(points), + ) - draw.dashed_line(points, dash=(3, 2), fill=_COLORS[subset.type], width=1) + # For any other points count, return a polygon that should fit any required shape + return svg.Polygon( + fill=_COLORS[subset.type] + "90", # Set alpha channel to 90 for fill color + stroke=_COLORS[subset.type], + stroke_width=1.5, + stroke_dasharray=[4], + vector_effect="non-scaling-stroke", + points=list(sum(points, [])), # Re-flatten the list of coordinates + ) class Map: """Map representation.""" - RESIZE_FACTOR = 3 - def __init__( self, execute_command: Callable[[Command], Coroutine[Any, Any, None]], @@ -194,15 +276,14 @@ def _update_trace_points(self, data: str) -> None: trace_points = _decompress_7z_base64_data(data) for i in range(0, len(trace_points), 5): - byte_position_x = struct.unpack("> 7 & 1 == 0 + + self._map_data.trace_values.append((position_x, position_y, connected)) _LOGGER.debug("[_update_trace_points] finish") @@ -237,6 +318,22 @@ def _draw_map_pieces(self, draw: ImageDraw.ImageDraw) -> None: if pixel_type in [0x01, 0x02, 0x03]: draw.point((point_x, point_y), fill=_COLORS[pixel_type]) + def _get_svg_traces_path(self) -> svg.Path | None: + if len(self._map_data.trace_values) > 0: + _LOGGER.debug("[get_svg_map] Draw Trace") + + return svg.Path( + fill="none", + stroke=_COLORS[_TRACE_MAP], + stroke_width=1.5, + stroke_linejoin="round", + vector_effect="non-scaling-stroke", + transform=[svg.Translate(_OFFSET, _OFFSET), svg.Scale(0.2, 0.2)], + d=_points_to_svg_path(self._map_data.trace_values), + ) + + return None + def enable(self) -> None: """Enable map.""" if self._unsubscribers: @@ -304,8 +401,8 @@ def refresh(self) -> None: self._event_bus.request_refresh(MapTraceEvent) self._event_bus.request_refresh(MajorMapEvent) - def get_base64_map(self, width: int | None = None) -> bytes: - """Return map as base64 image string.""" + def get_svg_map(self, width: int | None = None) -> str: + """Return map as SVG string.""" if not self._unsubscribers: raise MapError("Please enable the map first") @@ -314,74 +411,101 @@ def get_base64_map(self, width: int | None = None) -> bytes: and width == self._last_image.width and not self._map_data.changed ): - _LOGGER.debug("[get_base64_map] No need to update") - return self._last_image.base64_image + _LOGGER.debug("[get_svg_map] No need to update") + return self._last_image.svg_image - _LOGGER.debug("[get_base64_map] Begin") - image = Image.new("RGBA", (6400, 6400)) - draw = DashedImageDraw(image) + _LOGGER.debug("[get_svg_map] Begin") + image = Image.new("RGBA", (6400, 6400)) + draw = ImageDraw.ImageDraw(image) self._draw_map_pieces(draw) + del draw - # Draw Trace Route - if len(self._map_data.trace_values) > 0: - _LOGGER.debug("[get_base64_map] Draw Trace") - draw.line(self._map_data.trace_values, fill=_COLORS[_TRACE_MAP], width=1) + svg_map = svg.SVG() + if image_box := image.getbbox(): + image_box_center = ( + (image_box[0] + image_box[2]) / 2, + (image_box[1] + image_box[3]) / 2, + ) - image_box = image.getbbox() - for subset in self._map_data.map_subsets.values(): - _draw_subset(subset, draw, image_box) + _LOGGER.debug("[get_svg_map] Crop Image") + cropped = image.crop(image_box) + del image - del draw + _LOGGER.debug( + "[get_svg_map] Map current Size: X: %d Y: %d", + cropped.size[0], + cropped.size[1], + ) - _draw_positions(self._map_data.positions, image, image_box) + _LOGGER.debug("[get_svg_map] Saving to buffer") + buffered = BytesIO() + cropped.save(buffered, format="PNG") + del cropped - _LOGGER.debug("[get_base64_map] Crop Image") - cropped = image.crop(image_box) - del image + base64_bg = base64.b64encode(buffered.getvalue()) - _LOGGER.debug("[get_base64_map] Flipping Image") - cropped = ImageOps.flip(cropped) + # Build the SVG elements - _LOGGER.debug( - "[get_base64_map] Map current Size: X: %d Y: %d", - cropped.size[0], - cropped.size[1], - ) + # Elements of the SVG Map to vertically flip + svg_map_group_elements: list[svg.Element] = [] - new_size = None - if width is not None and width > 0: - height = int((width / cropped.size[0]) * cropped.size[1]) - _LOGGER.debug( - "[get_base64_map] Resize based on the requested width: %d and calculated height %d", - width, - height, + # Map background. + svg_map_group_elements.append( + svg.Image( + x=image_box[0], + y=image_box[1], + width=image_box[2] - image_box[0], + height=image_box[3] - image_box[1], + style="image-rendering: pixelated", + href=f"data:image/png;base64,{base64_bg.decode('ascii')}", + ) ) - new_size = (width, height) - elif cropped.size[0] > 400 or cropped.size[1] > 400: - _LOGGER.debug("[get_base64_map] Resize disabled.. map over 400") - else: - resize_factor = Map.RESIZE_FACTOR - _LOGGER.debug("[get_base64_map] Resize factor: %d", resize_factor) - new_size = ( - cropped.size[0] * resize_factor, - cropped.size[1] * resize_factor, + + # Additional subsets (VirtualWalls and NoMopZones) + svg_map_group_elements.extend( + [ + _get_svg_subset(subset, image_box) + for subset in self._map_data.map_subsets.values() + ] + ) + + # Traces (if any) + if svg_traces_path := self._get_svg_traces_path(): + svg_map_group_elements.append(svg_traces_path) + + # Bot and Charge stations + svg_map_group_elements.extend( + _get_svg_positions(self._map_data.positions, image_box) + ) + + # Set map viewBox based on background map bounding box. + svg_map.viewBox = svg.ViewBoxSpec( + image_box[0], + image_box[1], + image_box[2] - image_box[0], + image_box[3] - image_box[1], ) - if new_size is not None: - cropped = cropped.resize(new_size, Image.Resampling.NEAREST) + # Add all elements to the SVG map + svg_map.elements = [ + _SVG_DEFS, + # Elements to vertically flip + svg.G( + transform_origin=f"{image_box_center[0]} {image_box_center[1]}", + transform=[svg.Scale(1, -1)], + elements=svg_map_group_elements, + ), + ] - _LOGGER.debug("[get_base64_map] Saving to buffer") - buffered = BytesIO() - cropped.save(buffered, format="PNG") - del cropped + str_svg_map = str(svg_map) - base64_image = base64.b64encode(buffered.getvalue()) self._map_data.reset_changed() - self._last_image = LastImage(base64_image, width) - _LOGGER.debug("[get_base64_map] Finish") + self._last_image = LastImage(str_svg_map, width) + + _LOGGER.debug("[get_svg_map] Finish") - return base64_image + return str_svg_map async def teardown(self) -> None: """Teardown map.""" @@ -448,95 +572,11 @@ def __eq__(self, obj: object) -> bool: return self._crc32 == obj._crc32 and self._index == obj._index -class DashedImageDraw(ImageDraw.ImageDraw): - """Class extend ImageDraw by dashed line.""" - - # Copied from https://stackoverflow.com/a/65893631 Credits ands - _FILL = str | int | tuple[int, int, int] | tuple[int, int, int, int] | None - - def _thick_line( - self, - xy: list[tuple[int, int]], - direction: list[tuple[int, int]], - fill: _FILL = None, - width: int = 0, - ) -> None: - if xy[0] != xy[1]: - self.line(xy, fill=fill, width=width) - else: - x1, y1 = xy[0] - delta_x = direction[1][0] - direction[0][0] - delta_y = direction[1][1] - direction[0][1] - - if delta_x < 0: - y1 -= 1 - - if delta_y < 0: - x1 -= 1 - - if delta_y != 0: - if delta_x != 0: - k = -delta_x / delta_y - a = 1 / math.sqrt(1 + k**2) - b = (width * a - 1) / 2 - else: - k = 0 - b = (width - 1) / 2 - x1 = x1 - math.floor(b) - y1 = y1 - int(k * b) - x2 = x1 + math.ceil(b) - y2 = y1 + int(k * b) - else: - y1 = y1 - math.floor((width - 1) / 2) - x2 = x1 - y2 = y1 + math.ceil((width - 1) / 2) - self.line([(x1, y1), (x2, y2)], fill=fill, width=1) - - def dashed_line( - self, - xy: list[tuple[int, int]], - dash: tuple[int, int] = (2, 2), - fill: _FILL = None, - width: int = 0, - ) -> None: - """Draw a dashed line, or a connected sequence of line segments.""" - for i in range(len(xy) - 1): - x1, y1 = xy[i] - x2, y2 = xy[i + 1] - x_length = x2 - x1 - y_length = y2 - y1 - length = math.sqrt(x_length**2 + y_length**2) - dash_enabled = True - position = 0 - while position < length: - for dash_step in dash: - if dash_enabled: - start = position / length - end = min((position + dash_step - 1) / length, 1) - self._thick_line( - [ - ( - round(x1 + start * x_length), - round(y1 + start * y_length), - ), - ( - round(x1 + end * x_length), - round(y1 + end * y_length), - ), - ], - xy, - fill, - width, - ) - dash_enabled = not dash_enabled - position += dash_step - - @dataclasses.dataclass(frozen=True) class LastImage: """Last created image.""" - base64_image: bytes + svg_image: str width: int | None @@ -557,7 +597,9 @@ def on_change() -> None: self._map_subsets: OnChangedDict[int, MapSubsetEvent] = OnChangedDict(on_change) self._positions: OnChangedList[Position] = OnChangedList(on_change) self._rooms: OnChangedDict[int, Room] = OnChangedDict(on_change) - self._trace_values: OnChangedList[int] = OnChangedList(on_change) + self._trace_values: OnChangedList[tuple[int, int, bool]] = OnChangedList( + on_change + ) @property def changed(self) -> bool: @@ -592,7 +634,7 @@ def rooms(self) -> dict[int, Room]: return self._rooms @property - def trace_values(self) -> OnChangedList[int]: + def trace_values(self) -> OnChangedList[tuple[int, int, bool]]: """Return trace values.""" return self._trace_values diff --git a/requirements.txt b/requirements.txt index 25bd0299..22ee78dd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ cachetools>=5.0.0,<6.0 defusedxml numpy>=1.23.2,<2.0 Pillow>=10.0.1,<11.0 +svg.py>=1.4.2 diff --git a/tests/test_map.py b/tests/test_map.py index cc56e326..f49b2dc6 100644 --- a/tests/test_map.py +++ b/tests/test_map.py @@ -15,9 +15,9 @@ from deebot_client.models import Room _test_calc_point_data = [ - (0, 10, None, (0, 10)), - (10, 100, (100, 0, 200, 50), (200, 50)), - (10, 100, (0, 0, 1000, 1000), (400, 402)), + (0, 10, None, (0.0, 10.0)), + (10, 100, (100, 0, 200, 50), (200.0, 50.0)), + (10, 100, (0, 0, 1000, 1000), (400.2, 402.0)), ] @@ -26,7 +26,7 @@ def test_calc_point( x: int, y: int, image_box: tuple[int, int, int, int] | None, - expected: tuple[int, int], + expected: tuple[float, float], ) -> None: result = _calc_point(x, y, image_box) assert result == expected