updates the consumer loop to put messages in the s3 bucket (#30)
dakota002 authored Nov 22, 2024
1 parent a11a191 commit 12616bd
Showing 10 changed files with 653 additions and 169 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/build.yml
@@ -77,8 +77,8 @@ jobs:
      - name: Install package
        run: poetry install

      # - name: Run unit tests
      #   run: poetry run pytest . --cov --cov-report=xml
      - name: Run unit tests
        run: poetry run pytest . --cov --cov-report=xml

      # - name: Upload coverage to codecov
      #   uses: codecov/codecov-action@v3
      - name: Upload coverage to codecov
        uses: codecov/codecov-action@v3
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.coverage
__pycache__
7 changes: 6 additions & 1 deletion .vscode/settings.json
@@ -1,3 +1,8 @@
{
    "editor.defaultFormatter": "ms-python.black-formatter"
    "editor.defaultFormatter": "ms-python.black-formatter",
    "python.testing.pytestArgs": [
        "."
    ],
    "python.testing.unittestEnabled": false,
    "python.testing.pytestEnabled": true
}
19 changes: 19 additions & 0 deletions README.md
@@ -32,3 +32,22 @@ This package uses [Poetry](https://python-poetry.org) for packaging and Python v
5. Run the following command to launch a shell that is preconfigured with the project's virtual environment:

poetry shell


## S3 Output

We are not currently using the S3 Sink connector. In its place, `kafka.py` uploads messages directly to an S3 bucket. The raw message value is uploaded as-is (a byte string). If the message has a key, it is also stored as a byte string. If the message has headers, their key-value pairs are stored as a JSON object in which each value is a base64-encoded string.

For example:

Headers on a message are returned by a Python consumer's `message.headers()` method as:
```
[('header1', b'value'), ('header2', b'value2')]
```
and subsequently stored as:

```
{'header1': 'dmFsdWU=', 'header2': 'dmFsdWUy'}
```

This differs slightly from the connector's behavior, in that the connector's JSON converter classes base the output JSON structure on a provided schema. This difference should not result in any loss of data between the two methods.
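
For illustration, a minimal sketch (not part of the package) of how the header tuples above map to the stored JSON object and back, using only the Python standard library:

```
import base64
import json

# Header tuples in the form returned by a Kafka consumer's message.headers()
headers = [("header1", b"value"), ("header2", b"value2")]

# Encode: each header value becomes a base64 string, serialized as JSON.
stored = json.dumps({key: base64.b64encode(value).decode() for key, value in headers})
print(stored)  # {"header1": "dmFsdWU=", "header2": "dmFsdWUy"}

# Decode: reverse the transformation to recover the original byte strings.
recovered = [(key, base64.b64decode(value)) for key, value in json.loads(stored).items()]
print(recovered)  # [('header1', b'value'), ('header2', b'value2')]
```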
5 changes: 3 additions & 2 deletions gcn_monitor/cli.py
@@ -40,7 +40,8 @@ def host_port(host_port_str):
    show_default=True,
    help="Log level",
)
def main(prometheus, loglevel):
@click.option("--bucket-name", help="Bucket Name")
def main(prometheus, loglevel, bucket_name):
    """Monitor connectivity of a Kafka client.
    Specify the Kafka client configuration in environment variables using the
@@ -60,4 +61,4 @@ def main(prometheus, loglevel):
    )
    log.info("Prometheus listening on %s", prometheus.netloc)

    kafka.run()
    kafka.run(bucket_name)
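
For readers unfamiliar with click, a minimal sketch (with assumed names, separate from the real CLI) of how an option such as `--bucket-name` maps to a function parameter:

```
import click


@click.command()
@click.option("--bucket-name", help="Bucket Name")
def main(bucket_name):
    # click converts "--bucket-name" to the bucket_name keyword argument;
    # in the real CLI this value is forwarded to kafka.run(bucket_name).
    click.echo(f"Bucket: {bucket_name}")


if __name__ == "__main__":
    main()
```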
69 changes: 68 additions & 1 deletion gcn_monitor/kafka.py
@@ -9,12 +9,15 @@

import json
import logging
from base64 import b64encode

import boto3
import gcn_kafka

from . import metrics

log = logging.getLogger(__name__)
s3_client = boto3.client("s3")


def stats_cb(data):
@@ -23,7 +26,45 @@ def stats_cb(data):
        metrics.broker_state.labels(broker["name"]).state(broker["state"])


def run():
def parse_filenames(message):
    """Parses a Kafka message into file names for S3 upload.

    Parameters
    ----------
    message : Message
        Any Kafka message.

    Returns
    -------
    file_name : str
        The Key for S3's put_object method, formatted as `topics/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.bin`.
        This format for all returned names is defined at https://docs.confluent.io/kafka-connectors/s3-sink/current/overview.html#s3-object-names
    message_key_file_name : str or None
        The Key for S3's put_object method, formatted as `topics/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.key.bin`.
    headers_file_name : str or None
        The Key for S3's put_object method, formatted as `topics/<topic>/<encodedPartition>/<topic>+<kafkaPartition>+<startOffset>.<format>.headers.json`.

    Example
    -------
    >>> for message in consumer.consume(timeout=1):
    ...     file_name, message_key_file_name, headers_file_name = parse_filenames(
    ...         message
    ...     )
    """
    # Kafka limits topic characters to ASCII alphanumerics, '.', '_' and '-'
    topic = message.topic()
    offset = message.offset()
    partition = message.partition()
    file_name = f"topics/{topic}/partition={partition}/{topic}+{partition}+{offset}.bin"
    message_key_file_name = f"{file_name}.key.bin" if message.key() else None
    headers_file_name = f"{file_name}.headers.json" if message.headers() else None

    return file_name, message_key_file_name, headers_file_name


def run(bucketName):
    log.info("Creating consumer")
    config = gcn_kafka.config_from_env()
    config["stats_cb"] = stats_cb
@@ -38,6 +79,32 @@ def run():
    while True:
        for message in consumer.consume(timeout=1):
            topic = message.topic()
            file_name, message_key_file_name, headers_file_name = parse_filenames(
                message
            )
            s3_client.put_object(
                Bucket=bucketName,
                Key=file_name,
                Body=message.value(),
            )

            if message_key_file_name is not None:
                s3_client.put_object(
                    Bucket=bucketName,
                    Key=message_key_file_name,
                    Body=message.key(),
                )

            if headers_file_name is not None:
                s3_client.put_object(
                    Bucket=bucketName,
                    Key=headers_file_name,
                    Body=json.dumps(
                        {
                            key: b64encode(value).decode()
                            for key, value in message.headers()
                        }
                    ),
                )

            if error := message.error():
                log.error("topic %s: got error %s", topic, error)
            else:
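
For illustration, a minimal, self-contained sketch (not part of the commit) of the S3 object keys that `parse_filenames` produces, using made-up topic, partition, and offset values:

```
# Hypothetical values chosen only to show the key format.
topic, partition, offset = "example.topic", 3, 1024

file_name = f"topics/{topic}/partition={partition}/{topic}+{partition}+{offset}.bin"
print(file_name)
# topics/example.topic/partition=3/example.topic+3+1024.bin
print(f"{file_name}.key.bin")
# topics/example.topic/partition=3/example.topic+3+1024.bin.key.bin
print(f"{file_name}.headers.json")
# topics/example.topic/partition=3/example.topic+3+1024.bin.headers.json
```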
