Skip to content

Commit

Permalink
Redo s3 stuff (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
dakota002 authored Nov 27, 2024
1 parent 62184de commit 934187a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
5 changes: 3 additions & 2 deletions gcn_monitor/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def host_port(host_port_str):
show_default=True,
help="Log level",
)
def main(prometheus, loglevel):
@click.option("--bucket-name", help="Bucket Name")
def main(prometheus, loglevel, bucket_name):
"""Monitor connectivity of a Kafka client.
Specify the Kafka client configuration in environment variables using the
Expand All @@ -60,4 +61,4 @@ def main(prometheus, loglevel):
)
log.info("Prometheus listening on %s", prometheus.netloc)

kafka.run()
kafka.run(bucket_name)
28 changes: 27 additions & 1 deletion gcn_monitor/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import json
import logging
from base64 import b64encode

import boto3
import gcn_kafka
Expand Down Expand Up @@ -63,7 +64,7 @@ def parse_filenames(message):
return file_name, message_key_file_name, headers_file_name


def run():
def run(bucket_name):
log.info("Creating consumer")
config = gcn_kafka.config_from_env()
config["stats_cb"] = stats_cb
Expand All @@ -78,6 +79,31 @@ def run():
while True:
for message in consumer.consume(timeout=1):
topic = message.topic()
file_name, message_key_file_name, headers_file_name = parse_filenames(
message
)
s3_client.put_object(
Bucket=bucket_name,
Key=file_name,
Body=message.value(),
)

if message_key_file_name is not None:
s3_client.put_object(
Bucket=bucket_name,
Key=message_key_file_name,
Body=message.key(),
)

if headers_file_name is not None:
s3_client.put_object(
Bucket=bucket_name,
Key=headers_file_name,
Body={
key: b64encode(value).decode()
for key, value in message.headers()
},
)
if error := message.error():
log.error("topic %s: got error %s", topic, error)
else:
Expand Down

0 comments on commit 934187a

Please sign in to comment.