Fix binning on empty reduced logs (#55)
* fix empty reduced log

* enhance README with time estimates; properly time the writing of start/stop for binning

---------

Co-authored-by: CodyCBakerPhD <codycbakerphd@gmail.com>
CodyCBakerPhD and CodyCBakerPhD authored Aug 22, 2024
1 parent 95d437e commit 46a6b02
Showing 3 changed files with 21 additions and 6 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -98,6 +98,8 @@ reduce_all_dandi_raw_s3_logs \
--excluded_ips < Drogon's IP >
```

In the summer of 2024, this process took less than 10 hours to process all 6 TB of raw log data (using 3 workers at 3 GB buffer size).

### Binning

To bin:
@@ -127,6 +129,8 @@ bin_all_reduced_s3_logs_by_object_key \
--file_limit 20
```

In the summer of 2024, this process took less than ?? hours to bin all ?? GB of reduced log data.

### Mapping

The next step, which should also be run regularly (daily to weekly), is to iterate through all current versions of all Dandisets, mapping the reduced logs to their assets.
@@ -145,6 +149,9 @@ map_binned_s3_logs_to_dandisets \
--dandiset_logs_folder_path /mnt/backup/dandi/mapped-dandiset-logs
```

In the summer of 2024, this process took less than ?? hours to run; in the current design, it should be rerun on a regular basis to keep the mapped logs up to date.



## Submit line decoding errors

20 changes: 14 additions & 6 deletions src/dandi_s3_log_parser/_bin_all_reduced_s3_logs_by_object_key.py
@@ -42,9 +42,9 @@ def bin_all_reduced_s3_logs_by_object_key(
         completed_tracking_file_path.touch()
     else:
         with open(file=started_tracking_file_path, mode="r") as io:
-            started = set(io.readlines())
+            started = set(pathlib.Path(path) for path in io.readlines())
         with open(file=completed_tracking_file_path, mode="r") as io:
-            completed = set(io.readlines())
+            completed = set(pathlib.Path(path) for path in io.readlines())

         if started != completed:
             raise ValueError(
@@ -63,8 +63,13 @@ def bin_all_reduced_s3_logs_by_object_key(
         mininterval=3.0,
         smoothing=0,
     ):
-        with open(file=started_tracking_file_path, mode="a") as started_tracking_file:
-            started_tracking_file.write(f"{reduced_s3_log_file}: 1\n")
+        if reduced_s3_log_file.stat().st_size == 0:
+            with open(file=started_tracking_file_path, mode="a") as io:
+                io.write(f"{reduced_s3_log_file}\n")
+            with open(file=completed_tracking_file_path, mode="a") as io:
+                io.write(f"{reduced_s3_log_file}\n")
+
+            continue

         reduced_data_frame = pandas.read_csv(filepath_or_buffer=reduced_s3_log_file, sep="\t")
         binned_data_frame = reduced_data_frame.groupby("object_key").agg(
@@ -82,6 +87,9 @@ def bin_all_reduced_s3_logs_by_object_key(
         }
         del binned_data_frame

+        with open(file=started_tracking_file_path, mode="a") as io:
+            io.write(f"{reduced_s3_log_file}\n")
+
         for object_key, data in tqdm.tqdm(
             iterable=object_keys_to_data.items(),
             total=len(object_keys_to_data),
@@ -102,5 +110,5 @@ def bin_all_reduced_s3_logs_by_object_key(
             header = False if binned_s3_log_file_path.exists() else True
             data_frame.to_csv(path_or_buf=binned_s3_log_file_path, mode="a", sep="\t", header=header, index=False)

-        with open(file=completed_tracking_file_path, mode="a") as started_tracking_file:
-            started_tracking_file.write(f"{reduced_s3_log_file}\n")
+        with open(file=completed_tracking_file_path, mode="a") as io:
+            io.write(f"{reduced_s3_log_file}\n")
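
To make the new control flow easier to follow outside the diff, here is a minimal, self-contained sketch of the skip-and-track pattern this commit introduces. The folder layout, file names, and flat per-key output below are assumptions for illustration only; the real function also validates the started/completed tracking files on start-up and shows progress bars.

```python
import pathlib

import pandas

# Hypothetical locations for the sketch; the real paths are arguments of
# bin_all_reduced_s3_logs_by_object_key, and the real binned layout differs.
reduced_s3_logs_folder_path = pathlib.Path("reduced_logs")
binned_s3_logs_folder_path = pathlib.Path("binned_logs")
started_tracking_file_path = binned_s3_logs_folder_path / "started.txt"
completed_tracking_file_path = binned_s3_logs_folder_path / "completed.txt"

reduced_s3_logs_folder_path.mkdir(exist_ok=True)
binned_s3_logs_folder_path.mkdir(exist_ok=True)
started_tracking_file_path.touch()
completed_tracking_file_path.touch()

for reduced_s3_log_file in sorted(reduced_s3_logs_folder_path.rglob("*.tsv")):
    # The fix in this commit: an empty reduced log has nothing to bin, so
    # record it as both started and completed, then skip it instead of
    # handing a zero-byte file to pandas.
    if reduced_s3_log_file.stat().st_size == 0:
        with open(file=started_tracking_file_path, mode="a") as io:
            io.write(f"{reduced_s3_log_file}\n")
        with open(file=completed_tracking_file_path, mode="a") as io:
            io.write(f"{reduced_s3_log_file}\n")
        continue

    # Record the start only once real binning work is about to happen.
    with open(file=started_tracking_file_path, mode="a") as io:
        io.write(f"{reduced_s3_log_file}\n")

    # Group the reduced rows by object key and append each group to a
    # per-key TSV (flat, sanitized file names purely for the sketch).
    reduced_data_frame = pandas.read_csv(filepath_or_buffer=reduced_s3_log_file, sep="\t")
    for object_key, data_frame in reduced_data_frame.groupby("object_key"):
        binned_s3_log_file_path = binned_s3_logs_folder_path / f"{str(object_key).replace('/', '_')}.tsv"
        header = not binned_s3_log_file_path.exists()
        data_frame.to_csv(path_or_buf=binned_s3_log_file_path, mode="a", sep="\t", header=header, index=False)

    # Record completion only after every group has been written.
    with open(file=completed_tracking_file_path, mode="a") as io:
        io.write(f"{reduced_s3_log_file}\n")
```

Recording empty files as both started and completed up front keeps the two tracking sets equal, so the consistency check at the top of the function does not misread a skipped empty log as an interrupted binning run.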
Empty file.
