Make process resumable and enhance IP classifier #54

Merged 56 commits on Aug 21, 2024.

Commits (56):
- ed19f6e  refactor for simplicity and resumability (Aug 20, 2024)
- f5076e7  refactor for simplicity and resumability (Aug 20, 2024)
- 6a4853b  refactor for simplicity and resumability (Aug 20, 2024)
- fb08027  refactor for simplicity and resumability (Aug 20, 2024)
- caf030c  fixes; adapting tests (Aug 20, 2024)
- b7a83f5  fix (Aug 20, 2024)
- 95e4bda  fix (Aug 20, 2024)
- fb48dbb  fix (Aug 20, 2024)
- 93bfd3f  fix (Aug 20, 2024)
- 661e1a7  fix (Aug 20, 2024)
- dc8c6d9  fix (Aug 20, 2024)
- 38ec7e6  fix (Aug 20, 2024)
- 2e863b6  fix (Aug 20, 2024)
- 60b6ae5  adjusting tests (Aug 21, 2024)
- 46f3375  debug reduction tests (Aug 21, 2024)
- 3bfba48  get binning mostly there (Aug 21, 2024)
- 442b5e3  fix (Aug 21, 2024)
- 1f095e6  debug binning (Aug 21, 2024)
- ae3b352  debug binning (Aug 21, 2024)
- e6432be  update mapping; debug year span (Aug 21, 2024)
- d3b4c80  debug skip protocol (Aug 21, 2024)
- 8ad2c88  improve default folder creation (Aug 21, 2024)
- cb5e7ff  improve default folder creation (Aug 21, 2024)
- 08c1786  improve default folder creation (Aug 21, 2024)
- e6c68fd  improve default folder creation (Aug 21, 2024)
- 4518145  add CLI for binning (Aug 21, 2024)
- 4fcbe88  update argument name; enhance readme (Aug 21, 2024)
- 344b09b  add binning tracking (Aug 21, 2024)
- d5ed6da  reformat (Aug 21, 2024)
- bb16fd4  reformat (Aug 21, 2024)
- ae5b057  reformat (Aug 21, 2024)
- 0000b2a  adjust name (Aug 21, 2024)
- 4bc5924  add progress bar to binning (Aug 21, 2024)
- c442696  add a file buffer to binning (Aug 21, 2024)
- d2830da  add CIDR regions (Aug 21, 2024)
- 297e4ff  make argument simpler (Aug 21, 2024)
- 8dbd722  update readme (Aug 21, 2024)
- b6f5f97  add extra progress bar for binning (Aug 21, 2024)
- 5925438  skip ipv6 gcp (Aug 21, 2024)
- 5af93c5  add services to cache (Aug 21, 2024)
- a43572a  add extra test case and debug (Aug 21, 2024)
- 89c0bd5  add helper function for in cidr (Aug 21, 2024)
- b3a9bcc  add helper function for in cidr (Aug 21, 2024)
- 041abb9  add extra cache (Aug 21, 2024)
- 40cb4e9  add extra cache (Aug 21, 2024)
- 89022a4  add extra cache (Aug 21, 2024)
- 1604016  add extra cache (Aug 21, 2024)
- 74df297  try this again (Aug 21, 2024)
- 85ff066  fix skip condition in reducer (Aug 21, 2024)
- 1694ac4  remove other helper (Aug 21, 2024)
- e170cd2  remove other helper (Aug 21, 2024)
- 121a974  debugging non-skip (Aug 21, 2024)
- 9730cab  debugging non-skip (Aug 21, 2024)
- c6f712d  debugging non-skip (Aug 21, 2024)
- 9ed0a5b  debugging non-skip (Aug 21, 2024)
- 75579e9  Merge branch 'main' into make_resumable (CodyCBakerPhD, Aug 21, 2024)

105 changes: 72 additions & 33 deletions README.md
@@ -26,11 +26,7 @@ A few summary facts as of 2024:
- A single line of a raw S3 log file can be between 400-1000+ bytes.
- Some of the busiest daily logs on the archive can have around 5,014,386 lines.
- There are more than 6 TB of log files collected in total.
- This parser reduces that total to around 20 GB of essential information.

The reduced information is then additionally mapped to currently available assets in persistent published Dandiset versions and current drafts, which only comprise around 100 MB of the original data.

These small Dandiset-specific summaries are soon to be shared publicly.
- This parser reduces that total to around 100 MB of essential information.



@@ -42,71 +38,114 @@ pip install dandi_s3_log_parser



## Workflow

The process consists of three modular steps.

### 1. **Reduction**

Filter out:

- Non-success status codes.
- Excluded IP addresses.
- Operation types other than the one specified (`REST.GET.OBJECT` by default).

Then limit data extraction to only a handful of specified fields from each full line of the raw logs; by default: `object_key`, `timestamp`, `ip_address`, and `bytes_sent`.

In summer of 2024, this reduced 6 TB of raw logs to around 200 GB.

The process is designed to be easily parallelized and interruptible, meaning you can safely kill any running processes and restart them later without losing much progress.
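
For orientation only, here is a rough sketch of what reducing one line amounts to. The regex, the helper name `reduce_line`, and the exact field layout are assumptions of this sketch based on the standard S3 server access log format, not the project's actual reducer code:

```python
import re

# Assumed layout of a raw S3 server access log line (illustration only):
# owner bucket [timestamp] ip requester request_id operation key "request_uri" status error_code bytes_sent ...
_LINE_PATTERN = re.compile(
    r'^\S+ \S+ \[(?P<timestamp>[^\]]+)\] (?P<ip>\S+) \S+ \S+ (?P<operation>\S+) (?P<key>\S+) '
    r'"[^"]*" (?P<status>\S+) \S+ (?P<bytes_sent>\S+)'
)


def reduce_line(raw_line: str) -> tuple[str, str, str, str] | None:
    """Return (object_key, timestamp, ip_address, bytes_sent), or None if the line is filtered out."""
    match = _LINE_PATTERN.match(raw_line)
    if match is None:
        return None  # malformed line; the real parser records these as line decoding errors
    # Keep only successful requests of the specified operation type;
    # the real reducer also drops excluded IP addresses (omitted here).
    if match["operation"] != "REST.GET.OBJECT" or not match["status"].startswith("2"):
        return None
    return match["key"], match["timestamp"], match["ip"], match["bytes_sent"]
```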

### 2. **Binning**

To make the mapping to Dandisets more efficient, the reduced logs are binned by their object keys (asset blob IDs) for fast lookup.

This step reduces the total file size from step (1) even further by deduplicating repeated object keys, though it does create a large number of small files.

In summer of 2024, this reduced 200 GB of reduced logs to around 20 GB.
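
Conceptually, this step is a group-by on the `object_key` column of each reduced TSV. A minimal sketch of the idea (paths are illustrative and the logic is simplified relative to the implementation added in this PR):

```python
import pathlib

import pandas

# Group one reduced TSV by object key and append each group to its own per-object-key TSV.
reduced = pandas.read_csv("reduced/2024/08/17.tsv", sep="\t")
for object_key, group in reduced.groupby("object_key"):
    binned_path = pathlib.Path("binned") / f"{object_key}.tsv"
    binned_path.parent.mkdir(parents=True, exist_ok=True)
    group[["timestamp", "bytes_sent", "ip_address"]].to_csv(
        binned_path, sep="\t", mode="a", index=False, header=not binned_path.exists()
    )
```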

### 3. **Mapping**

The final step, which should be run periodically to keep the usage logs per Dandiset up to date, scans through all currently known Dandisets and their versions, maps the asset blob IDs to their filenames, and generates the most recently parsed usage logs, which can be shared publicly.

In summer of 2024, this reduced 20 GB of binned logs to around 100 MB of Dandiset logs.
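
A highly simplified sketch of the mapping idea follows. The `blob_id_to_asset` lookup (built in practice by querying the DANDI API for each Dandiset version, which is omitted here), the binned file layout, and all paths are hypothetical placeholders:

```python
import pathlib

import pandas

# Hypothetical lookup from asset blob ID to (dandiset_id, version, asset_path).
blob_id_to_asset = {
    "0e1f2a3b-example-blob-id": ("000003", "0.230629.1955", "sub-01/sub-01_ses-01.nwb"),
}

binned_folder = pathlib.Path("binned")
dandiset_logs_folder = pathlib.Path("mapped-dandiset-logs")

for blob_id, (dandiset_id, version, asset_path) in blob_id_to_asset.items():
    binned_file = binned_folder / "blobs" / f"{blob_id}.tsv"  # layout is an assumption of this sketch
    if not binned_file.exists():
        continue  # no recorded download activity for this asset
    activity = pandas.read_csv(binned_file, sep="\t")
    activity["asset_path"] = asset_path  # attach the human-readable filename to each access record
    out_file = dandiset_logs_folder / dandiset_id / version / "usage.tsv"
    out_file.parent.mkdir(parents=True, exist_ok=True)
    activity.to_csv(out_file, sep="\t", mode="a", index=False, header=not out_file.exists())
```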



## Usage

### Reduce entire history
### Reduction

To iteratively parse all historical logs all at once (parallelization strongly recommended):
To reduce:

```bash
reduce_all_dandi_raw_s3_logs \
--base_raw_s3_logs_folder_path < base log folder > \
--reduced_s3_logs_folder_path < output folder > \
--maximum_number_of_workers < number of CPUs to use > \
--raw_s3_logs_folder_path < base raw S3 logs folder > \
--reduced_s3_logs_folder_path < reduced S3 logs folder path > \
--maximum_number_of_workers < number of workers to use > \
--maximum_buffer_size_in_mb < approximate amount of RAM to use > \
--excluded_ips < comma-separated list of known IPs to exclude >
--excluded_ips < comma-separated list of known IPs to exclude >
```

For example, on Drogon:

```bash
reduce_all_dandi_raw_s3_logs \
--base_raw_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs \
--reduced_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id \
--maximum_number_of_workers 6 \
--maximum_buffer_size_in_mb 5000 \
--raw_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs \
--reduced_s3_logs_folder_path /mnt/backup/dandi/reduced-dandiarchive-logs \
--maximum_number_of_workers 3 \
--maximum_buffer_size_in_mb 3000 \
--excluded_ips < Drogon's IP >
```

### Reduce a single log file
### Binning

To parse only a single log file at a time, such as in a CRON job:
To bin:

```bash
reduce_dandi_raw_s3_log \
--raw_s3_log_file_path < s3 log file path > \
--reduced_s3_logs_folder_path < output folder > \
--excluded_ips < comma-separated list of known IPs to exclude >
bin_all_reduced_s3_logs_by_object_key \
--reduced_s3_logs_folder_path < reduced S3 logs folder path > \
--binned_s3_logs_folder_path < binned S3 logs folder path >
```

For example, on Drogon:

```bash
reduce_dandi_raw_s3_log \
--raw_s3_log_file_path /mnt/backup/dandi/dandiarchive-logs/2024/08/17.log \
--reduced_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id \
--excluded_ips < Drogon's IP >
bin_all_reduced_s3_logs_by_object_key \
--reduced_s3_logs_folder_path /mnt/backup/dandi/reduced-dandiarchive-logs \
--binned_s3_logs_folder_path /mnt/backup/dandi/binned-dandiarchive-logs
```

This process is not as friendly to random interruption as the reduction step is. If corruption is detected, the target binning folder will have to be cleaned before re-attempting.

The `--file_limit < integer >` flag can be used to limit the number of files processed in a single run, which is useful for breaking the process up into resumable pieces, for example:

```bash
bin_all_reduced_s3_logs_by_object_key \
--reduced_s3_logs_folder_path /mnt/backup/dandi/reduced-dandiarchive-logs \
--binned_s3_logs_folder_path /mnt/backup/dandi/binned-dandiarchive-logs \
--file_limit 20
```

### Map to Dandisets
### Mapping

The next step, which should also be run regularly (daily to weekly), is to iterate through all current versions of all Dandisets, mapping the reduced logs to their assets.

```bash
map_reduced_logs_to_dandisets \
--reduced_s3_logs_folder_path < reduced s3 logs folder path > \
--dandiset_logs_folder_path < mapped logs folder >
map_binned_s3_logs_to_dandisets \
--binned_s3_logs_folder_path < binned S3 logs folder path > \
--dandiset_logs_folder_path < mapped Dandiset logs folder >
```

For example, on Drogon:

```bash
map_reduced_logs_to_dandisets \
--reduced_s3_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/parsed_8_15_2024/REST_GET_OBJECT_per_asset_id \
--dandiset_logs_folder_path /mnt/backup/dandi/dandiarchive-logs-cody/mapped_logs_8_15_2024
map_binned_s3_logs_to_dandisets \
--binned_s3_logs_folder_path /mnt/backup/dandi/binned-dandiarchive-logs \
--dandiset_logs_folder_path /mnt/backup/dandi/mapped-dandiset-logs
```


## Submit line decoding errors

Please email line decoding errors collected from your local config file to the core maintainer before raising issues or submitting PRs contributing them as examples, to more easily correct any aspects that might require anonymization.
Please email line decoding errors collected from your local config file (located in `~/.dandi_s3_log_parser/errors`) to the core maintainer before raising issues or submitting PRs that contribute them as examples, so that any aspects requiring anonymization can be corrected more easily.
7 changes: 3 additions & 4 deletions pyproject.toml
@@ -10,7 +10,7 @@ packages = ["src/dandi_s3_log_parser"]

[project]
name = "dandi_s3_log_parser"
version="0.3.0"
version="0.4.0"
authors = [
{ name="Cody Baker", email="cody.c.baker.phd@gmail.com" },
]
@@ -38,9 +38,8 @@ classifiers = [

[project.scripts]
reduce_all_dandi_raw_s3_logs = "dandi_s3_log_parser._command_line_interface:_reduce_all_dandi_raw_s3_logs_cli"
reduce_dandi_raw_s3_log = "dandi_s3_log_parser._command_line_interface:_reduce_dandi_raw_s3_log_cli"
map_reduced_logs_to_dandisets = "dandi_s3_log_parser._command_line_interface:_map_reduced_logs_to_dandisets_cli"
find_random_example_line = "dandi_s3_log_parser._command_line_interface:_find_random_example_line_cli"
bin_all_reduced_s3_logs_by_object_key = "dandi_s3_log_parser._command_line_interface:_bin_all_reduced_s3_logs_by_object_key_cli"
map_binned_s3_logs_to_dandisets = "dandi_s3_log_parser._command_line_interface:_map_binned_s3_logs_to_dandisets_cli"



9 changes: 5 additions & 4 deletions src/dandi_s3_log_parser/__init__.py
@@ -20,16 +20,17 @@
from ._config import DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH
from ._s3_log_file_reducer import reduce_raw_s3_log
from ._buffered_text_reader import BufferedTextReader
from ._dandi_s3_log_file_reducer import reduce_dandi_raw_s3_log, reduce_all_dandi_raw_s3_logs
from ._dandi_s3_log_file_reducer import reduce_all_dandi_raw_s3_logs
from ._ip_utils import get_region_from_ip_address
from ._dandiset_mapper import map_reduced_logs_to_dandisets
from ._map_binned_s3_logs_to_dandisets import map_binned_s3_logs_to_dandisets
from ._bin_all_reduced_s3_logs_by_object_key import bin_all_reduced_s3_logs_by_object_key

__all__ = [
"DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH",
"reduce_raw_s3_log",
"BufferedTextReader",
"reduce_dandi_raw_s3_log",
"reduce_all_dandi_raw_s3_logs",
"get_region_from_ip_address",
"map_reduced_logs_to_dandisets",
"map_binned_s3_logs_to_dandisets",
"bin_all_reduced_s3_logs_by_object_key",
]
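
After this change, the package exposes seven public names at the top level; a quick, illustrative import check is:

```python
# Illustrative check that the refactored public API imports cleanly.
from dandi_s3_log_parser import (
    DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH,
    BufferedTextReader,
    bin_all_reduced_s3_logs_by_object_key,
    get_region_from_ip_address,
    map_binned_s3_logs_to_dandisets,
    reduce_all_dandi_raw_s3_logs,
    reduce_raw_s3_log,
)

print(DANDI_S3_LOG_PARSER_BASE_FOLDER_PATH)  # base folder used for local config and error records
```
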
106 changes: 106 additions & 0 deletions src/dandi_s3_log_parser/_bin_all_reduced_s3_logs_by_object_key.py
@@ -0,0 +1,106 @@
"""Bin reduced logs by object key."""

import pathlib

import pandas
import tqdm
from pydantic import DirectoryPath, validate_call


@validate_call
def bin_all_reduced_s3_logs_by_object_key(
*,
reduced_s3_logs_folder_path: DirectoryPath,
binned_s3_logs_folder_path: DirectoryPath,
file_limit: int | None = None,
) -> None:
"""
Bin reduced S3 logs by object keys.

Parameters
----------
reduced_s3_logs_folder_path : str
The path to the folder containing the reduced S3 log files.
binned_s3_logs_folder_path : str
The path to write each binned S3 log file to.
There will be one file per object key.
file_limit : int, optional
The maximum number of files to process per call.
"""
started_tracking_file_path = binned_s3_logs_folder_path / "binned_log_file_paths_started.txt"
completed_tracking_file_path = binned_s3_logs_folder_path / "binned_log_file_paths_completed.txt"

if started_tracking_file_path.exists() != completed_tracking_file_path.exists():
raise FileNotFoundError(
"One of the tracking files is missing, indicating corruption in the binning process. "
"Please clean the binning directory and re-run this function."
)

completed = None
if not started_tracking_file_path.exists():
started_tracking_file_path.touch()
completed_tracking_file_path.touch()
else:
with open(file=started_tracking_file_path, mode="r") as io:
started = set(io.readlines())
with open(file=completed_tracking_file_path, mode="r") as io:
completed = set(io.readlines())

if started != completed:
raise ValueError(
"The tracking files do not agree on the state of the binning process. "
"Please clean the binning directory and re-run this function."
)
completed = completed or set()

reduced_s3_log_files = list(set(reduced_s3_logs_folder_path.rglob("*.tsv")) - completed)[:file_limit]
for reduced_s3_log_file in tqdm.tqdm(
iterable=reduced_s3_log_files,
total=len(reduced_s3_log_files),
desc="Binning reduced logs...",
position=0,
leave=True,
mininterval=3.0,
smoothing=0,
):
with open(file=started_tracking_file_path, mode="a") as started_tracking_file:
started_tracking_file.write(f"{reduced_s3_log_file}: 1\n")

reduced_data_frame = pandas.read_csv(filepath_or_buffer=reduced_s3_log_file, sep="\t")
binned_data_frame = reduced_data_frame.groupby("object_key").agg(
{
"timestamp": list,
"bytes_sent": list,
"ip_address": list,
}
)
del reduced_data_frame

object_keys_to_data = {
row.name: {"timestamp": row["timestamp"], "bytes_sent": row["bytes_sent"], "ip_address": row["ip_address"]}
for _, row in binned_data_frame.iterrows()
}
del binned_data_frame

for object_key, data in tqdm.tqdm(
iterable=object_keys_to_data.items(),
total=len(object_keys_to_data),
desc="Writing binned logs...",
position=1,
leave=False,
mininterval=3.0,
smoothing=0,
):
object_key_as_path = pathlib.Path(object_key)
binned_s3_log_file_path = (
binned_s3_logs_folder_path / object_key_as_path.parent / f"{object_key_as_path.stem}.tsv"
)
binned_s3_log_file_path.parent.mkdir(exist_ok=True, parents=True)

data_frame = pandas.DataFrame(data=data)

header = False if binned_s3_log_file_path.exists() else True
data_frame.to_csv(path_or_buf=binned_s3_log_file_path, mode="a", sep="\t", header=header, index=False)

with open(file=completed_tracking_file_path, mode="a") as started_tracking_file:
started_tracking_file.write(f"{reduced_s3_log_file}\n")