Skip to content

Commit

Permalink
Hotfix RAM usage (#70)
Browse files Browse the repository at this point in the history
* try to reduce RAM usage

* version bump

* fix
  • Loading branch information
CodyCBakerPhD authored Sep 14, 2024
1 parent b59ae1d commit 0f8f0b6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ packages = ["src/dandi_s3_log_parser"]

[project]
name = "dandi_s3_log_parser"
version="0.4.1"
version="0.4.2"
authors = [
{ name="Cody Baker", email="cody.c.baker.phd@gmail.com" },
]
Expand Down
33 changes: 23 additions & 10 deletions src/dandi_s3_log_parser/_map_binned_s3_logs_to_dandisets.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ def _map_binned_logs_to_dandiset(
dandiset_id = dandiset.identifier
dandiset_log_folder_path = dandiset_logs_folder_path / dandiset_id

all_reduced_s3_logs_per_blob_id = dict()
all_reduced_s3_logs_per_blob_id_aggregated_by_day = dict()
all_reduced_s3_logs_per_blob_id_aggregated_by_region = dict()
blob_id_to_asset_path = dict()
total_bytes_across_versions_by_blob_id = dict()
dandiset_versions = list(dandiset.get_versions())
Expand All @@ -130,7 +131,8 @@ def _map_binned_logs_to_dandiset(

dandiset_version = client.get_dandiset(dandiset_id=dandiset_id, version_id=version_id)

reduced_s3_logs_per_day = []
all_reduced_s3_logs_aggregated_by_day_for_version = []
all_reduced_s3_logs_aggregated_by_region_for_version = []
total_bytes_per_asset_path = dict()
dandiset_version_assets = list(dandiset_version.get_assets())
for asset in tqdm.tqdm(
Expand Down Expand Up @@ -191,34 +193,45 @@ def _map_binned_logs_to_dandiset(
)

reordered_reduced_s3_log["date"] = [entry[:10] for entry in reordered_reduced_s3_log["timestamp"]]
reduced_s3_logs_per_day.append(reordered_reduced_s3_log)
all_reduced_s3_logs_per_blob_id[blob_id] = reordered_reduced_s3_log

# Aggregate per asset to save memory (most impactful for 000108)
aggregated_activity_by_day = _aggregate_activity_by_day(reduced_s3_logs_per_day=[reordered_reduced_s3_log])
all_reduced_s3_logs_aggregated_by_day_for_version.append(aggregated_activity_by_day)
all_reduced_s3_logs_per_blob_id_aggregated_by_day[blob_id] = aggregated_activity_by_day

aggregated_activity_by_region = _aggregate_activity_by_region(
reduced_s3_logs_per_day=[reordered_reduced_s3_log]
)
all_reduced_s3_logs_aggregated_by_region_for_version.append(aggregated_activity_by_region)
all_reduced_s3_logs_per_blob_id_aggregated_by_region[blob_id] = aggregated_activity_by_region

total_bytes = sum(reduced_s3_log_binned_by_blob_id["bytes_sent"])
total_bytes_per_asset_path[asset.path] = total_bytes

blob_id_to_asset_path[blob_id] = asset.path
total_bytes_across_versions_by_blob_id[blob_id] = total_bytes

if len(reduced_s3_logs_per_day) == 0:
if len(all_reduced_s3_logs_aggregated_by_day_for_version) == 0:
continue # No activity found (possible dandiset version was never accessed); skip to next version

version_summary_by_day_file_path = dandiset_version_log_folder_path / "version_summary_by_day.tsv"
_write_aggregated_activity_by_day(
reduced_s3_logs_per_day=reduced_s3_logs_per_day, file_path=version_summary_by_day_file_path
reduced_s3_logs_per_day=all_reduced_s3_logs_aggregated_by_day_for_version,
file_path=version_summary_by_day_file_path,
)

version_summary_by_region_file_path = dandiset_version_log_folder_path / "version_summary_by_region.tsv"
_write_aggregated_activity_by_region(
reduced_s3_logs_per_day=reduced_s3_logs_per_day, file_path=version_summary_by_region_file_path
reduced_s3_logs_per_day=all_reduced_s3_logs_aggregated_by_region_for_version,
file_path=version_summary_by_region_file_path,
)

version_summary_by_asset_file_path = dandiset_version_log_folder_path / "version_summary_by_asset.tsv"
_write_aggregated_activity_by_asset(
total_bytes_per_asset_path=total_bytes_per_asset_path, file_path=version_summary_by_asset_file_path
)

if len(all_reduced_s3_logs_per_blob_id) == 0:
if len(all_reduced_s3_logs_per_blob_id_aggregated_by_day) == 0:
return None # No activity found (possible dandiset was never accessed); skip to next version

# Single path across versions could have been replaced at various points by a new blob
Expand All @@ -228,13 +241,13 @@ def _map_binned_logs_to_dandiset(

dandiset_summary_by_day_file_path = dandiset_log_folder_path / "dandiset_summary_by_day.tsv"
_write_aggregated_activity_by_day(
reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id.values(),
reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_day.values(),
file_path=dandiset_summary_by_day_file_path,
)

dandiset_summary_by_region_file_path = dandiset_log_folder_path / "dandiset_summary_by_region.tsv"
_write_aggregated_activity_by_region(
reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id.values(),
reduced_s3_logs_per_day=all_reduced_s3_logs_per_blob_id_aggregated_by_region.values(),
file_path=dandiset_summary_by_region_file_path,
)

Expand Down

0 comments on commit 0f8f0b6

Please sign in to comment.