diff --git a/faststream/nats/subscriber/usecase.py b/faststream/nats/subscriber/usecase.py index 4370f70e1a..85c133770c 100644 --- a/faststream/nats/subscriber/usecase.py +++ b/faststream/nats/subscriber/usecase.py @@ -1,7 +1,6 @@ import asyncio from abc import abstractmethod from contextlib import suppress -from functools import cached_property from typing import ( TYPE_CHECKING, Any, @@ -146,7 +145,7 @@ def setup( # type: ignore[override] _call_decorators=_call_decorators, ) - @cached_property + @property def clear_subject(self) -> str: """Compile `test.{name}` to `test.*` subject.""" _, path = compile_nats_wildcard(self.subject) @@ -215,7 +214,7 @@ def add_prefix(self, prefix: str) -> None: for subject in (self.config.filter_subjects or ()) ] - @cached_property + @property def _resolved_subject_string(self) -> str: return self.subject or ", ".join(self.config.filter_subjects or ()) diff --git a/tests/opentelemetry/basic.py b/tests/opentelemetry/basic.py index 794a09ee6d..06e1b3bb03 100644 --- a/tests/opentelemetry/basic.py +++ b/tests/opentelemetry/basic.py @@ -22,15 +22,16 @@ from faststream.opentelemetry.middleware import MessageAction as Action from faststream.opentelemetry.middleware import TelemetryMiddleware +from tests.brokers.base.basic import BaseTestcaseConfig + @pytest.mark.asyncio() -class LocalTelemetryTestcase: +class LocalTelemetryTestcase(BaseTestcaseConfig): messaging_system: str include_messages_counters: bool broker_class: Type[BrokerUsecase] - timeout: int = 3 - subscriber_kwargs: ClassVar[Dict[str, Any]] = {} - resource: Resource = Resource.create(attributes={"service.name": "faststream.test"}) + resource: Resource = Resource.create( + attributes={"service.name": "faststream.test"}) telemetry_middleware_class: TelemetryMiddleware @@ -96,7 +97,8 @@ def assert_span( assert attrs[SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID] == IsUUID, attrs[ SpanAttr.MESSAGING_MESSAGE_CONVERSATION_ID ] - assert span.name == f"{self.destination_name(queue)} {action}", span.name + assert span.name == f"{self.destination_name(queue)} { + action}", span.name assert span.kind in (SpanKind.CONSUMER, SpanKind.PRODUCER), span.kind if span.kind == SpanKind.PRODUCER and action in (Action.CREATE, Action.PUBLISH): @@ -165,7 +167,9 @@ async def test_subscriber_create_publish_process_span( mid = self.telemetry_middleware_class(tracer_provider=tracer_provider) broker = self.broker_class(middlewares=(mid,)) - @broker.subscriber(queue, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) async def handler(m): mock(m) event.set() @@ -205,12 +209,16 @@ async def test_chain_subscriber_publisher( first_queue = queue second_queue = queue + "2" - @broker.subscriber(first_queue, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(first_queue) + + @broker.subscriber(*args, **kwargs) @broker.publisher(second_queue) async def handler1(m): return m - @broker.subscriber(second_queue, **self.subscriber_kwargs) + args2, kwargs2 = self.get_subscriber_params(second_queue) + + @broker.subscriber(*args2, **kwargs2) async def handler2(m): mock(m) event.set() @@ -231,10 +239,14 @@ async def handler2(m): parent_span_id = create.context.span_id self.assert_span(create, Action.CREATE, first_queue, msg) - self.assert_span(pub1, Action.PUBLISH, first_queue, msg, parent_span_id) - self.assert_span(proc1, Action.PROCESS, first_queue, msg, parent_span_id) - self.assert_span(pub2, Action.PUBLISH, second_queue, msg, proc1.context.span_id) - self.assert_span(proc2, Action.PROCESS, second_queue, msg, parent_span_id) + self.assert_span(pub1, Action.PUBLISH, + first_queue, msg, parent_span_id) + self.assert_span(proc1, Action.PROCESS, + first_queue, msg, parent_span_id) + self.assert_span(pub2, Action.PUBLISH, second_queue, + msg, proc1.context.span_id) + self.assert_span(proc2, Action.PROCESS, + second_queue, msg, parent_span_id) assert ( create.start_time @@ -258,7 +270,9 @@ async def test_no_trace_context_create_process_span( mid = self.telemetry_middleware_class(tracer_provider=tracer_provider) broker = self.broker_class(middlewares=(mid,)) - @broker.subscriber(queue, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) async def handler(m): mock(m) event.set() @@ -295,7 +309,9 @@ async def test_metrics( mid = self.telemetry_middleware_class(meter_provider=meter_provider) broker = self.broker_class(middlewares=(mid,)) - @broker.subscriber(queue, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) async def handler(m): mock(m) event.set() @@ -330,7 +346,9 @@ async def test_error_metrics( broker = self.broker_class(middlewares=(mid,)) expected_value_type = "ValueError" - @broker.subscriber(queue, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(queue) + + @broker.subscriber(*args, **kwargs) async def handler(m): try: raise ValueError diff --git a/tests/opentelemetry/confluent/test_confluent.py b/tests/opentelemetry/confluent/test_confluent.py index 930bf9aeaf..18a673142d 100644 --- a/tests/opentelemetry/confluent/test_confluent.py +++ b/tests/opentelemetry/confluent/test_confluent.py @@ -1,5 +1,5 @@ import asyncio -from typing import Any, ClassVar, Dict, Optional +from typing import Optional from unittest.mock import Mock import pytest @@ -15,18 +15,15 @@ from faststream.confluent.opentelemetry import KafkaTelemetryMiddleware from faststream.opentelemetry.consts import MESSAGING_DESTINATION_PUBLISH_NAME from faststream.opentelemetry.middleware import MessageAction as Action -from tests.brokers.confluent.test_consume import TestConsume -from tests.brokers.confluent.test_publish import TestPublish from ..basic import LocalTelemetryTestcase +from tests.brokers.confluent.basic import ConfluentTestcaseConfig @pytest.mark.confluent() -class TestTelemetry(LocalTelemetryTestcase): +class TestTelemetry(ConfluentTestcaseConfig, LocalTelemetryTestcase): messaging_system = "kafka" include_messages_counters = True - timeout: int = 10 - subscriber_kwargs: ClassVar[Dict[str, Any]] = {"auto_offset_reset": "earliest"} broker_class = KafkaBroker telemetry_middleware_class = KafkaTelemetryMiddleware @@ -81,7 +78,8 @@ async def test_batch( expected_link_count = 1 expected_link_attrs = {"messaging.batch.message_count": 3} - @broker.subscriber(queue, batch=True, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(queue, batch=True) + @broker.subscriber(*args, **kwargs) async def handler(m): mock(m) event.set() @@ -133,7 +131,8 @@ async def test_batch_publish_with_single_consume( expected_span_count = 8 expected_pub_batch_count = 1 - @broker.subscriber(queue, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(queue) + @broker.subscriber(*args, **kwargs) async def handler(msg): await msgs_queue.put(msg) @@ -191,7 +190,8 @@ async def test_single_publish_with_batch_consume( expected_span_count = 6 expected_process_batch_count = 1 - @broker.subscriber(queue, batch=True, **self.subscriber_kwargs) + args, kwargs = self.get_subscriber_params(queue, batch=True) + @broker.subscriber(*args, **kwargs) async def handler(m): m.sort() mock(m) @@ -222,21 +222,3 @@ async def handler(m): assert event.is_set() mock.assert_called_once_with(["buy", "hi"]) - - -@pytest.mark.confluent() -class TestPublishWithTelemetry(TestPublish): - def get_broker(self, apply_types: bool = False): - return KafkaBroker( - middlewares=(KafkaTelemetryMiddleware(),), - apply_types=apply_types, - ) - - -@pytest.mark.confluent() -class TestConsumeWithTelemetry(TestConsume): - def get_broker(self, apply_types: bool = False): - return KafkaBroker( - middlewares=(KafkaTelemetryMiddleware(),), - apply_types=apply_types, - )