Skip to content

Commit

Permalink
Add Prometheus metrics support for the Kafka broker
Browse files Browse the repository at this point in the history
  • Loading branch information
roma-frolov committed Sep 16, 2024
1 parent 119dc8a commit 9823e4b
Show file tree
Hide file tree
Showing 7 changed files with 164 additions and 3 deletions.
Empty file.
19 changes: 19 additions & 0 deletions faststream/kafka/prometheus/middleware.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING

from faststream.kafka.prometheus.provider import settings_provider_factory
from faststream.prometheus.middleware import BasePrometheusMiddleware

if TYPE_CHECKING:
from prometheus_client import CollectorRegistry


class KafkaPrometheusMiddleware(BasePrometheusMiddleware):
    """Prometheus middleware preconfigured for the Kafka broker.

    Wires the Kafka-specific settings provider into the generic
    Prometheus middleware so publish/consume metrics are recorded
    into the supplied ``registry``.
    """

    def __init__(self, *, registry: "CollectorRegistry") -> None:
        super().__init__(
            registry=registry,
            settings_provider_factory=settings_provider_factory,
        )
62 changes: 62 additions & 0 deletions faststream/kafka/prometheus/provider.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import TYPE_CHECKING, Sequence, Tuple, Union

from faststream.broker.message import MsgType, StreamMessage
from faststream.prometheus.provider import (
ConsumeAttrs,
MetricsSettingsProvider,
)

if TYPE_CHECKING:
from aiokafka import ConsumerRecord

from faststream.types import AnyDict


class BaseKafkaMetricsSettingsProvider(MetricsSettingsProvider[MsgType]):
    """Shared Kafka behavior for the Prometheus metrics settings providers."""

    def __init__(self) -> None:
        # Label reported as the messaging system on every metric sample.
        self.messaging_system = "kafka"

    def get_publish_destination_name_from_kwargs(
        self,
        kwargs: "AnyDict",
    ) -> str:
        """Return the topic being published to, read from the publish kwargs."""
        destination: str = kwargs["topic"]
        return destination


class KafkaMetricsSettingsProvider(BaseKafkaMetricsSettingsProvider["ConsumerRecord"]):
    """Settings provider for subscribers that receive one Kafka record at a time."""

    def get_consume_attrs_from_message(
        self,
        msg: "StreamMessage[ConsumerRecord]",
    ) -> ConsumeAttrs:
        """Describe a consumed record: its topic, body size, and a count of one."""
        attrs: ConsumeAttrs = {
            "destination_name": msg.raw_message.topic,
            "message_size": len(msg.body),
            "messages_count": 1,
        }
        return attrs


class BatchKafkaMetricsSettingsProvider(
    BaseKafkaMetricsSettingsProvider[Tuple["ConsumerRecord", ...]]
):
    """Settings provider for subscribers that receive Kafka records in batches."""

    def get_consume_attrs_from_message(
        self,
        msg: "StreamMessage[Tuple[ConsumerRecord, ...]]",
    ) -> ConsumeAttrs:
        """Describe a consumed batch: destination topic, total body size, record count."""
        records = msg.raw_message
        # Only the first record's topic is inspected — assumes every record
        # in one batch shares the same topic; confirm against the subscriber.
        return {
            "destination_name": records[0].topic,
            "message_size": len(b"".join(msg.body)),
            "messages_count": len(records),
        }


def settings_provider_factory(
    msg: Union["ConsumerRecord", Sequence["ConsumerRecord"], None],
) -> Union[
    KafkaMetricsSettingsProvider,
    BatchKafkaMetricsSettingsProvider,
]:
    """Choose the settings provider matching the shape of the incoming message.

    A sequence of records means a batch subscriber; anything else (a single
    record or ``None``) gets the single-message provider.
    """
    return (
        BatchKafkaMetricsSettingsProvider()
        if isinstance(msg, Sequence)
        else KafkaMetricsSettingsProvider()
    )
4 changes: 2 additions & 2 deletions faststream/redis/prometheus/middleware.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING

from faststream.prometheus.middleware import BasePrometheusMiddleware
from faststream.redis.prometheus.provider import attributes_provider_factory
from faststream.redis.prometheus.provider import settings_provider_factory

if TYPE_CHECKING:
from prometheus_client import CollectorRegistry
Expand All @@ -14,6 +14,6 @@ def __init__(
registry: "CollectorRegistry",
):
super().__init__(
settings_provider_factory=attributes_provider_factory,
settings_provider_factory=settings_provider_factory,
registry=registry,
)
2 changes: 1 addition & 1 deletion faststream/redis/prometheus/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def get_consume_attrs_from_message(
}


def attributes_provider_factory(
def settings_provider_factory(
msg: Optional["AnyDict"],
) -> Union[
RedisMetricsSettingsProvider,
Expand Down
3 changes: 3 additions & 0 deletions tests/prometheus/kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

# Skip the whole Kafka test package when the Kafka client library is missing.
# The original guard checked "aio_pika" (the RabbitMQ client) — a copy-paste
# slip that tied these tests to the wrong optional dependency.
pytest.importorskip("aiokafka")
77 changes: 77 additions & 0 deletions tests/prometheus/kafka/test_kafka.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import asyncio
from unittest.mock import Mock

import pytest
from prometheus_client import CollectorRegistry

from faststream.kafka import KafkaBroker, KafkaMessage
from faststream.kafka.prometheus.middleware import KafkaPrometheusMiddleware
from faststream.redis import RedisBroker
from faststream.redis.prometheus.middleware import RedisPrometheusMiddleware
from tests.brokers.kafka.test_consume import TestConsume as KafkaTestConsume
from tests.brokers.kafka.test_publish import TestPublish as KafkaTestPublish
from tests.brokers.redis.test_consume import TestConsume
from tests.brokers.redis.test_publish import TestPublish
from tests.prometheus.basic import LocalPrometheusTestcase


@pytest.mark.kafka
class TestPrometheus(LocalPrometheusTestcase):
    """Prometheus middleware test suite bound to the Kafka broker."""

    broker_class = KafkaBroker
    middleware_class = KafkaPrometheusMiddleware
    message_class = KafkaMessage

    async def test_metrics_batch(
        self,
        event: asyncio.Event,
        queue: str,
    ):
        """A batch publish/consume round trip must be reflected in the metrics."""
        middleware = self.middleware_class(registry=CollectorRegistry())
        # Swap the real metrics object for a mock so calls can be asserted on.
        mock_metrics = Mock()
        middleware._metrics = mock_metrics

        broker = self.broker_class(middlewares=(middleware,))
        sub_args, sub_kwargs = self.get_subscriber_params(queue, batch=True)

        expected_message_class = self.message_class
        consumed_message = None

        @broker.subscriber(*sub_args, **sub_kwargs)
        async def handler(m: expected_message_class):
            nonlocal consumed_message
            consumed_message = m
            event.set()

        async with broker:
            await broker.start()
            publish_task = asyncio.create_task(
                broker.publish_batch("hello", "world", topic=queue)
            )
            waiter_task = asyncio.create_task(event.wait())
            await asyncio.wait((publish_task, waiter_task), timeout=self.timeout)

        assert event.is_set()
        self.assert_consume_metrics(
            metrics=mock_metrics, message=consumed_message, exception_class=None
        )
        self.assert_publish_metrics(metrics=mock_metrics)


@pytest.mark.kafka
class TestPublishWithPrometheus(KafkaTestPublish):
    """Run the shared Kafka publish suite with the Prometheus middleware attached.

    The original body built a ``RedisBroker`` with the redis middleware and
    inherited the redis publish suite — a copy-paste slip from the redis
    prometheus tests; this file is marked ``kafka`` and must exercise Kafka.
    """

    def get_broker(self, apply_types: bool = False):
        return KafkaBroker(
            middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
        )


@pytest.mark.kafka
class TestConsumeWithTelemetry(KafkaTestConsume):
    """Run the shared Kafka consume suite with the Prometheus middleware attached.

    The original body built a ``RedisBroker`` with the redis middleware and
    inherited the redis consume suite — a copy-paste slip from the redis
    prometheus tests; this file is marked ``kafka`` and must exercise Kafka.

    NOTE(review): the name says "Telemetry" but this wires Prometheus —
    consider renaming to ``TestConsumeWithPrometheus`` for consistency with
    ``TestPublishWithPrometheus``.
    """

    def get_broker(self, apply_types: bool = False):
        return KafkaBroker(
            middlewares=(KafkaPrometheusMiddleware(registry=CollectorRegistry()),),
            apply_types=apply_types,
        )

0 comments on commit 9823e4b

Please sign in to comment.