Skip to content

Commit

Permalink
fix: fix Nats router subjects
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Aug 7, 2024
1 parent 92a2988 commit 3924a13
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 45 deletions.
5 changes: 2 additions & 3 deletions faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
from abc import abstractmethod
from contextlib import suppress
from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -146,7 +145,7 @@ def setup( # type: ignore[override]
_call_decorators=_call_decorators,
)

@cached_property
@property
def clear_subject(self) -> str:
"""Compile `test.{name}` to `test.*` subject."""
_, path = compile_nats_wildcard(self.subject)
Expand Down Expand Up @@ -215,7 +214,7 @@ def add_prefix(self, prefix: str) -> None:
for subject in (self.config.filter_subjects or ())
]

@cached_property
@property
def _resolved_subject_string(self) -> str:
return self.subject or ", ".join(self.config.filter_subjects or ())

Expand Down
48 changes: 33 additions & 15 deletions tests/opentelemetry/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,16 @@
from faststream.opentelemetry.middleware import MessageAction as Action
from faststream.opentelemetry.middleware import TelemetryMiddleware

from tests.brokers.base.basic import BaseTestcaseConfig


@pytest.mark.asyncio()
class LocalTelemetryTestcase:
class LocalTelemetryTestcase(BaseTestcaseConfig):
messaging_system: str
include_messages_counters: bool
broker_class: Type[BrokerUsecase]
timeout: int = 3
subscriber_kwargs: ClassVar[Dict[str, Any]] = {}
resource: Resource = Resource.create(attributes={"service.name": "faststream.test"})
resource: Resource = Resource.create(
attributes={"service.name": "faststream.test"})

telemetry_middleware_class: TelemetryMiddleware

Expand Down Expand Up @@ -96,7 +97,8 @@ def assert_span(
assert attrs[SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID] == IsUUID, attrs[
SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID
]
assert span.name == f"{self.destination_name(queue)} {action}", span.name
assert span.name == f"{self.destination_name(queue)} {
action}", span.name
assert span.kind in (SpanKind.CONSUMER, SpanKind.PRODUCER), span.kind

if span.kind == SpanKind.PRODUCER and action in (Action.CREATE, Action.PUBLISH):
Expand Down Expand Up @@ -165,7 +167,9 @@ async def test_subscriber_create_publish_process_span(
mid = self.telemetry_middleware_class(tracer_provider=tracer_provider)
broker = self.broker_class(middlewares=(mid,))

@broker.subscriber(queue, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(queue)

@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
event.set()
Expand Down Expand Up @@ -205,12 +209,16 @@ async def test_chain_subscriber_publisher(
first_queue = queue
second_queue = queue + "2"

@broker.subscriber(first_queue, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(first_queue)

@broker.subscriber(*args, **kwargs)
@broker.publisher(second_queue)
async def handler1(m):
return m

@broker.subscriber(second_queue, **self.subscriber_kwargs)
args2, kwargs2 = self.get_subscriber_params(second_queue)

@broker.subscriber(*args2, **kwargs2)
async def handler2(m):
mock(m)
event.set()
Expand All @@ -231,10 +239,14 @@ async def handler2(m):
parent_span_id = create.context.span_id

self.assert_span(create, Action.CREATE, first_queue, msg)
self.assert_span(pub1, Action.PUBLISH, first_queue, msg, parent_span_id)
self.assert_span(proc1, Action.PROCESS, first_queue, msg, parent_span_id)
self.assert_span(pub2, Action.PUBLISH, second_queue, msg, proc1.context.span_id)
self.assert_span(proc2, Action.PROCESS, second_queue, msg, parent_span_id)
self.assert_span(pub1, Action.PUBLISH,
first_queue, msg, parent_span_id)
self.assert_span(proc1, Action.PROCESS,
first_queue, msg, parent_span_id)
self.assert_span(pub2, Action.PUBLISH, second_queue,
msg, proc1.context.span_id)
self.assert_span(proc2, Action.PROCESS,
second_queue, msg, parent_span_id)

assert (
create.start_time
Expand All @@ -258,7 +270,9 @@ async def test_no_trace_context_create_process_span(
mid = self.telemetry_middleware_class(tracer_provider=tracer_provider)
broker = self.broker_class(middlewares=(mid,))

@broker.subscriber(queue, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(queue)

@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
event.set()
Expand Down Expand Up @@ -295,7 +309,9 @@ async def test_metrics(
mid = self.telemetry_middleware_class(meter_provider=meter_provider)
broker = self.broker_class(middlewares=(mid,))

@broker.subscriber(queue, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(queue)

@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
event.set()
Expand Down Expand Up @@ -330,7 +346,9 @@ async def test_error_metrics(
broker = self.broker_class(middlewares=(mid,))
expected_value_type = "ValueError"

@broker.subscriber(queue, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(queue)

@broker.subscriber(*args, **kwargs)
async def handler(m):
try:
raise ValueError
Expand Down
36 changes: 9 additions & 27 deletions tests/opentelemetry/confluent/test_confluent.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Any, ClassVar, Dict, Optional
from typing import Optional
from unittest.mock import Mock

import pytest
Expand All @@ -15,18 +15,15 @@
from faststream.confluent.opentelemetry import KafkaTelemetryMiddleware
from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME
from faststream.opentelemetry.middleware import MessageAction as Action
from tests.brokers.confluent.test_consume import TestConsume
from tests.brokers.confluent.test_publish import TestPublish

from ..basic import LocalTelemetryTestcase
from tests.brokers.confluent.basic import ConfluentTestcaseConfig


@pytest.mark.confluent()
class TestTelemetry(LocalTelemetryTestcase):
class TestTelemetry(ConfluentTestcaseConfig, LocalTelemetryTestcase):
messaging_system = "kafka"
include_messages_counters = True
timeout: int = 10
subscriber_kwargs: ClassVar[Dict[str, Any]] = {"auto_offset_reset": "earliest"}
broker_class = KafkaBroker
telemetry_middleware_class = KafkaTelemetryMiddleware

Expand Down Expand Up @@ -81,7 +78,8 @@ async def test_batch(
expected_link_count = 1
expected_link_attrs = {"messaging.batch.message_count": 3}

@broker.subscriber(queue, batch=True, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(queue, batch=True)
@broker.subscriber(*args, **kwargs)
async def handler(m):
mock(m)
event.set()
Expand Down Expand Up @@ -133,7 +131,8 @@ async def test_batch_publish_with_single_consume(
expected_span_count = 8
expected_pub_batch_count = 1

@broker.subscriber(queue, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(queue)
@broker.subscriber(*args, **kwargs)
async def handler(msg):
await msgs_queue.put(msg)

Expand Down Expand Up @@ -191,7 +190,8 @@ async def test_single_publish_with_batch_consume(
expected_span_count = 6
expected_process_batch_count = 1

@broker.subscriber(queue, batch=True, **self.subscriber_kwargs)
args, kwargs = self.get_subscriber_params(queue, batch=True)
@broker.subscriber(*args, **kwargs)
async def handler(m):
m.sort()
mock(m)
Expand Down Expand Up @@ -222,21 +222,3 @@ async def handler(m):

assert event.is_set()
mock.assert_called_once_with(["buy", "hi"])


@pytest.mark.confluent()
class TestPublishWithTelemetry(TestPublish):
def get_broker(self, apply_types: bool = False):
return KafkaBroker(
middlewares=(KafkaTelemetryMiddleware(),),
apply_types=apply_types,
)


@pytest.mark.confluent()
class TestConsumeWithTelemetry(TestConsume):
def get_broker(self, apply_types: bool = False):
return KafkaBroker(
middlewares=(KafkaTelemetryMiddleware(),),
apply_types=apply_types,
)

0 comments on commit 3924a13

Please sign in to comment.