diff --git a/dapr/aio/clients/grpc/client.py b/dapr/aio/clients/grpc/client.py index 2b40101c..f9f53498 100644 --- a/dapr/aio/clients/grpc/client.py +++ b/dapr/aio/clients/grpc/client.py @@ -24,7 +24,7 @@ from warnings import warn -from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable +from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any from typing_extensions import Self from google.protobuf.message import Message as GrpcMessage @@ -39,14 +39,12 @@ AioRpcError, ) -from dapr.aio.clients.grpc.subscription import Subscription from dapr.clients.exceptions import DaprInternalError, DaprGrpcError from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions from dapr.clients.grpc._state import StateOptions, StateItem from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus from dapr.clients.health import DaprHealth from dapr.clients.retry import RetryPolicy -from dapr.common.pubsub.subscription import StreamInactiveError from dapr.conf.helpers import GrpcEndpoint from dapr.conf import settings from dapr.proto import api_v1, api_service_v1, common_v1 @@ -96,7 +94,6 @@ UnlockResponse, GetWorkflowResponse, StartWorkflowResponse, - TopicEventResponse, ) @@ -485,72 +482,6 @@ async def publish_event( return DaprResponse(await call.initial_metadata()) - async def subscribe( - self, - pubsub_name: str, - topic: str, - metadata: Optional[dict] = None, - dead_letter_topic: Optional[str] = None, - ) -> Subscription: - """ - Subscribe to a topic with a bidirectional stream - - Args: - pubsub_name (str): The name of the pubsub component. - topic (str): The name of the topic. - metadata (Optional[dict]): Additional metadata for the subscription. - dead_letter_topic (Optional[str]): Name of the dead-letter topic. - - Returns: - Subscription: The Subscription object managing the stream. - """ - subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic) - await subscription.start() - return subscription - - async def subscribe_with_handler( - self, - pubsub_name: str, - topic: str, - handler_fn: Callable[..., TopicEventResponse], - metadata: Optional[dict] = None, - dead_letter_topic: Optional[str] = None, - ) -> Callable[[], Awaitable[None]]: - """ - Subscribe to a topic with a bidirectional stream and a message handler function - - Args: - pubsub_name (str): The name of the pubsub component. - topic (str): The name of the topic. - handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received. - metadata (Optional[dict]): Additional metadata for the subscription. - dead_letter_topic (Optional[str]): Name of the dead-letter topic. - - Returns: - Callable[[], Awaitable[None]]: An async function to close the subscription. - """ - subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic) - - async def stream_messages(sub: Subscription): - while True: - try: - message = await sub.next_message() - if message: - response = await handler_fn(message) - if response: - await subscription.respond(message, response.status) - else: - continue - except StreamInactiveError: - break - - async def close_subscription(): - await subscription.close() - - asyncio.create_task(stream_messages(subscription)) - - return close_subscription - async def get_state( self, store_name: str, diff --git a/dapr/aio/clients/grpc/subscription.py b/dapr/aio/clients/grpc/subscription.py deleted file mode 100644 index 84542bb4..00000000 --- a/dapr/aio/clients/grpc/subscription.py +++ /dev/null @@ -1,110 +0,0 @@ -import asyncio -from grpc import StatusCode -from grpc.aio import AioRpcError - -from dapr.clients.grpc._response import TopicEventResponse -from dapr.clients.health import DaprHealth -from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage -from dapr.proto import api_v1, appcallback_v1 - - -class Subscription: - def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None): - self._stub = stub - self._pubsub_name = pubsub_name - self._topic = topic - self._metadata = metadata or {} - self._dead_letter_topic = dead_letter_topic or '' - self._stream = None - self._send_queue = asyncio.Queue() - self._stream_active = asyncio.Event() - - async def start(self): - async def outgoing_request_iterator(): - try: - initial_request = api_v1.SubscribeTopicEventsRequestAlpha1( - initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1( - pubsub_name=self._pubsub_name, - topic=self._topic, - metadata=self._metadata, - dead_letter_topic=self._dead_letter_topic, - ) - ) - yield initial_request - - while self._stream_active.is_set(): - try: - response = await asyncio.wait_for(self._send_queue.get(), timeout=1.0) - yield response - except asyncio.TimeoutError: - continue - except Exception as e: - raise Exception(f'Error while writing to stream: {e}') - - self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator()) - self._stream_active.set() - await self._stream.read() # discard the initial message - - async def reconnect_stream(self): - await self.close() - DaprHealth.wait_until_ready() - print('Attempting to reconnect...') - await self.start() - - async def next_message(self): - if not self._stream_active.is_set(): - raise StreamInactiveError('Stream is not active') - - try: - if self._stream is not None: - message = await self._stream.read() - if message is None: - return None - return SubscriptionMessage(message.event_message) - except AioRpcError as e: - if e.code() == StatusCode.UNAVAILABLE: - print( - f'gRPC error while reading from stream: {e.details()}, ' - f'Status Code: {e.code()}. ' - f'Attempting to reconnect...' - ) - await self.reconnect_stream() - elif e.code() != StatusCode.CANCELLED: - raise Exception(f'gRPC error while reading from subscription stream: {e} ') - except Exception as e: - raise Exception(f'Error while fetching message: {e}') - - return None - - async def respond(self, message, status): - try: - status = appcallback_v1.TopicEventResponse(status=status.value) - response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1( - id=message.id(), status=status - ) - msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response) - if not self._stream_active.is_set(): - raise StreamInactiveError('Stream is not active') - await self._send_queue.put(msg) - except Exception as e: - print(f"Can't send message: {e}") - - async def respond_success(self, message): - await self.respond(message, TopicEventResponse('success').status) - - async def respond_retry(self, message): - await self.respond(message, TopicEventResponse('retry').status) - - async def respond_drop(self, message): - await self.respond(message, TopicEventResponse('drop').status) - - async def close(self): - if self._stream: - try: - self._stream.cancel() - self._stream_active.clear() - except AioRpcError as e: - if e.code() != StatusCode.CANCELLED: - raise Exception(f'Error while closing stream: {e}') - except Exception as e: - raise Exception(f'Error while closing stream: {e}') diff --git a/daprdocs/content/en/python-sdk-docs/python-client.md b/daprdocs/content/en/python-sdk-docs/python-client.md index b4e92a9b..539604dd 100644 --- a/daprdocs/content/en/python-sdk-docs/python-client.md +++ b/daprdocs/content/en/python-sdk-docs/python-client.md @@ -261,10 +261,10 @@ You can create a streaming subscription to a PubSub topic using either the `subs or `subscribe_handler` methods. The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the -stream by -calling the `next_message` method. This will block on the main thread while waiting for messages. -When done, you should call the close method to terminate the -subscription and stop receiving messages. +stream by calling the `next_message` method. This will block on the main thread while waiting for +messages. +When done, you should call the close method to terminate the subscription and stop receiving +messages. The `subscribe_with_handler` method accepts a callback function that is executed for each message received from the stream. diff --git a/examples/pubsub-streaming-async/README.md b/examples/pubsub-streaming-async/README.md deleted file mode 100644 index dfa7d27d..00000000 --- a/examples/pubsub-streaming-async/README.md +++ /dev/null @@ -1,122 +0,0 @@ -# Example - Publish and subscribe to messages - -This example utilizes a publisher and a subscriber to show the bidirectional pubsub pattern. -It creates a publisher and calls the `publish_event` method in the `DaprClient`. -In the s`subscriber.py` file it creates a subscriber object that can call the `next_message` method to get new messages from the stream. After processing the new message, it returns a status to the stream. - - -> **Note:** Make sure to use the latest proto bindings - -## Pre-requisites - -- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started) -- [Install Python 3.8+](https://www.python.org/downloads/) - -## Install Dapr python-SDK - - - -```bash -pip3 install dapr -``` - -## Run async example where users control reading messages off the stream - -Run the following command in a terminal/command prompt: - - - -```bash -# 1. Start Subscriber -dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py -``` - - - -In another terminal/command prompt run: - - - -```bash -# 2. Start Publisher -dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py -``` - - - -## Run async example with a handler function - -Run the following command in a terminal/command prompt: - - - -```bash -# 1. Start Subscriber -dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber-handler.py -``` - - - -In another terminal/command prompt run: - - - -```bash -# 2. Start Publisher -dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py -``` - - - - -## Cleanup - - diff --git a/examples/pubsub-streaming-async/publisher.py b/examples/pubsub-streaming-async/publisher.py deleted file mode 100644 index b9702355..00000000 --- a/examples/pubsub-streaming-async/publisher.py +++ /dev/null @@ -1,45 +0,0 @@ -# ------------------------------------------------------------ -# Copyright 2022 The Dapr Authors -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ------------------------------------------------------------ -import asyncio -import json - -from dapr.aio.clients import DaprClient - - -async def publish_events(): - """ - Publishes events to a pubsub topic asynchronously - """ - - async with DaprClient() as d: - id = 0 - while id < 5: - id += 1 - req_data = {'id': id, 'message': 'hello world'} - - # Create a typed message with content type and body - await d.publish_event( - pubsub_name='pubsub', - topic_name='TOPIC_A', - data=json.dumps(req_data), - data_content_type='application/json', - publish_metadata={'ttlInSeconds': '100', 'rawPayload': 'false'}, - ) - - # Print the request - print(req_data, flush=True) - - await asyncio.sleep(1) - - -asyncio.run(publish_events()) diff --git a/examples/pubsub-streaming-async/subscriber-handler.py b/examples/pubsub-streaming-async/subscriber-handler.py deleted file mode 100644 index f9503f06..00000000 --- a/examples/pubsub-streaming-async/subscriber-handler.py +++ /dev/null @@ -1,42 +0,0 @@ -import asyncio -from dapr.aio.clients import DaprClient -from dapr.clients.grpc._response import TopicEventResponse - -counter = 0 - - -async def process_message(message) -> TopicEventResponse: - """ - Asynchronously processes the message and returns a TopicEventResponse. - """ - - print(f'Processing message: {message.data()} from {message.topic()}...') - global counter - counter += 1 - return TopicEventResponse('success') - - -async def main(): - """ - Main function to subscribe to a pubsub topic and handle messages asynchronously. - """ - async with DaprClient() as client: - # Subscribe to the pubsub topic with the message handler - close_fn = await client.subscribe_with_handler( - pubsub_name='pubsub', - topic='TOPIC_A', - handler_fn=process_message, - dead_letter_topic='TOPIC_A_DEAD', - ) - - # Wait until 5 messages are processed - global counter - while counter < 5: - await asyncio.sleep(1) - - print('Closing subscription...') - await close_fn() - - -if __name__ == '__main__': - asyncio.run(main()) diff --git a/examples/pubsub-streaming-async/subscriber.py b/examples/pubsub-streaming-async/subscriber.py deleted file mode 100644 index 0f7da59b..00000000 --- a/examples/pubsub-streaming-async/subscriber.py +++ /dev/null @@ -1,53 +0,0 @@ -import asyncio - -from dapr.aio.clients import DaprClient -from dapr.clients.grpc.subscription import StreamInactiveError - -counter = 0 - - -def process_message(message): - global counter - counter += 1 - # Process the message here - print(f'Processing message: {message.data()} from {message.topic()}...') - return 'success' - - -async def main(): - async with DaprClient() as client: - global counter - subscription = await client.subscribe( - pubsub_name='pubsub', topic='TOPIC_A', dead_letter_topic='TOPIC_A_DEAD' - ) - - try: - while counter < 5: - try: - message = await subscription.next_message() - - except StreamInactiveError: - print('Stream is inactive. Retrying...') - await asyncio.sleep(1) - continue - if message is None: - print('No message received within timeout period.') - continue - - # Process the message - response_status = process_message(message) - - if response_status == 'success': - await subscription.respond_success(message) - elif response_status == 'retry': - await subscription.respond_retry(message) - elif response_status == 'drop': - await subscription.respond_drop(message) - - finally: - print('Closing subscription...') - await subscription.close() - - -if __name__ == '__main__': - asyncio.run(main()) diff --git a/tests/clients/test_dapr_grpc_client_async.py b/tests/clients/test_dapr_grpc_client_async.py index 42bbd830..754abbeb 100644 --- a/tests/clients/test_dapr_grpc_client_async.py +++ b/tests/clients/test_dapr_grpc_client_async.py @@ -24,7 +24,6 @@ from dapr.aio.clients.grpc.client import DaprGrpcClientAsync from dapr.aio.clients import DaprClient from dapr.clients.exceptions import DaprGrpcError -from dapr.common.pubsub.subscription import StreamInactiveError from dapr.proto import common_v1 from .fake_dapr_server import FakeDaprSidecar from dapr.conf import settings @@ -261,128 +260,6 @@ async def test_publish_error(self): data=111, ) - async def test_subscribe_topic(self): - # The fake server we're using sends two messages and then closes the stream - # The client should be able to read both messages, handle the stream closure and reconnect - # which will result in reading the same two messages again. - # That's why message 3 should be the same as message 1 - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') - subscription = await dapr.subscribe(pubsub_name='pubsub', topic='example') - - # First message - text - message1 = await subscription.next_message() - await subscription.respond_success(message1) - - self.assertEqual('111', message1.id()) - self.assertEqual('app1', message1.source()) - self.assertEqual('com.example.type2', message1.type()) - self.assertEqual('1.0', message1.spec_version()) - self.assertEqual('text/plain', message1.data_content_type()) - self.assertEqual('TOPIC_A', message1.topic()) - self.assertEqual('pubsub', message1.pubsub_name()) - self.assertEqual(b'hello2', message1.raw_data()) - self.assertEqual('text/plain', message1.data_content_type()) - self.assertEqual('hello2', message1.data()) - - # Second message - json - message2 = await subscription.next_message() - await subscription.respond_success(message2) - - self.assertEqual('222', message2.id()) - self.assertEqual('app1', message2.source()) - self.assertEqual('com.example.type2', message2.type()) - self.assertEqual('1.0', message2.spec_version()) - self.assertEqual('TOPIC_A', message2.topic()) - self.assertEqual('pubsub', message2.pubsub_name()) - self.assertEqual(b'{"a": 1}', message2.raw_data()) - self.assertEqual('application/json', message2.data_content_type()) - self.assertEqual({'a': 1}, message2.data()) - - # On this call the stream will be closed and return an error, so the message will be none - # but the client will try to reconnect - message3 = await subscription.next_message() - self.assertIsNone(message3) - - # # The client already reconnected and will start reading the messages again - # # Since we're working with a fake server, the messages will be the same - # message4 = await subscription.next_message() - # await subscription.respond_success(message4) - # self.assertEqual('111', message4.id()) - # self.assertEqual('app1', message4.source()) - # self.assertEqual('com.example.type2', message4.type()) - # self.assertEqual('1.0', message4.spec_version()) - # self.assertEqual('text/plain', message4.data_content_type()) - # self.assertEqual('TOPIC_A', message4.topic()) - # self.assertEqual('pubsub', message4.pubsub_name()) - # self.assertEqual(b'hello2', message4.raw_data()) - # self.assertEqual('text/plain', message4.data_content_type()) - # self.assertEqual('hello2', message4.data()) - - await subscription.close() - - async def test_subscribe_topic_early_close(self): - dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') - subscription = await dapr.subscribe(pubsub_name='pubsub', topic='example') - await subscription.close() - - with self.assertRaises(StreamInactiveError): - await subscription.next_message() - - # async def test_subscribe_topic_with_handler(self): - # # The fake server we're using sends two messages and then closes the stream - # # The client should be able to read both messages, handle the stream closure and reconnect - # # which will result in reading the same two messages again. - # # That's why message 3 should be the same as message 1 - # dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}') - # counter = 0 - # - # async def handler(message): - # nonlocal counter - # if counter == 0: - # self.assertEqual('111', message.id()) - # self.assertEqual('app1', message.source()) - # self.assertEqual('com.example.type2', message.type()) - # self.assertEqual('1.0', message.spec_version()) - # self.assertEqual('text/plain', message.data_content_type()) - # self.assertEqual('TOPIC_A', message.topic()) - # self.assertEqual('pubsub', message.pubsub_name()) - # self.assertEqual(b'hello2', message.raw_data()) - # self.assertEqual('text/plain', message.data_content_type()) - # self.assertEqual('hello2', message.data()) - # elif counter == 1: - # self.assertEqual('222', message.id()) - # self.assertEqual('app1', message.source()) - # self.assertEqual('com.example.type2', message.type()) - # self.assertEqual('1.0', message.spec_version()) - # self.assertEqual('TOPIC_A', message.topic()) - # self.assertEqual('pubsub', message.pubsub_name()) - # self.assertEqual(b'{"a": 1}', message.raw_data()) - # self.assertEqual('application/json', message.data_content_type()) - # self.assertEqual({'a': 1}, message.data()) - # elif counter == 2: - # self.assertEqual('111', message.id()) - # self.assertEqual('app1', message.source()) - # self.assertEqual('com.example.type2', message.type()) - # self.assertEqual('1.0', message.spec_version()) - # self.assertEqual('text/plain', message.data_content_type()) - # self.assertEqual('TOPIC_A', message.topic()) - # self.assertEqual('pubsub', message.pubsub_name()) - # self.assertEqual(b'hello2', message.raw_data()) - # self.assertEqual('text/plain', message.data_content_type()) - # self.assertEqual('hello2', message.data()) - # - # counter += 1 - # - # return TopicEventResponse("success") - # - # close_fn = await dapr.subscribe_with_handler( - # pubsub_name='pubsub', topic='example', handler_fn=handler - # ) - # - # while counter < 3: - # await asyncio.sleep(0.1) # sleep to prevent a busy loop - # await close_fn() - @patch.object(settings, 'DAPR_API_TOKEN', 'test-token') async def test_dapr_api_token_insertion(self): dapr = DaprGrpcClientAsync(f'{self.scheme}localhost:{self.grpc_port}')