diff --git a/python/src/uagents/agent.py b/python/src/uagents/agent.py index f2cb25cc..5e86c1c0 100644 --- a/python/src/uagents/agent.py +++ b/python/src/uagents/agent.py @@ -1,13 +1,13 @@ """Agent""" import asyncio +import contextlib import functools import logging import uuid from typing import ( Any, Callable, - Coroutine, Dict, List, Optional, @@ -106,18 +106,6 @@ async def _run_interval( await asyncio.sleep(period) -async def _delay(coroutine: Coroutine, delay_seconds: float): - """ - Delay the execution of the provided coroutine by the specified number of seconds. - - Args: - coroutine (Coroutine): The coroutine to delay. - delay_seconds (float): The delay time in seconds. - """ - await asyncio.sleep(delay_seconds) - await coroutine - - async def _send_error_message(ctx: Context, destination: str, msg: ErrorMessage): """ Send an error message to the specified destination. @@ -812,19 +800,17 @@ async def _schedule_registration(self): registration. """ - time_until_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS - try: - await self.register() - except InsufficientFundsError: - time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL - except Exception as ex: - self._logger.exception(f"Failed to register: {ex}") - time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS + while True: + time_until_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS + try: + await self.register() + except InsufficientFundsError: + time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL + except Exception as ex: + self._logger.exception(f"Failed to register: {ex}") + time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS - # schedule the next registration update - self._loop.create_task( - _delay(self._schedule_registration(), time_until_next_registration) - ) + await asyncio.sleep(time_until_next_registration) def on_interval( self, @@ -1100,7 +1086,7 @@ async def _startup(self): """ if self._registration_policy: if self._endpoints: - await self._schedule_registration() + self.start_registration_loop() else: self._logger.warning( "No endpoints provided. Skipping registration: Agent won't be reachable." @@ -1149,6 +1135,13 @@ async def setup(self): self.start_message_receivers() self.start_interval_tasks() + def start_registration_loop(self): + """ + Start the registration loop. + + """ + self._loop.create_task(self._schedule_registration()) + def start_message_dispenser(self): """ Start the message dispenser. @@ -1217,16 +1210,25 @@ async def run_async(self): tasks.append(self._mailbox_client.run()) try: - await asyncio.gather(*tasks) + await asyncio.gather(*tasks, return_exceptions=True) + except (asyncio.CancelledError, KeyboardInterrupt): + pass finally: await self._shutdown() + tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()] + _ = [task.cancel() for task in tasks] + await asyncio.gather(*tasks) def run(self): """ - Run the agent. + Run the agent by itself. + A fresh event loop is created for the agent and it is closed after the agent stops. """ - self._loop.run_until_complete(self.run_async()) + with contextlib.suppress(asyncio.CancelledError, KeyboardInterrupt): + self._loop.run_until_complete(self.run_async()) + self._loop.stop() + self._loop.close() def get_message_protocol( self, message_schema_digest @@ -1509,23 +1511,20 @@ async def _schedule_registration(self): Start the batch registration loop. """ - if not any(agent._endpoints for agent in self._agents): return - time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS - try: - await self._registration_policy.register() - except InsufficientFundsError: - time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL - except Exception as ex: - self._logger.exception(f"Failed to register: {ex}") - time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS + while True: + time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS + try: + await self._registration_policy.register() + except InsufficientFundsError: + time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL + except Exception as ex: + self._logger.exception(f"Failed to register: {ex}") + time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS - # schedule the next registration update - self._loop.create_task( - _delay(self._schedule_registration(), time_to_next_registration) - ) + await asyncio.sleep(time_to_next_registration) async def run_async(self): """ @@ -1533,6 +1532,9 @@ async def run_async(self): """ tasks = [self._server.serve()] + if not self._agents: + self._logger.warning("No agents to run.") + return for agent in self._agents: await agent.setup() if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None: @@ -1540,9 +1542,11 @@ async def run_async(self): tasks.append(self._schedule_registration()) try: - await asyncio.gather(*tasks) + await asyncio.gather(*tasks, return_exceptions=True) finally: - await asyncio.gather(*[agent._shutdown() for agent in self._agents]) + await asyncio.gather( + *[agent._shutdown() for agent in self._agents], return_exceptions=True + ) def run(self): """ diff --git a/python/src/uagents/asgi.py b/python/src/uagents/asgi.py index 2150cf32..d3d14814 100644 --- a/python/src/uagents/asgi.py +++ b/python/src/uagents/asgi.py @@ -201,8 +201,8 @@ async def serve(self): ) try: await self._server.serve() - except KeyboardInterrupt: - self._logger.info("Shutting down server") + except (asyncio.CancelledError, KeyboardInterrupt): + self._logger.info("Shutting down server...") async def _handle_rest( self, diff --git a/python/src/uagents/mailbox.py b/python/src/uagents/mailbox.py index 6d9335b4..81ea6e8a 100644 --- a/python/src/uagents/mailbox.py +++ b/python/src/uagents/mailbox.py @@ -67,7 +67,9 @@ async def run(self): """ Runs the mailbox client. """ - await asyncio.gather(self.start_polling(), self.process_deletion_queue()) + loop = asyncio.get_event_loop() + loop.create_task(self.start_polling()) + loop.create_task(self.process_deletion_queue()) async def start_polling(self): """