diff --git a/deebot_client/map.py b/deebot_client/map.py index 04d315d91..3f5012296 100644 --- a/deebot_client/map.py +++ b/deebot_client/map.py @@ -11,10 +11,12 @@ import struct from typing import Any, Final import zlib +import re +import itertools from numpy import float64, reshape, zeros from numpy.typing import NDArray -from PIL import Image, ImageDraw, ImageOps +from PIL import Image, ImageDraw from deebot_client.events.map import MapChangedEvent @@ -40,10 +42,19 @@ _LOGGER = get_logger(__name__) _PIXEL_WIDTH = 50 -_POSITION_PNG = { - PositionType.DEEBOT: "iVBORw0KGgoAAAANSUhEUgAAAAYAAAAGCAIAAABvrngfAAAACXBIWXMAAAsTAAALEwEAmpwYAAAF0WlUWHRYTUw6Y29tLmFkb2JlLnhtcAAAAAAAPD94cGFja2V0IGJlZ2luPSLvu78iIGlkPSJXNU0wTXBDZWhpSHpyZVN6TlRjemtjOWQiPz4gPHg6eG1wbWV0YSB4bWxuczp4PSJhZG9iZTpuczptZXRhLyIgeDp4bXB0az0iQWRvYmUgWE1QIENvcmUgNS42LWMxNDUgNzkuMTYzNDk5LCAyMDE4LzA4LzEzLTE2OjQwOjIyICAgICAgICAiPiA8cmRmOlJERiB4bWxuczpyZGY9Imh0dHA6Ly93d3cudzMub3JnLzE5OTkvMDIvMjItcmRmLXN5bnRheC1ucyMiPiA8cmRmOkRlc2NyaXB0aW9uIHJkZjphYm91dD0iIiB4bWxuczp4bXA9Imh0dHA6Ly9ucy5hZG9iZS5jb20veGFwLzEuMC8iIHhtbG5zOnhtcE1NPSJodHRwOi8vbnMuYWRvYmUuY29tL3hhcC8xLjAvbW0vIiB4bWxuczpzdEV2dD0iaHR0cDovL25zLmFkb2JlLmNvbS94YXAvMS4wL3NUeXBlL1Jlc291cmNlRXZlbnQjIiB4bWxuczpkYz0iaHR0cDovL3B1cmwub3JnL2RjL2VsZW1lbnRzLzEuMS8iIHhtbG5zOnBob3Rvc2hvcD0iaHR0cDovL25zLmFkb2JlLmNvbS9waG90b3Nob3AvMS4wLyIgeG1wOkNyZWF0b3JUb29sPSJBZG9iZSBQaG90b3Nob3AgQ0MgMjAxOSAoV2luZG93cykiIHhtcDpDcmVhdGVEYXRlPSIyMDIwLTA1LTI0VDEyOjAzOjE2KzAyOjAwIiB4bXA6TWV0YWRhdGFEYXRlPSIyMDIwLTA1LTI0VDEyOjAzOjE2KzAyOjAwIiB4bXA6TW9kaWZ5RGF0ZT0iMjAyMC0wNS0yNFQxMjowMzoxNiswMjowMCIgeG1wTU06SW5zdGFuY2VJRD0ieG1wLmlpZDo0YWM4NWY5MC1hNWMwLTE2NDktYTQ0MC0xMWM0NWY5OGQ1MDYiIHhtcE1NOkRvY3VtZW50SUQ9ImFkb2JlOmRvY2lkOnBob3Rvc2hvcDo3Zjk3MTZjMi1kZDM1LWJiNDItYjMzZS1hYjYwY2Y4ZTZlZDYiIHhtcE1NOk9yaWdpbmFsRG9jdW1lbnRJRD0ieG1wLmRpZDpiMzhiNGZlMS1lOGNkLTJjNDctYmQwZC1lNmZiNzRhMjFkMDciIGRjOmZvcm1hdD0iaW1hZ2UvcG5nIiBwaG90b3Nob3A6Q29sb3JNb2RlPSIzIj4gPHhtcE1NOkhpc3Rvcnk+IDxyZGY6U2VxPiA8cmRmOmxpIHN0RXZ0OmFjdGlvbj0iY3JlYXRlZCIgc3RFdnQ6aW5zdGFuY2VJRD0ieG1wLmlpZDpiMzhiNGZlMS1lOGNkLTJjNDctYmQwZC1lNmZiNzRhMjFkMDciIHN0RXZ0OndoZW49IjIwMjAtMDUtMjRUMTI6MDM6MTYrMDI6MDAiIHN0RXZ0OnNvZnR3YXJlQWdlbnQ9IkFkb2JlIFBob3Rvc2hvcCBDQyAyMDE5IChXaW5kb3dzKSIvPiA8cmRmOmxpIHN0RXZ0OmFjdGlvbj0ic2F2ZWQiIHN0RXZ0Omluc3RhbmNlSUQ9InhtcC5paWQ6NGFjODVmOTAtYTVjMC0xNjQ5LWE0NDAtMTFjNDVmOThkNTA2IiBzdEV2dDp3aGVuPSIyMDIwLTA1LTI0VDEyOjAzOjE2KzAyOjAwIiBzdEV2dDpzb2Z0d2FyZUFnZW50PSJBZG9iZSBQaG90b3Nob3AgQ0MgMjAxOSAoV2luZG93cykiIHN0RXZ0OmNoYW5nZWQ9Ii8iLz4gPC9yZGY6U2VxPiA8L3htcE1NOkhpc3Rvcnk+IDwvcmRmOkRlc2NyaXB0aW9uPiA8L3JkZjpSREY+IDwveDp4bXBtZXRhPiA8P3hwYWNrZXQgZW5kPSJyIj8+AP7+NwAAAFpJREFUCJllzEEKgzAQhtFvMkSsEKj30oUXrYserELA1obhd+nCd4BnksZ53X4Cnr193ov59Iq+o2SA2vz4p/iKkgkRouTYlbhJ/jBqww03avPBTNI4rdtx9ScfWyYCg52e0gAAAABJRU5ErkJggg==", # nopep8 - PositionType.CHARGER: "iVBORw0KGgoAAAANSUhEUgAAAAoAAAAOCAYAAAAWo42rAAAAdUlEQVQoU2NkQAP/nzD8BwkxyjAwIkuhcEASRCmEKYKZhGwq3ER0ReiKSVOIyzRkU8EmwhUyKzAwSNyHyL9QZGD4+wDMBLmVEasimFHIiuEKpcHBhwmeQryBMJFohcjuw2s1SBKHZ8BWo/gauyshvobJEYoZAEOSPXnhzwZnAAAAAElFTkSuQmCC", # nopep8 + +_POSITIONS_SVG_ORDER = { + PositionType.DEEBOT: 0, + PositionType.CHARGER: 1, } + +_SVG_COORDS_COMPACT = re.compile(r"(?:(?<=\D)\s)|(?:\s(?=\D))") + +_SVG_MAP_MARGIN = 5 + +# Categorigal palette for 12 non related elements +_ROOM_COLORS = ["#a6cee3", "#1f78b4", "#b2df8a", "#33a02c", "#fb9a99", "#e31a1c", "#fdbf6f", "#ff7f00", "#cab2d6", "#6a3d9a", "#ffff99", "#b15928"] + _OFFSET = 400 _TRACE_MAP = "trace_map" _COLORS = { @@ -77,10 +88,11 @@ def _decompress_7z_base64_data(data: str) -> bytes: return decompressed_data -def _calc_value(value: int, min_value: int, max_value: int) -> int: +def _calc_value(value: int, min_value: int, max_value: int) -> float: try: if value is not None: - new_value = int((int(value) / _PIXEL_WIDTH) + _OFFSET) + # SVG allows sub-pixel precision, so we use floating point coordinates for better placement. + new_value = (float(value) / _PIXEL_WIDTH) + _OFFSET # return value inside min and max return min(max_value, max(min_value, new_value)) @@ -92,7 +104,7 @@ def _calc_value(value: int, min_value: int, max_value: int) -> int: def _calc_point( x: int, y: int, image_box: tuple[int, int, int, int] | None -) -> tuple[int, int]: +) -> tuple[float, float]: if image_box is None: image_box = (0, 0, x, y) @@ -101,44 +113,64 @@ def _calc_point( _calc_value(y, image_box[1], image_box[3]), ) - -def _draw_positions( - positions: list[Position], - image: Image.Image, - image_box: tuple[int, int, int, int] | None, +def _points_to_svg_path( + points: list[Any] ) -> None: - for position in positions: - icon = Image.open(BytesIO(base64.b64decode(_POSITION_PNG[position.type]))) - image.paste( - icon, - _calc_point(position.x, position.y, image_box), - icon.convert("RGBA"), - ) + """Convert a set of simple point (x, y), or trace points (x, y, connected, type) to a compacted + SVG path instruction. + """ + + path_points = [] + for prev_p, p in itertools.pairwise([None, *points]): + if p != prev_p: # Skip repeated points + if (prev_p): + # Relativize coords in order to generate compacted path + path_points.append("l" if len(p) == 2 or p[2] else "m") + path_points.extend(map(lambda a, b: str(a - b), p[0 : 2], prev_p[0 : 2])) + else: + # No previous point, use absolute coordinates for initial position + path_points.append("M") + path_points.extend(map(str, p[0 : 2])) + # Further compact the path (keep only whitespaces between two numeric characters) + return _SVG_COORDS_COMPACT.sub("", " ".join(path_points)) -def _draw_subset( + + +def _get_svg_positions( + positions: list[Position], + image_box: tuple[int, int, int, int] | None, +) -> str: + svg_positions = [] + for position in sorted(positions, key=lambda x: _POSITIONS_SVG_ORDER[x.type]): + pos = _calc_point(position.x, position.y, image_box) + svg_positions.append(f"") + + return "".join(svg_positions) + +def _get_svg_subset( subset: MapSubsetEvent, - draw: "DashedImageDraw", image_box: tuple[int, int, int, int] | None, -) -> None: +) -> str: coordinates_ = ast.literal_eval(subset.coordinates) + points: list[tuple[int, int]] = [ _calc_point(coordinates_[i], coordinates_[i + 1], image_box) for i in range(0, len(coordinates_), 2) ] if len(points) == 4: - # close rectangle - points.append(points[0]) - - draw.dashed_line(points, dash=(3, 2), fill=_COLORS[subset.type], width=1) - + # Return polygon + svg_coords = list(sum(points, ())) + return f"""""" + # Return path + return f"""""" class Map: """Map representation.""" - RESIZE_FACTOR = 3 - def __init__( self, execute_command: Callable[[Command], Coroutine[Any, Any, None]], @@ -196,13 +228,13 @@ def _update_trace_points(self, data: str) -> None: for i in range(0, len(trace_points), 5): byte_position_x = struct.unpack("> 7) & 1) != 0) + type = point_data & 1 - self._map_data.trace_values.append(position_x) - self._map_data.trace_values.append(position_y) + self._map_data.trace_values.append((byte_position_x[0], byte_position_y[0], connected, type)) _LOGGER.debug("[_update_trace_points] finish") @@ -237,6 +269,57 @@ def _draw_map_pieces(self, draw: ImageDraw.ImageDraw) -> None: if pixel_type in [0x01, 0x02, 0x03]: draw.point((point_x, point_y), fill=_COLORS[pixel_type]) + def _get_svg_traces_path(self) -> str: + if len(self._map_data.trace_values) > 0: + _LOGGER.debug("[get_svg_map] Draw Trace") + + return f"""""" + + return "" + + def _get_svg_rooms(self, image_box: tuple[int, int, int, int], image_box_center: tuple[float, float]) -> tuple[list[str], list[str]] : + svg_rooms_elements = [] + svg_rooms_labels = [] + + for room, color in zip(sorted(self._map_data.rooms.keys()), itertools.cycle(_ROOM_COLORS)): + # Split coordinates into a flat sequence + room_coords = re.split("[;,]",_decompress_7z_base64_data(self._map_data.rooms[room].coordinates).decode('ascii')) + + # SVG compacted presentation + svg_room_coords = _SVG_COORDS_COMPACT.sub("", " ".join(room_coords)) + + # Append to room svg elements + svg_rooms_elements.append( + f"""""") + + + room_name = self._map_data.rooms[room].name + if room_name != "Default": + # Calculate label positions (cannot use SVG transformations, as they are applied to the whole text, + # which would result in text to be vertically flipped...) + + # Get a rough room center. + room_center_x = sum(float(x) for x in room_coords[0::2]) / (len(room_coords) / 2) + room_center_y = sum(float(y) for y in room_coords[1::2]) / (len(room_coords) / 2) + + # Get map relative position + room_center_pos = _calc_point(room_center_x, room_center_y, image_box) + + # Add the text, with position vertically flipped on map center + svg_rooms_labels.append(f"""{room_name}""") + + return (svg_rooms_elements, svg_rooms_labels) + def enable(self) -> None: """Enable map.""" if self._unsubscribers: @@ -304,8 +387,8 @@ def refresh(self) -> None: self._event_bus.request_refresh(MapTraceEvent) self._event_bus.request_refresh(MajorMapEvent) - def get_base64_map(self, width: int | None = None) -> bytes: - """Return map as base64 image string.""" + def get_svg_map(self, width: int | None = None) -> str: + """Return map as SVG string.""" if not self._unsubscribers: raise MapError("Please enable the map first") @@ -314,74 +397,91 @@ def get_base64_map(self, width: int | None = None) -> bytes: and width == self._last_image.width and not self._map_data.changed ): - _LOGGER.debug("[get_base64_map] No need to update") - return self._last_image.base64_image - - _LOGGER.debug("[get_base64_map] Begin") + _LOGGER.debug("[get_svg_map] No need to update") + return self._last_image.svg_image + + _LOGGER.debug("[get_svg_map] Begin") + image = Image.new("RGBA", (6400, 6400)) - draw = DashedImageDraw(image) - + draw = ImageDraw.ImageDraw(image) self._draw_map_pieces(draw) - - # Draw Trace Route - if len(self._map_data.trace_values) > 0: - _LOGGER.debug("[get_base64_map] Draw Trace") - draw.line(self._map_data.trace_values, fill=_COLORS[_TRACE_MAP], width=1) - - image_box = image.getbbox() - for subset in self._map_data.map_subsets.values(): - _draw_subset(subset, draw, image_box) - del draw - _draw_positions(self._map_data.positions, image, image_box) - - _LOGGER.debug("[get_base64_map] Crop Image") - cropped = image.crop(image_box) - del image + image_box = image.getbbox() - _LOGGER.debug("[get_base64_map] Flipping Image") - cropped = ImageOps.flip(cropped) + if image_box: + image_box_center = ((image_box[0] + image_box[2]) / 2, (image_box[1] + image_box[3]) / 2) - _LOGGER.debug( - "[get_base64_map] Map current Size: X: %d Y: %d", - cropped.size[0], - cropped.size[1], - ) + _LOGGER.debug("[get_svg_map] Crop Image") + cropped = image.crop(image_box) + del image - new_size = None - if width is not None and width > 0: - height = int((width / cropped.size[0]) * cropped.size[1]) _LOGGER.debug( - "[get_base64_map] Resize based on the requested width: %d and calculated height %d", - width, - height, + "[get_svg_map] Map current Size: X: %d Y: %d", + cropped.size[0], + cropped.size[1], ) - new_size = (width, height) - elif cropped.size[0] > 400 or cropped.size[1] > 400: - _LOGGER.debug("[get_base64_map] Resize disabled.. map over 400") - else: - resize_factor = Map.RESIZE_FACTOR - _LOGGER.debug("[get_base64_map] Resize factor: %d", resize_factor) - new_size = ( - cropped.size[0] * resize_factor, - cropped.size[1] * resize_factor, - ) - - if new_size is not None: - cropped = cropped.resize(new_size, Image.Resampling.NEAREST) - - _LOGGER.debug("[get_base64_map] Saving to buffer") - buffered = BytesIO() - cropped.save(buffered, format="PNG") - del cropped - base64_image = base64.b64encode(buffered.getvalue()) + _LOGGER.debug("[get_svg_map] Saving to buffer") + buffered = BytesIO() + cropped.save(buffered, format="PNG") + del cropped + + base64_bg = base64.b64encode(buffered.getvalue()) + + # Build the SVG XML + + svg_positions = _get_svg_positions(self._map_data.positions, image_box) + + svg_subset_elements = [_get_svg_subset(subset, image_box) for subset in self._map_data.map_subsets.values()] + + svg_rooms_elements, svg_rooms_labels = self._get_svg_rooms(image_box, image_box_center) + + svg_traces_path = self._get_svg_traces_path() + + svg_map = f""" + + + + + + + + + + + + + + + + + + + + + + + {"".join(svg_rooms_elements)} + {"".join(svg_subset_elements)} + {svg_traces_path} + {svg_positions} + + {"".join(svg_rooms_labels)} + + """ + else: + # No map data yet, generate an empty SVG. + svg_map = """""" + self._map_data.reset_changed() - self._last_image = LastImage(base64_image, width) - _LOGGER.debug("[get_base64_map] Finish") + self._last_image = LastImage(svg_map, width) + _LOGGER.debug("[get_svg_map] Finish") + + return svg_map - return base64_image async def teardown(self) -> None: """Teardown map.""" @@ -448,95 +548,10 @@ def __eq__(self, obj: object) -> bool: return self._crc32 == obj._crc32 and self._index == obj._index -class DashedImageDraw(ImageDraw.ImageDraw): - """Class extend ImageDraw by dashed line.""" - - # Copied from https://stackoverflow.com/a/65893631 Credits ands - _FILL = str | int | tuple[int, int, int] | tuple[int, int, int, int] | None - - def _thick_line( - self, - xy: list[tuple[int, int]], - direction: list[tuple[int, int]], - fill: _FILL = None, - width: int = 0, - ) -> None: - if xy[0] != xy[1]: - self.line(xy, fill=fill, width=width) - else: - x1, y1 = xy[0] - delta_x = direction[1][0] - direction[0][0] - delta_y = direction[1][1] - direction[0][1] - - if delta_x < 0: - y1 -= 1 - - if delta_y < 0: - x1 -= 1 - - if delta_y != 0: - if delta_x != 0: - k = -delta_x / delta_y - a = 1 / math.sqrt(1 + k**2) - b = (width * a - 1) / 2 - else: - k = 0 - b = (width - 1) / 2 - x1 = x1 - math.floor(b) - y1 = y1 - int(k * b) - x2 = x1 + math.ceil(b) - y2 = y1 + int(k * b) - else: - y1 = y1 - math.floor((width - 1) / 2) - x2 = x1 - y2 = y1 + math.ceil((width - 1) / 2) - self.line([(x1, y1), (x2, y2)], fill=fill, width=1) - - def dashed_line( - self, - xy: list[tuple[int, int]], - dash: tuple[int, int] = (2, 2), - fill: _FILL = None, - width: int = 0, - ) -> None: - """Draw a dashed line, or a connected sequence of line segments.""" - for i in range(len(xy) - 1): - x1, y1 = xy[i] - x2, y2 = xy[i + 1] - x_length = x2 - x1 - y_length = y2 - y1 - length = math.sqrt(x_length**2 + y_length**2) - dash_enabled = True - position = 0 - while position < length: - for dash_step in dash: - if dash_enabled: - start = position / length - end = min((position + dash_step - 1) / length, 1) - self._thick_line( - [ - ( - round(x1 + start * x_length), - round(y1 + start * y_length), - ), - ( - round(x1 + end * x_length), - round(y1 + end * y_length), - ), - ], - xy, - fill, - width, - ) - dash_enabled = not dash_enabled - position += dash_step - - @dataclasses.dataclass(frozen=True) class LastImage: """Last created image.""" - - base64_image: bytes + svg_image: str width: int | None @@ -557,7 +572,7 @@ def on_change() -> None: self._map_subsets: OnChangedDict[int, MapSubsetEvent] = OnChangedDict(on_change) self._positions: OnChangedList[Position] = OnChangedList(on_change) self._rooms: OnChangedDict[int, Room] = OnChangedDict(on_change) - self._trace_values: OnChangedList[int] = OnChangedList(on_change) + self._trace_values: OnChangedList[tuple[int, int, bool, int]] = OnChangedList(on_change) @property def changed(self) -> bool: @@ -592,7 +607,7 @@ def rooms(self) -> dict[int, Room]: return self._rooms @property - def trace_values(self) -> OnChangedList[int]: + def trace_values(self) -> OnChangedList[tuple[int, int, bool, int]]: """Return trace values.""" return self._trace_values