Skip to content

Commit

Permalink
Improved thread management. Now the code catches Ctrl + C. Still clos…
Browse files Browse the repository at this point in the history
…ing the simulation window will lead to a mess.
  • Loading branch information
AntoineRichard committed Sep 24, 2024
1 parent 00a3aa7 commit eb0ef3b
Show file tree
Hide file tree
Showing 6 changed files with 370 additions and 67 deletions.
3 changes: 3 additions & 0 deletions src/environments/large_scale_lunar.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ def update(self) -> None:
p, _ = self.pose_tracker()
self.LSTM.update_visual_mesh((p[0], p[1]))

def monitor_thread_is_alive(self) -> None:
return self.LSTM.map_manager.hr_dem_gen.monitor_thread.thread.is_alive()

def load(self) -> None:
"""
Loads the lab interactive elements in the stage.
Expand Down
7 changes: 7 additions & 0 deletions src/environments_wrappers/ros2/base_wrapper_ros2.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,3 +210,10 @@ def clean_scene(self):
"""

self.destroy_node()

def monitor_thread_is_alive(self):
"""
Checks if the monitor thread is alive.
"""

return True
8 changes: 7 additions & 1 deletion src/environments_wrappers/ros2/largescale_ros2.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ def __init__(
self,
environment_cfg: dict = None,
is_simulation_alive: callable = lambda: True,
close_simulation: callable = lambda: None,
**kwargs,
) -> None:
"""
Expand All @@ -37,7 +38,9 @@ def __init__(
"""

super().__init__(environment_cfg=environment_cfg, **kwargs)
self.LC = LargeScaleController(**environment_cfg, is_simulation_alive=is_simulation_alive)
self.LC = LargeScaleController(
**environment_cfg, is_simulation_alive=is_simulation_alive, close_simulation=close_simulation
)
self.LC.load()

self.create_subscription(Float32, "/OmniLRS/Sun/Intensity", self.set_sun_intensity, 1)
Expand Down Expand Up @@ -118,3 +121,6 @@ def set_sun_pose(self, data: Pose) -> None:
position = [data.position.x, data.position.y, data.position.z]
orientation = [data.orientation.w, data.orientation.y, data.orientation.z, data.orientation.x]
self.modifications.append([self.LC.set_sun_pose, {"position": position, "orientation": orientation}])

def monitor_thread_is_alive(self):
return self.LC.monitor_thread_is_alive()
41 changes: 31 additions & 10 deletions src/environments_wrappers/ros2/simulation_manager_ros2.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from omni.isaac.kit import SimulationApp
from omni.isaac.core import World
from typing import Union
import logging
import omni
import time

Expand All @@ -21,6 +22,10 @@
from src.configurations.procedural_terrain_confs import TerrainManagerConf
from rclpy.executors import SingleThreadedExecutor as Executor
from src.physics.physics_scene import PhysicsSceneManager
import rclpy

logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p")


class Rate:
Expand Down Expand Up @@ -93,7 +98,7 @@ def register(
def __call__(
self,
cfg: dict,
is_simulation_alive: callable,
**kwargs,
) -> Union[ROS_LunalabManager, ROS_LunaryardManager]:
"""
Returns an instance of the lab manager corresponding to the environment name.
Expand All @@ -107,7 +112,7 @@ def __call__(

return self._lab_managers[cfg["environment"]["name"]](
environment_cfg=cfg["environment"],
is_simulation_alive=is_simulation_alive,
**kwargs,
)


Expand Down Expand Up @@ -160,16 +165,18 @@ def __init__(
self.rate = Rate(is_disabled=True)

# Lab manager thread
self.ROSLabManager = ROS2_LMF(cfg, self.simulation_app.is_running)
exec1 = Executor()
exec1.add_node(self.ROSLabManager)
self.exec1_thread = Thread(target=exec1.spin, daemon=True, args=())
self.ROSLabManager = ROS2_LMF(
cfg, is_simulation_alive=self.simulation_app.is_running, close_simulation=self.simulation_app.close
)
self.exec1 = Executor()
self.exec1.add_node(self.ROSLabManager)
self.exec1_thread = Thread(target=self.exec1.spin, daemon=True, args=())
self.exec1_thread.start()
# Robot manager thread
self.ROSRobotManager = ROS_RobotManager(cfg["environment"]["robots_settings"])
exec2 = Executor()
exec2.add_node(self.ROSRobotManager)
self.exec2_thread = Thread(target=exec2.spin, daemon=True, args=())
self.exec2 = Executor()
self.exec2.add_node(self.ROSRobotManager)
self.exec2_thread = Thread(target=self.exec2.spin, daemon=True, args=())
self.exec2_thread.start()

# Have you ever asked your self: "Is there a limit of topics one can subscribe to in ROS2?"
Expand Down Expand Up @@ -222,6 +229,20 @@ def run_simulation(self) -> None:
if self.world.current_time_step_index >= (self.deform_delay * self.world.get_physics_dt()):
self.ROSLabManager.LC.deform_terrain()
# self.ROSLabManager.LC.applyTerramechanics()
self.rate.sleep()
if not self.ROSLabManager.monitor_thread_is_alive():
logger.debug("Destroying the ROS nodes")
self.ROSLabManager.destroy_node()
self.ROSRobotManager.destroy_node()
logger.debug("Shutting down the ROS executors")
self.exec1.shutdown()
self.exec2.shutdown()
logger.debug("Joining the ROS threads")
self.exec1_thread.join()
self.exec2_thread.join()
logger.debug("Shutting down ROS2")
rclpy.shutdown()
break

self.rate.sleep()
self.world.stop()
self.timeline.stop()
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import numpy as np
import dataclasses
import threading
import warnings
import logging
import time
import copy

Expand All @@ -34,6 +34,9 @@
ThreadMonitor,
)

logger = logging.getLogger(__name__)
logging.basicConfig(format="%(asctime)s %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p")


@dataclasses.dataclass
class HighResDEMConf:
Expand Down Expand Up @@ -111,15 +114,21 @@ def __init__(
low_res_dem: np.ndarray,
settings: HighResDEMGenConf,
profiling: bool = True,
is_simulation_alive: callable = lambda: True,
close_simulation: callable = lambda: None,
) -> None:
"""
Args:
low_res_dem (np.ndarray): The low resolution DEM.
settings (HighResDEMGenConf): The settings for the high resolution DEM generation.
profiling (bool): True if the profiling is enabled, False otherwise.
is_simulation_alive (callable): A callable that returns True if the simulation is alive, False otherwise.
close_simulation (callable): A callable that closes the simulation.
"""
self.low_res_dem = low_res_dem
self.settings = settings
self.is_simulation_alive = is_simulation_alive
self.close_simulation = close_simulation

self.current_block_coord = (0, 0)
self.sim_is_warm = False
Expand Down Expand Up @@ -149,7 +158,9 @@ def build(self) -> None:
# Creates the worker managers that will distribute the work to the workers.
# This enables the generation of craters and the interpolation of the terrain
# data to be done in parallel.
self.monitor_thread = ThreadMonitor()
self.monitor_thread = ThreadMonitor(
is_simulation_alive=self.is_simulation_alive, close_simulation=self.close_simulation
)

self.crater_builder_manager = CraterBuilderManager(
settings=self.settings.crater_worker_manager_cfg,
Expand All @@ -161,6 +172,7 @@ def build(self) -> None:
interp=self.interpolator,
parent_thread=self.monitor_thread.thread,
)
self.monitor_thread.add_shutdowns(self.crater_builder_manager.shutdown, self.interpolator_manager.shutdown)
# Instantiates the high resolution DEM with the given settings.
self.settings = self.settings.high_res_dem_cfg
self.build_block_grid()
Expand Down Expand Up @@ -429,7 +441,7 @@ def shift(self, coordinates: Tuple[float, float]) -> None:
# even if the terrain reconstruction is not complete. This would prevent a full simulation lock
# that waits for one reconstruction to finish before starting the next one.

print("Called shift")
logger.debug("Called shift")
with ScopedTimer("Shift", active=self.profiling):
# Compute initial coordinates in block space
with ScopedTimer("Cast coordinates to block space", active=self.profiling):
Expand Down Expand Up @@ -459,7 +471,7 @@ def shift(self, coordinates: Tuple[float, float]) -> None:
with ScopedTimer("Add terrain blocks to queues", active=self.profiling):
# Asynchronous terrain block generation
self.generate_terrain_blocks() # <-- This is the bit that needs to be edited to support multiple calls
print("Shift done")
logger.debug("Shift done")

def get_height(self, coordinates: Tuple[float, float]) -> float:
"""
Expand Down Expand Up @@ -576,30 +588,29 @@ def update_high_res_dem(self, coords: Tuple[float, float]) -> bool:

# Initial map generation
if not self.sim_is_warm:
print("Warming up simulation")
logger.debug("Warming up simulation")
self.shift(block_coordinates)
# Threaded update, the function will return before the update is done
threading.Thread(target=self.threaded_high_res_dem_update).start()
thread = threading.Thread(target=self.threaded_high_res_dem_update)
thread.start()
self.sim_is_warm = True
updated = True

# Map update if the block has changed
if self.current_block_coord != block_coordinates:
print("Triggering high res DEM update")
logger.debug("Triggering high res DEM update")
if not self.terrain_is_primed:
print("Map is not done, waiting for the terrain data")
logger.debug("Map is not done, waiting for the terrain data")
while not self.terrain_is_primed:
time.sleep(0.2)
print([self.block_grid_tracker[coord] for coord in self.list_missing_blocks()])
# print(self.crater_builder_manager.get_load_per_worker())
# print(self.interpolator_manager.get_load_per_worker())
print("Map is done carrying on")
logger.debug([self.block_grid_tracker[coord] for coord in self.list_missing_blocks()])
logger.debug("Map is done carrying on")
# Threaded update, the function will return before the update is done
if self.thread is None:
self.shift(coords)
self.thread = threading.Thread(target=self.threaded_high_res_dem_update).start()
elif self.thread.is_alive():
print("Thread is alive waiting for it to finish")
logger.debug("Thread is alive waiting for it to finish")
while self.thread.is_alive():
time.sleep(0.1)
self.shift(coords)
Expand Down Expand Up @@ -639,12 +650,12 @@ def threaded_high_res_dem_update(self) -> None:
# even if the terrain reconstruction is not complete. This would prevent a full simulation lock
# that waits for one reconstruction to finish before starting the next one.

print("Opening thread")
logger.debug("Opening thread")
self.terrain_is_primed = False
while (not self.is_map_done()) and (self.monitor_thread.thread.is_alive()):
self.collect_terrain_data()
time.sleep(0.1)
print("Thread closing map is done")
logger.debug("Thread closing map is done")
self.terrain_is_primed = True

def generate_craters_metadata(self, new_block_coord) -> None:
Expand Down Expand Up @@ -672,7 +683,7 @@ def generate_craters_metadata(self, new_block_coord) -> None:
self.block_grid_tracker[local_coords]["has_crater_metadata"] = True
else:
self.block_grid_tracker[local_coords]["has_crater_metadata"] = False
print(f"Block {coords} does not have crater metadata")
logger.warn(f"Block {coords} does not have crater metadata")

def querry_low_res_dem(self, coordinates: Tuple[float, float]) -> np.ndarray:
"""
Expand Down Expand Up @@ -766,7 +777,7 @@ def collect_terrain_data(self) -> None:

# Offset to account for the padding blocks
offset = int((self.settings.num_blocks + 1) * self.settings.block_size / self.settings.resolution)
print("collecting...")
logger.debug("collecting...")
# Collect the results from the workers responsible for adding craters
crater_results = self.crater_builder_manager.collect_results()
for coords, data in crater_results:
Expand Down
Loading

0 comments on commit eb0ef3b

Please sign in to comment.