Skip to content

Commit

Permalink
fix(core): agent shutdown routine (#578)
Browse files Browse the repository at this point in the history
  • Loading branch information
Archento authored Nov 18, 2024
1 parent 42c3793 commit 39947f7
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 47 deletions.
92 changes: 48 additions & 44 deletions python/src/uagents/agent.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""Agent"""

import asyncio
import contextlib
import functools
import logging
import uuid
from typing import (
Any,
Callable,
Coroutine,
Dict,
List,
Optional,
Expand Down Expand Up @@ -106,18 +106,6 @@ async def _run_interval(
await asyncio.sleep(period)


async def _delay(coroutine: Coroutine, delay_seconds: float):
"""
Delay the execution of the provided coroutine by the specified number of seconds.
Args:
coroutine (Coroutine): The coroutine to delay.
delay_seconds (float): The delay time in seconds.
"""
await asyncio.sleep(delay_seconds)
await coroutine


async def _send_error_message(ctx: Context, destination: str, msg: ErrorMessage):
"""
Send an error message to the specified destination.
Expand Down Expand Up @@ -812,19 +800,17 @@ async def _schedule_registration(self):
registration.
"""
time_until_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self.register()
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS
while True:
time_until_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self.register()
except InsufficientFundsError:
time_until_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_until_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_until_next_registration)
)
await asyncio.sleep(time_until_next_registration)

def on_interval(
self,
Expand Down Expand Up @@ -1100,7 +1086,7 @@ async def _startup(self):
"""
if self._registration_policy:
if self._endpoints:
await self._schedule_registration()
self.start_registration_loop()
else:
self._logger.warning(
"No endpoints provided. Skipping registration: Agent won't be reachable."
Expand Down Expand Up @@ -1149,6 +1135,13 @@ async def setup(self):
self.start_message_receivers()
self.start_interval_tasks()

def start_registration_loop(self):
"""
Start the registration loop.
"""
self._loop.create_task(self._schedule_registration())

def start_message_dispenser(self):
"""
Start the message dispenser.
Expand Down Expand Up @@ -1217,16 +1210,25 @@ async def run_async(self):
tasks.append(self._mailbox_client.run())

try:
await asyncio.gather(*tasks)
await asyncio.gather(*tasks, return_exceptions=True)
except (asyncio.CancelledError, KeyboardInterrupt):
pass
finally:
await self._shutdown()
tasks = [t for t in asyncio.all_tasks() if t is not asyncio.current_task()]
_ = [task.cancel() for task in tasks]
await asyncio.gather(*tasks)

def run(self):
"""
Run the agent.
Run the agent by itself.
A fresh event loop is created for the agent and it is closed after the agent stops.
"""
self._loop.run_until_complete(self.run_async())
with contextlib.suppress(asyncio.CancelledError, KeyboardInterrupt):
self._loop.run_until_complete(self.run_async())
self._loop.stop()
self._loop.close()

def get_message_protocol(
self, message_schema_digest
Expand Down Expand Up @@ -1509,40 +1511,42 @@ async def _schedule_registration(self):
Start the batch registration loop.
"""

if not any(agent._endpoints for agent in self._agents):
return

time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS
while True:
time_to_next_registration = REGISTRATION_UPDATE_INTERVAL_SECONDS
try:
await self._registration_policy.register()
except InsufficientFundsError:
time_to_next_registration = 2 * AVERAGE_BLOCK_INTERVAL
except Exception as ex:
self._logger.exception(f"Failed to register: {ex}")
time_to_next_registration = REGISTRATION_RETRY_INTERVAL_SECONDS

# schedule the next registration update
self._loop.create_task(
_delay(self._schedule_registration(), time_to_next_registration)
)
await asyncio.sleep(time_to_next_registration)

async def run_async(self):
"""
Run the agents managed by the bureau.
"""
tasks = [self._server.serve()]
if not self._agents:
self._logger.warning("No agents to run.")
return
for agent in self._agents:
await agent.setup()
if agent.agentverse["use_mailbox"] and agent.mailbox_client is not None:
tasks.append(agent.mailbox_client.run())
tasks.append(self._schedule_registration())

try:
await asyncio.gather(*tasks)
await asyncio.gather(*tasks, return_exceptions=True)
finally:
await asyncio.gather(*[agent._shutdown() for agent in self._agents])
await asyncio.gather(
*[agent._shutdown() for agent in self._agents], return_exceptions=True
)

def run(self):
"""
Expand Down
4 changes: 2 additions & 2 deletions python/src/uagents/asgi.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ async def serve(self):
)
try:
await self._server.serve()
except KeyboardInterrupt:
self._logger.info("Shutting down server")
except (asyncio.CancelledError, KeyboardInterrupt):
self._logger.info("Shutting down server...")

async def _handle_rest(
self,
Expand Down
4 changes: 3 additions & 1 deletion python/src/uagents/mailbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,9 @@ async def run(self):
"""
Runs the mailbox client.
"""
await asyncio.gather(self.start_polling(), self.process_deletion_queue())
loop = asyncio.get_event_loop()
loop.create_task(self.start_polling())
loop.create_task(self.process_deletion_queue())

async def start_polling(self):
"""
Expand Down

0 comments on commit 39947f7

Please sign in to comment.