Commit
version 3.02. minor updates in evaluation
Vincent Berenz committed Aug 22, 2024
1 parent 1bd04be commit 7d59bfd
Showing 3 changed files with 56 additions and 23 deletions.
2 changes: 2 additions & 0 deletions cute_shm/core.py
@@ -89,6 +89,8 @@ def bytes_to_human(size_bytes: int) -> str:
     if size_bytes == 0:
         return "0 bytes"
     size_name = ("bytes", "KB", "MB", "GB", "TB", "PB")
+    if size_bytes <= 0:
+        return "<unknown>"
     i = int(math.floor(math.log(size_bytes, 1024)))
     p = math.pow(1024, i)
     s = round(size_bytes / p, 2)
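For reference, here is the whole helper as this hunk implies it, written out runnable; the final return line is an assumption, since the diff cuts off before it. Note the ordering: because size_bytes == 0 returns first, the new <= 0 guard effectively catches only negative values, which would otherwise crash math.log.

    import math

    def bytes_to_human(size_bytes: int) -> str:
        if size_bytes == 0:
            return "0 bytes"
        size_name = ("bytes", "KB", "MB", "GB", "TB", "PB")
        if size_bytes <= 0:
            # only negative values reach here; math.log would fail on them
            return "<unknown>"
        i = int(math.floor(math.log(size_bytes, 1024)))
        p = math.pow(1024, i)
        s = round(size_bytes / p, 2)
        return f"{s} {size_name[i]}"  # assumed return, elided in the diff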
75 changes: 53 additions & 22 deletions cute_shm/evaluation.py
@@ -11,7 +11,7 @@
 from rich.console import Console
 from rich.table import Table

-from .core import ArrayDict
+from .core import ArrayDict, bytes_to_human
 from .hdf5_shm import unlinked_hdf5_to_shm
 from .numpy_shm import shm_to_arrays

@@ -51,7 +51,10 @@ def access_data(key, array):
     for key, array in shm_arrays.items():
         access_data(key, array)
     end_time = time.time()
-    result["frequency"] = iterations / (end_time - start_time)
+    try:
+        result["frequency"] += iterations / (end_time - start_time)
+    except KeyError:
+        result["frequency"] = iterations / (end_time - start_time)


 def access_directly(
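The rewritten timing block accumulates instead of overwrites: several worker processes share one Manager dict, so each adds its own iterations-per-second figure and run_experiment later divides the sum by the process count. A minimal sketch of the same pattern using dict.get, purely as a stylistic alternative:

    # `result` is a multiprocessing.Manager().dict() shared by all workers
    freq = iterations / (end_time - start_time)
    result["frequency"] = result.get("frequency", 0.0) + freq

One caveat either way: a read-modify-write on a Manager proxy is not atomic, so two workers finishing at the same moment could in principle lose an update.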
@@ -72,22 +75,45 @@ def access_data(key, dataset):
     for key, dataset in data_dict.items():
         access_data(key, dataset)
     end_time = time.time()
-    result["frequency"] = iterations / (end_time - start_time)
+    try:
+        result["frequency"] += iterations / (end_time - start_time)
+    except KeyError:
+        result["frequency"] = iterations / (end_time - start_time)


-from .core import bytes_to_human
-
-
-def monitor_ram_usage(
-    stop_event: multiprocessing.Event, result: multiprocessing.Manager().dict
+def _monitor_ram_usage(
+    stop_event: multiprocessing.Event, max_value: multiprocessing.Value
 ) -> None:
     max_ram_usage = 0
     while not stop_event.is_set():
-        ram_usage = psutil.virtual_memory().used
+        memory = psutil.virtual_memory()
+        ram_usage = memory.total - memory.available
         if ram_usage > max_ram_usage:
             max_ram_usage = ram_usage
-        time.sleep(0.1)
-    result["max_ram_usage"] = max_ram_usage
+        time.sleep(0.01)
+    max_value.value = max_ram_usage
+
+
+from contextlib import contextmanager
+
+
+class RAM_Monitor:
+
+    @classmethod
+    def start(cls):
+        cls._manager = multiprocessing.Manager()
+        cls._max_usage = multiprocessing.Value("Q", 0)
+        cls._stop_event = multiprocessing.Event()
+        cls._ram_monitor = multiprocessing.Process(
+            target=_monitor_ram_usage, args=(cls._stop_event, cls._max_usage)
+        )
+        cls._ram_monitor.start()
+
+    @classmethod
+    def stop(cls) -> int:
+        cls._stop_event.set()
+        cls._ram_monitor.join()
+        return cls._max_usage.value


 def run_experiment(
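Two changes land here. First, the sampler now reports total - available instead of psutil.virtual_memory().used; on Linux, pages backing /dev/shm are accounted as page cache, so used tends to miss shared-memory consumption, while total - available captures it. Second, the monitor's lifecycle moves into the RAM_Monitor helper, which hands the peak back through a multiprocessing.Value("Q", ...) (an unsigned 64-bit integer) rather than a Manager dict. A minimal usage sketch; the numpy allocation is a hypothetical workload:

    import numpy as np

    RAM_Monitor.start()                 # spawn the sampling process
    data = np.zeros((1024, 1024, 128))  # hypothetical: ~1 GiB to observe
    peak = RAM_Monitor.stop()           # join the sampler, read peak bytes
    print(f"peak RAM usage: {bytes_to_human(peak)}")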
@@ -101,13 +127,10 @@ def run_experiment(

     manager = multiprocessing.Manager()
     results = manager.dict()
-    stop_event = multiprocessing.Event()

-    ram_monitor = multiprocessing.Process(
-        target=monitor_ram_usage, args=(stop_event, results)
-    )
-    ram_monitor.start()
+    RAM_Monitor.start()

+    ts1 = time.time()
     processes = []
     for _ in range(num_processes):
         if use_shm:
@@ -120,16 +143,16 @@
             target=access_directly, args=(data_dict, iterations, results)
         )
         processes.append(p)

     for p in processes:
         p.start()

     for p in processes:
         p.join()

-    stop_event.set()
-    ram_monitor.join()
+    max_ram_usage = RAM_Monitor.stop()

     avg_frequency = results["frequency"] / num_processes
-    max_ram_usage = results["max_ram_usage"]

     return {
         "num_processes": num_processes,
@@ -147,7 +170,7 @@ def evaluation():
     parser.add_argument(
         "--iterations",
         type=int,
-        default=1000,
+        default=5000,
         help="Number of iterations for each process",
     )
     args = parser.parse_args()
@@ -167,7 +190,11 @@ def evaluation():
     table.add_column("Max RAM Usage (Direct Access)", justify="center")

     logging.info("Loading data from HDF5 file to the RAM (direct, no shared memory)")
+    RAM_Monitor.start()
     data_dict = hdf5_to_dict(hdf5_path)
+    logging.info(
+        f"Max observed RAM usage during load: {bytes_to_human(RAM_Monitor.stop())}"
+    )

     logging.info("Running experiments with direct access to the RAM")
     direct_results = []
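hdf5_to_dict itself is defined elsewhere in evaluation.py; only the RAM_Monitor calls around it are new. Purely as an assumption about its shape, a helper along these lines would pull every dataset into ordinary in-RAM numpy arrays, which is the direct-access baseline the shared-memory numbers are compared against:

    import h5py
    import numpy as np

    def hdf5_to_dict(path: str) -> dict:
        # hypothetical sketch: copy every HDF5 dataset into RAM
        arrays = {}
        with h5py.File(path, "r") as f:
            def visit(name, obj):
                if isinstance(obj, h5py.Dataset):
                    arrays[name] = np.asarray(obj[()])
            f.visititems(visit)
        return arrays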
@@ -187,7 +214,11 @@ def evaluation():
     data_dict = None

     logging.info("Loading data from HDF5 file to the shared memory")
+    RAM_Monitor.start()
     with unlinked_hdf5_to_shm(hdf5_path, project_name, overwrite=True, progress=True):
+        logging.info(
+            f"Max observed RAM usage during load: {bytes_to_human(RAM_Monitor.stop())}"
+        )
         logging.info("Running experiments with shared memory")
         shm_results = []
         for num_processes in [1, 5, 10, 15]:
@@ -207,8 +238,8 @@
             str(direct_result["num_processes"]),
             f"{shm_result['avg_frequency']:.2f}",
             f"{direct_result['avg_frequency']:.2f}",
-            psutil._common.bytes2human(shm_result["max_ram_usage"]),
-            psutil._common.bytes2human(direct_result["max_ram_usage"]),
+            bytes_to_human(shm_result["max_ram_usage"]),
+            bytes_to_human(direct_result["max_ram_usage"]),
         )

     console.print(table)
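For completeness, the benchmark entry point can be exercised directly; evaluation() parses --iterations from the command line (default now 5000). How the console script is wired up in pyproject.toml is not shown in this diff, so calling the function is the one invocation the code above guarantees:

    from cute_shm.evaluation import evaluation

    evaluation()  # reads --iterations from sys.argv; defaults to 5000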
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "cute-shm"
-version = "1.01"
+version = "1.02"
 description = "managing np arrays stored in the shared memory"
 authors = [
     "Vincent Berenz <vberenz@tuebingen.mpg.de>",
