Skip to content

Commit

Permalink
Merge pull request #71 from leggedrobotics/feature/max_layer_filter
Browse files Browse the repository at this point in the history
Added max layer filter.
  • Loading branch information
mktk1117 authored Feb 24, 2024
2 parents 69aea24 + 1b30435 commit 5fd2e00
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 5 deletions.
12 changes: 11 additions & 1 deletion elevation_mapping_cupy/config/core/plugin_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,14 @@ inpainting:
layer_name: "inpaint"
extra_params:
method: "telea" # telea or ns
# Apply smoothing for inpainted layer
# Apply smoothing for inpainted layer
erosion:
enable: True
fill_nan: False
is_height_layer: False
layer_name: "erosion"
extra_params:
input_layer_name: "traversability"
dilation_size: 3
iteration_n: 20
reverse: True
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ publishers:
fps: 5.0

filtered_elevation_map:
layers: ['inpaint', 'smooth', 'min_filter']
layers: ['inpaint', 'smooth', 'min_filter', 'erosion']
basic_layers: ['inpaint']
fps: 5.0

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Copyright (c) 2024, Takahiro Miki. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for details.
#
import cv2 as cv
import cupy as cp
import numpy as np

from typing import List

from .plugin_manager import PluginBase


class Erosion(PluginBase):
"""
This class is used for applying erosion to an elevation map or specific layers within it.
Erosion is a morphological operation that is used to remove small-scale details from a binary image.
Args:
kernel_size (int): Size of the erosion kernel. Default is 3, which means a 3x3 square kernel.
iterations (int): Number of times erosion is applied. Default is 1.
**kwargs (): Additional keyword arguments.
"""

def __init__(
self,
input_layer_name="traversability",
kernel_size: int = 3,
iterations: int = 1,
reverse: bool = False,
**kwargs,
):
super().__init__()
self.input_layer_name = input_layer_name
self.kernel_size = kernel_size
self.iterations = iterations
self.reverse = reverse

def __call__(
self,
elevation_map: cp.ndarray,
layer_names: List[str],
plugin_layers: cp.ndarray,
plugin_layer_names: List[str],
semantic_map: cp.ndarray,
semantic_layer_names: List[str],
*args,
) -> cp.ndarray:
"""
Applies erosion to the given elevation map.
Args:
elevation_map (cupy._core.core.ndarray): The elevation map to be eroded.
layer_names (List[str]): Names of the layers in the elevation map.
plugin_layers (cupy._core.core.ndarray): Layers provided by other plugins.
plugin_layer_names (List[str]): Names of the layers provided by other plugins.
*args (): Additional arguments.
Returns:
cupy._core.core.ndarray: The eroded elevation map.
"""
# Convert the elevation map to a format suitable for erosion (if necessary)
layer_data = self.get_layer_data(
elevation_map,
layer_names,
plugin_layers,
plugin_layer_names,
semantic_map,
semantic_layer_names,
self.input_layer_name,
)
layer_np = cp.asnumpy(layer_data)

# Define the erosion kernel
kernel = np.ones((self.kernel_size, self.kernel_size), np.uint8)

if self.reverse:
layer_np = 1 - layer_np
# Apply erosion
layer_min = float(layer_np.min())
layer_max = float(layer_np.max())
layer_np_normalized = ((layer_np - layer_min) * 255 / (layer_max - layer_min)).astype("uint8")
eroded_map_np = cv.erode(layer_np_normalized, kernel, iterations=self.iterations)
eroded_map_np = eroded_map_np.astype(np.float32) * (layer_max - layer_min) / 255 + layer_min
if self.reverse:
eroded_map_np = 1 - eroded_map_np

# Convert back to cupy array and return
return cp.asarray(eroded_map_np)
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
#
# Copyright (c) 2022, Takahiro Miki. All rights reserved.
# Licensed under the MIT license. See LICENSE file in the project root for details.
#
import cupy as cp
import numpy as np
from typing import List

from elevation_mapping_cupy.plugins.plugin_manager import PluginBase


class MaxLayerFilter(PluginBase):
"""Applies a maximum filter to the input layers and updates the traversability map.
This can be used to enhance navigation by identifying traversable areas.
Args:
cell_n (int): The width and height of the elevation map.
reverse (list): A list of boolean values indicating whether to reverse the filter operation for each layer. Default is [True].
min_or_max (str): A string indicating whether to apply a minimum or maximum filter. Accepts "min" or "max". Default is "max".
layers (list): List of layers for semantic traversability. Default is ["traversability"].
thresholds (list): List of thresholds for each layer. If the value is bigger than a threshold, assign 1.0 otherwise 0.0. If it is False, it does not apply. Default is [False].
**kwargs: Additional keyword arguments.
"""

def __init__(
self,
cell_n: int = 100,
layers: list = ["traversability"],
reverse: list = [True],
min_or_max: str = "max",
thresholds: list = [False],
default_value: float = 0.0,
**kwargs,
):
super().__init__()
self.layers = layers
self.reverse = reverse
self.min_or_max = min_or_max
self.thresholds = thresholds
self.default_value = default_value

def __call__(
self,
elevation_map: cp.ndarray,
layer_names: List[str],
plugin_layers: cp.ndarray,
plugin_layer_names: List[str],
semantic_map: cp.ndarray,
semantic_layer_names: List[str],
*args,
) -> cp.ndarray:
"""
Args:
elevation_map (cupy._core.core.ndarray):
layer_names (List[str]):
plugin_layers (cupy._core.core.ndarray):
plugin_layer_names (List[str]):
semantic_map (elevation_mapping_cupy.semantic_map.SemanticMap):
*args ():
Returns:
cupy._core.core.ndarray:
"""
layers = []
for it, name in enumerate(self.layers):
layer = self.get_layer_data(
elevation_map, layer_names, plugin_layers, plugin_layer_names, semantic_map, semantic_layer_names, name
)
if layer is None:
continue
if isinstance(self.default_value, float):
layer = cp.where(layer == 0.0, float(self.default_value), layer)
elif isinstance(self.default_value, str):
default_layer = self.get_layer_data(
elevation_map,
layer_names,
plugin_layers,
plugin_layer_names,
semantic_map,
semantic_layer_names,
self.default_value,
)
layer = cp.where(layer == 0, default_layer, layer)
if self.reverse[it]:
layer = 1.0 - layer
if isinstance(self.thresholds[it], float):
layer = cp.where(layer > float(self.thresholds[it]), 1, 0)
layers.append(layer)
if len(layers) == 0:
print("No layers are found, returning traversability!")
idx = layer_names.index("traversability")
return elevation_map[idx]
result = cp.stack(layers, axis=0)
if self.min_or_max == "min":
result = cp.min(result, axis=0)
else:
result = cp.max(result, axis=0)
return result
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
from abc import ABC
import cupy as cp
from typing import List, Dict
from typing import List, Dict, Optional
import importlib
import inspect
from dataclasses import dataclass
Expand Down Expand Up @@ -39,6 +39,8 @@ def __call__(
layer_names: List[str],
plugin_layers: cp.ndarray,
plugin_layer_names: List[str],
semantic_map: cp.ndarray,
semantic_layer_names: List[str],
*args,
**kwargs,
) -> cp.ndarray:
Expand All @@ -50,6 +52,8 @@ def __call__(
layer_names ():
plugin_layers ():
plugin_layer_names ():
semantic_map ():
semantic_layer_names ():
Run your processing here and return the result.
layer of elevation_map 0: elevation
Expand All @@ -64,6 +68,45 @@ def __call__(
"""
pass

def get_layer_data(
self,
elevation_map: cp.ndarray,
layer_names: List[str],
plugin_layers: cp.ndarray,
plugin_layer_names: List[str],
semantic_map: cp.ndarray,
semantic_layer_names: List[str],
name: str,
) -> Optional[cp.ndarray]:
"""
Retrieve a copy of the layer data from the elevation, plugin, or semantic maps based on the layer name.
Args:
elevation_map (cp.ndarray): The elevation map containing various layers.
layer_names (List[str]): A list of names for each layer in the elevation map.
plugin_layers (cp.ndarray): The plugin layers containing additional data.
plugin_layer_names (List[str]): A list of names for each layer in the plugin layers.
semantic_map (cp.ndarray): The semantic map containing various layers.
semantic_layer_names (List[str]): A list of names for each layer in the semantic map.
name (str): The name of the layer to retrieve.
Returns:
Optional[cp.ndarray]: A copy of the requested layer as a cupy ndarray if found, otherwise None.
"""
if name in layer_names:
idx = layer_names.index(name)
layer = elevation_map[idx].copy()
elif name in plugin_layer_names:
idx = plugin_layer_names.index(name)
layer = plugin_layers[idx].copy()
elif name in semantic_layer_names:
idx = semantic_layer_names.index(name)
layer = semantic_map[idx].copy()
else:
print(f"Could not find layer {name}!")
layer = None
return layer


class PluginManager(object):
"""
Expand Down Expand Up @@ -152,11 +195,22 @@ def update_with_name(
self.layers[idx] = self.plugins[idx](elevation_map, layer_names, self.layers, self.layer_names)
elif n_param == 7:
self.layers[idx] = self.plugins[idx](
elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params,
elevation_map,
layer_names,
self.layers,
self.layer_names,
semantic_map,
semantic_params,
)
elif n_param == 8:
self.layers[idx] = self.plugins[idx](
elevation_map, layer_names, self.layers, self.layer_names, semantic_map, semantic_params, rotation,
elevation_map,
layer_names,
self.layers,
self.layer_names,
semantic_map,
semantic_params,
rotation,
)
else:
self.layers[idx] = self.plugins[idx](
Expand Down

0 comments on commit 5fd2e00

Please sign in to comment.