diff --git a/cute_shm/core.py b/cute_shm/core.py index 5dd57d9..44d9d5e 100644 --- a/cute_shm/core.py +++ b/cute_shm/core.py @@ -89,6 +89,8 @@ def bytes_to_human(size_bytes: int) -> str: if size_bytes == 0: return "0 bytes" size_name = ("bytes", "KB", "MB", "GB", "TB", "PB") + if size_bytes <= 0: + return "" i = int(math.floor(math.log(size_bytes, 1024))) p = math.pow(1024, i) s = round(size_bytes / p, 2) diff --git a/cute_shm/evaluation.py b/cute_shm/evaluation.py index ad95ce0..64396b6 100644 --- a/cute_shm/evaluation.py +++ b/cute_shm/evaluation.py @@ -11,7 +11,7 @@ from rich.console import Console from rich.table import Table -from .core import ArrayDict +from .core import ArrayDict, bytes_to_human from .hdf5_shm import unlinked_hdf5_to_shm from .numpy_shm import shm_to_arrays @@ -51,7 +51,10 @@ def access_data(key, array): for key, array in shm_arrays.items(): access_data(key, array) end_time = time.time() - result["frequency"] = iterations / (end_time - start_time) + try: + result["frequency"] += iterations / (end_time - start_time) + except KeyError: + result["frequency"] = iterations / (end_time - start_time) def access_directly( @@ -72,22 +75,45 @@ def access_data(key, dataset): for key, dataset in data_dict.items(): access_data(key, dataset) end_time = time.time() - result["frequency"] = iterations / (end_time - start_time) + try: + result["frequency"] += iterations / (end_time - start_time) + except KeyError: + result["frequency"] = iterations / (end_time - start_time) -from .core import bytes_to_human - - -def monitor_ram_usage( - stop_event: multiprocessing.Event, result: multiprocessing.Manager().dict +def _monitor_ram_usage( + stop_event: multiprocessing.Event, max_value: multiprocessing.Value ) -> None: max_ram_usage = 0 while not stop_event.is_set(): - ram_usage = psutil.virtual_memory().used + memory = psutil.virtual_memory() + ram_usage = memory.total - memory.available if ram_usage > max_ram_usage: max_ram_usage = ram_usage - time.sleep(0.1) - result["max_ram_usage"] = max_ram_usage + time.sleep(0.01) + max_value.value = max_ram_usage + + +from contextlib import contextmanager + + +class RAM_Monitor: + + @classmethod + def start(cls): + cls._manager = multiprocessing.Manager() + cls._max_usage = multiprocessing.Value("Q", 0) + cls._stop_event = multiprocessing.Event() + cls._ram_monitor = multiprocessing.Process( + target=_monitor_ram_usage, args=(cls._stop_event, cls._max_usage) + ) + cls._ram_monitor.start() + + @classmethod + def stop(cls) -> int: + cls._stop_event.set() + cls._ram_monitor.join() + return cls._max_usage.value def run_experiment( @@ -101,13 +127,10 @@ def run_experiment( manager = multiprocessing.Manager() results = manager.dict() - stop_event = multiprocessing.Event() - ram_monitor = multiprocessing.Process( - target=monitor_ram_usage, args=(stop_event, results) - ) - ram_monitor.start() + RAM_Monitor.start() + ts1 = time.time() processes = [] for _ in range(num_processes): if use_shm: @@ -120,16 +143,16 @@ def run_experiment( target=access_directly, args=(data_dict, iterations, results) ) processes.append(p) + + for p in processes: p.start() for p in processes: p.join() - stop_event.set() - ram_monitor.join() + max_ram_usage = RAM_Monitor.stop() avg_frequency = results["frequency"] / num_processes - max_ram_usage = results["max_ram_usage"] return { "num_processes": num_processes, @@ -147,7 +170,7 @@ def evaluation(): parser.add_argument( "--iterations", type=int, - default=1000, + default=5000, help="Number of iterations for each process", ) args = parser.parse_args() @@ -167,7 +190,11 @@ def evaluation(): table.add_column("Max RAM Usage (Direct Access)", justify="center") logging.info("Loading data from HDF5 file to the RAM (direct, no shared memory)") + RAM_Monitor.start() data_dict = hdf5_to_dict(hdf5_path) + logging.info( + f"Max observed RAM usage during load: {bytes_to_human(RAM_Monitor.stop())}" + ) logging.info("Running experiments with direct access to the RAM") direct_results = [] @@ -187,7 +214,11 @@ def evaluation(): data_dict = None logging.info("Loading data from HDF5 file to the shared memory") + RAM_Monitor.start() with unlinked_hdf5_to_shm(hdf5_path, project_name, overwrite=True, progress=True): + logging.info( + f"Max observed RAM usage during load: {bytes_to_human(RAM_Monitor.stop())}" + ) logging.info("Running experiments with shared memory") shm_results = [] for num_processes in [1, 5, 10, 15]: @@ -207,8 +238,8 @@ def evaluation(): str(direct_result["num_processes"]), f"{shm_result['avg_frequency']:.2f}", f"{direct_result['avg_frequency']:.2f}", - psutil._common.bytes2human(shm_result["max_ram_usage"]), - psutil._common.bytes2human(direct_result["max_ram_usage"]), + bytes_to_human(shm_result["max_ram_usage"]), + bytes_to_human(direct_result["max_ram_usage"]), ) console.print(table) diff --git a/pyproject.toml b/pyproject.toml index 714bbbf..8955930 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "cute-shm" -version = "1.01" +version = "1.02" description = "managing np arrays stored in the shared memory" authors = [ "Vincent Berenz ",