Skip to content
This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Feature MD5 verification #8

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ optional arguments:
--aws-region AWS_REGION
The AWS region (should match SQS queue region)
--debug We do the debug?
--checksum If set, the MD5 sum is verified
--tmpdir TMPDIR The temp directory where the work will be done
```

Expand Down
68 changes: 46 additions & 22 deletions fdr2humio.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import tempfile
import urllib.parse
from hashlib import md5

import boto3
import botocore
Expand Down Expand Up @@ -65,8 +66,8 @@ def is_valid_hostname(hostname):
return f"{parsed_uri.scheme}://{parsed_uri.netloc}/"
else:
msg = (
"%s is not a valid Humio hostname. Must start with http:// or https://"
% hostname
"%s is not a valid Humio hostname. Must start with http:// or https://"
% hostname
)
raise argparse.ArgumentTypeError(msg)

Expand Down Expand Up @@ -149,6 +150,13 @@ def setup_args():
# Are we going to do the debug?
parser.add_argument("--debug", action="store_true", help="We do the debug?")

# MD5 validation of downloaded files (will still verify file size)
parser.add_argument(
"--checksum",
action="store_true",
help="If set, the MD5 sum is verified",
)

# Where can we do our workings
parser.add_argument(
"--tmpdir",
Expand Down Expand Up @@ -195,8 +203,8 @@ def check_valid(args, payload, s3):
s3.head_object(Bucket=args["bucket"], Key=success_path)
except botocore.exceptions.ClientError as e:
if (
str(e)
== "An error occurred (404) when calling the HeadObject operation: Not Found"
str(e)
== "An error occurred (404) when calling the HeadObject operation: Not Found"
):
return False
logging.warning(
Expand All @@ -221,31 +229,47 @@ def post_files_to_humio(args, payload, s3, http):
# Download the source file from S3
s3.download_file(args["bucket"], asset["path"], local_file_path)

# TODO: Check the checksum

# TODO: check the size!
processed["files"] += 1
processed["bytes"] += os.path.getsize(local_file_path)
# TODO: check if space available on disk!

# POST to Humio HEC Raw w/ compression
with open(local_file_path, "rb") as f:
r = http.request(
"POST",
humio_url(args),
body=f.read(),
headers=humio_headers(args),
)

# TODO: Better error handling needed here as we may partially process a message
if r.status != 200:
data = f.read()

# TODO: Better error handling when the MD5 checksums do not match
if args["checksum"]:
local_file_md5 = md5(data).hexdigest()
if local_file_md5 != asset["checksum"]:
logging.debug(
f"MD5 checksum ({local_file_md5}) of file \"{asset['path']}\" "
f'matches with file on-disk "{local_file_path}".'
)
else:
logging.error(
f"MD5 mismatch {asset['checksum']} ({asset['path']}) doesn't match local file "
f"MD5 {local_file_md5} ({local_file_path})."
)
return False

r = http.request(
"POST",
humio_url(args),
body=data,
headers=humio_headers(args),
)

processed["files"] += 1
processed["bytes"] += os.path.getsize(local_file_path)

# TODO: Better error handling needed here as we may partially process a message
if r.status != 200:
return False

# Everything sent as expected
return processed


if __name__ == "__main__":
# We only need to do the argparse if we're running interactivley
# We only need to do the argparse if we're running interactively
args = setup_args()

# Always pretty print the args when starting
Expand Down Expand Up @@ -289,7 +313,7 @@ def post_files_to_humio(args, payload, s3, http):
flag = GracefulExit()
while True:
for message in get_new_events(
args, sqs, maxEvents=5, reserveSeconds=3600, maxWaitSeconds=20
args, sqs, maxEvents=5, reserveSeconds=3600, maxWaitSeconds=20
):
payload = json.loads(message.body)

Expand All @@ -306,8 +330,8 @@ def post_files_to_humio(args, payload, s3, http):
f"bytes of {payload['totalSize']}) from {timestamp} "
)
if (
stats["files"] == payload["fileCount"]
and stats["bytes"] == payload["totalSize"]
stats["files"] == payload["fileCount"]
and stats["bytes"] == payload["totalSize"]
):
logging.info(msg)
else:
Expand Down