improves async response management and enriches status embeds
KPrasch committed Oct 10, 2023
1 parent 2769c76 commit 2d3c497
Showing 5 changed files with 214 additions and 56 deletions.
46 changes: 46 additions & 0 deletions examples/nucypher/quirkbot.yml
@@ -0,0 +1,46 @@
quirk:
  name: "Coordinator Events Relay"
  env: "./.env"
  batch_size: 50  # events
  loop_interval: 120  # seconds

web3:
  infura: "{{ INFURA_API_KEY }}"

bot:
  token: "{{ DISCORD_BOT_TOKEN }}"
  prefix: '!'
  subscribers:
    - name: "QuirkBot Test Server"
      channel_id: "{{ SUBSCRIBER_CHANNEL_ID }}"
      description: "Test discord server description"

events:
  - address: "0xF429C1f2d42765FE2b04CC62ab037564C2C66e5E"
    name: "Coordinator"
    description: "Lynx Domain Coordinator Events"
    chain_id: 137
    abi_file: './abis/coordinator.json'
    events:
      - StartRitual
      - StartAggregationRound
      - EndRitual
      - TranscriptPosted
      - AggregationPosted
      - TimeoutChanged
      - MaxDkgSizeChanged
      - ParticipantPublicKeySet

  - address: "0x8E49989F9D3aD89c8ab0de21FbA2E00C67ca872F"
    description: "Tapir Domain Coordinator Events"
    chain_id: 137
    abi_file: './abis/coordinator.json'
    events:
      - StartRitual
      - StartAggregationRound
      - EndRitual
      - TranscriptPosted
      - AggregationPosted
      - TimeoutChanged
      - MaxDkgSizeChanged
      - ParticipantPublicKeySet
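
The `{{ ... }}` placeholders above are meant to be filled from the environment declared by `quirk.env`; the loader that does this is not part of this commit. As a rough illustration only (the regex substitution and the use of PyYAML and python-dotenv are assumptions, not quirkbot's actual mechanism), such a file could be rendered and parsed like this:

```python
import os
import re

import yaml                     # assumes PyYAML is installed
from dotenv import load_dotenv  # assumes python-dotenv is installed


def load_config(path: str) -> dict:
    """Substitute "{{ VAR }}" placeholders from the environment, then parse the YAML."""
    with open(path, "r", encoding="utf-8") as f:
        raw = f.read()
    rendered = re.sub(
        r"\{\{\s*(\w+)\s*\}\}",
        lambda match: os.environ.get(match.group(1), ""),
        raw,
    )
    return yaml.safe_load(rendered)


load_dotenv("./.env")  # corresponds to the quirk.env entry above
config = load_config("examples/nucypher/quirkbot.yml")
print(config["quirk"]["name"])  # -> "Coordinator Events Relay"
```
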
167 changes: 125 additions & 42 deletions quirkbot/bot.py
@@ -1,44 +1,58 @@
import asyncio
import datetime
from asyncio import Queue
from collections import defaultdict
from typing import Dict, List

from discord.ext import commands
from discord.ext import tasks
from web3 import Web3, HTTPProvider

from quirkbot import defaults
from quirkbot.embeds import make_status_embed
from quirkbot.events import (
    Event,
    EventType,
    log_event,
    _load_web3_event_types,
    send_event_message
)
from quirkbot.log import LOGGER
from quirkbot.subscribers import _get_subscribers, Subscriber
from quirkbot.utils import async_lock, get_infura_url
from quirkbot.utils import get_infura_url


class QuirkBot(commands.Cog):
    def __init__(
        self,
        bot,
        name: str,
        providers: Dict[int, HTTPProvider],
        events: Dict[str, EventType],
        loop_interval: int,
        batch_size: int,
        subscribers: List[Subscriber] = None,
    ):

        self.bot = bot
        self.name = name
        self.events = events
        self.providers = providers
        self.batch_size = batch_size

        self.lock = asyncio.Lock()
        self.task_queue = Queue()

        self.events_processed = 0
        self._subscribers = subscribers or []
        self.latest_scanned_blocks: Dict[int, int] = defaultdict(int)

        self.lock = asyncio.Lock()
        self.latest_scanned_blocks: Dict[int, int] = defaultdict(int)
        self.loop_interval = loop_interval
        self.check_web3_events.change_interval(seconds=self.loop_interval)
        self.start_time = datetime.datetime.now()

    @commands.Cog.listener()
    async def on_ready(self):
        self.bot.loop.create_task(self.process_queue())
        for chain_id, provider in self.providers.items():
            w3 = Web3(provider)
            w3.middleware_onion.clear()
@@ -50,72 +64,141 @@ async def on_ready(self):
        self._subscribers = _get_subscribers(
            bot=self.bot, subscribers=self._subscribers
        )
        self.bot.loop.create_task(self.initialize_check_web3_events())
        self.bot.loop.create_task(self.initialize_check_web3_events_thread())
        LOGGER.info(f"Bot is active!")
        LOGGER.info(
            f"{len(self._subscribers)} Subscribers; "
            f"{len(self.events)} Events; "
            f"{len(self.providers)} Providers"
        )

    @property
    def uptime(self):
        if self.start_time is None:
            return "Bot is not active"
        else:
            return datetime.datetime.now() - self.start_time

    @classmethod
    def from_config(cls, config: Dict, bot):
        infura_api_key = config["web3_endpoints"]["infura"]
        chain_ids = {contract["chain_id"] for contract in config["publishers"]}
        try:

            # top-level keys
            quirk = config["quirk"]
            events = config["events"]
            web3 = config["web3"]
            bot_config = config["bot"]

            # nested keys
            name = quirk["name"]
            infura_api_key = web3["infura"]
            chain_ids = {contract["chain_id"] for contract in events}
            subscribers_data = bot_config["subscribers"]

        except KeyError as e:
            message = "missing required key in configuration file: (quirk|web3|bot|events)."
            LOGGER.error(message)
            raise e

        batch_size = quirk.get("batch_size", defaults.BATCH_SIZE)
        loop_interval = quirk.get("loop_interval", defaults.LOOP_INTERVAL)

        providers = {
            cid: Web3.HTTPProvider(get_infura_url(cid, infura_api_key))
            for cid in chain_ids
        }
        subscribers_data = config["bot"]["subscribers"]
        events = _load_web3_event_types(config, providers=providers)
        instance = cls(
            bot=bot, providers=providers, events=events, subscribers=subscribers_data
            bot=bot,
            name=name,
            loop_interval=loop_interval,
            batch_size=batch_size,
            providers=providers,
            events=events,
            subscribers=subscribers_data
        )
        return instance

    def initialize_check_web3_events_thread(self):
        self.bot.loop.create_task(self.initialize_check_web3_events())

    async def initialize_check_web3_events(self):
        await self.bot.wait_until_ready()
        self.check_web3_events.start()

    async def fetch_events(self, event_type, start_block):
        try:
            events = event_type.get_logs(fromBlock=start_block)
            num_new_events = len(events)
            if num_new_events > 0:
                LOGGER.info(f"Found {num_new_events} new events")
                await self.handle_events(events)
        except Exception as e:
            LOGGER.error(f"Error in check_web3_events: {e}")
        finally:
            self.latest_scanned_blocks[
                event_type.w3.eth.chain_id
            ] = event_type.w3.eth.block_number
            self.events_processed += 1
    async def process_queue(self):
        while True:
            task = await self.task_queue.get()
            await task
            self.task_queue.task_done()
            await asyncio.sleep(0.1)  # Adjust as necessary

    async def handle_events(self, events):
    async def fetch_events(self, event_type, start_block, batch_size=100):
        async with self.lock:
            # Lock to prevent simultaneous scans
            try:
                latest_block = event_type.w3.eth.block_number
                end_block = min(start_block + batch_size, latest_block)

                # If there's nothing new, just return
                if end_block <= start_block:
                    return

                LOGGER.info(f"Fetching events from {start_block} to {end_block}")
                events = event_type.get_logs(fromBlock=start_block, toBlock=end_block)
                num_new_events = len(events)
                if num_new_events > 0:
                    LOGGER.info(f"Found {num_new_events} new events")
                    await self.handle_events(events)

                self.latest_scanned_blocks[event_type.w3.eth.chain_id] = end_block
                self.events_processed += num_new_events
            except Exception as e:
                LOGGER.error(f"Error in check_web3_events: {e}")

    async def handle_events(self, events: List[Dict]):
        for event_data in events:
            event = Event.from_attr_dict(event_data)
            log_event(event)
            for subscriber in self._subscribers:
                self.bot.loop.create_task(send_event_message(subscriber, event))
                task = send_event_message(subscriber, event)
                self.task_queue.put_nowait(task)

    @tasks.loop(seconds=60)
    @tasks.loop(seconds=None)
    async def check_web3_events(self):
        LOGGER.info("Next round of web3 event checking.")
        async with async_lock(self.lock):
            for event_type in self.events:
                event_name = event_type.name
                chain_id = event_type.chain_id
                contract_address = event_type.address
                latest_block = event_type.w3.eth.block_number
        try:
            LOGGER.info("Next round of web3 event checking.")
            for event_type in self.events:  # Iterate over all event types
                LOGGER.info(f"Scanning {event_type.name}|{event_type.address[:8]}")
                # Identify the starting block for scanning based on the last scanned block
                start_block = self.latest_scanned_blocks[event_type.w3.eth.chain_id] + 1
                LOGGER.info(
                    f"Checking for {event_name} {contract_address[:8]}@{chain_id} "
                    f"between blocks {start_block} and {latest_block}"
                )
                await self.fetch_events(event_type, start_block)
                latest_block = event_type.w3.eth.block_number

                # Batch fetching logic
                while start_block <= latest_block:
                    await self.fetch_events(event_type, start_block, self.batch_size)
                    start_block += self.batch_size  # Update the start block for the next batch

                await asyncio.sleep(0.1)  # be kind to the API
        except Exception as e:
            LOGGER.error(f"Error in check_web3_events: {e}")

    @commands.command()
    async def status(self, ctx):
        embed = await make_status_embed(ctx=ctx, w3c=self)
        await ctx.send(embed=embed)
        try:
            embed = await make_status_embed(ctx=ctx, w3c=self)
            await ctx.send(embed=embed)
        except Exception as e:
            LOGGER.error(f"Error in status: {e}")

    @commands.command()
    @commands.has_permissions(administrator=True)  # Restrict to admins
    async def set_loop_interval(self, ctx, interval: int):
        """Sets the interval for the event checking loop."""

        # Validation for the new interval (for example, it should be greater than 0)
        if interval <= 0:
            await ctx.send("Interval must be greater than zero.")
            return

        self.check_web3_events.change_interval(seconds=interval)
        await ctx.send(f"Loop interval has been set to {interval} seconds.")
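
This commit does not touch the bot's entry point, but for orientation, a cog built this way would be attached to a discord.py bot roughly as follows. This is a hedged sketch assuming discord.py 2.x and PyYAML; the file path, intents, and startup flow are illustrative and not part of this change (a real loader would also render the `{{ ... }}` placeholders, as sketched earlier):

```python
import asyncio
import os

import discord
import yaml  # assumes PyYAML; placeholder substitution is omitted for brevity
from discord.ext import commands

from quirkbot.bot import QuirkBot


async def main() -> None:
    # Parse the example config shown above (path is illustrative).
    with open("examples/nucypher/quirkbot.yml", "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)

    intents = discord.Intents.default()
    intents.message_content = True  # required for prefix commands in discord.py 2.x
    bot = commands.Bot(command_prefix=config["bot"]["prefix"], intents=intents)

    # QuirkBot.from_config builds the providers and event types from the config.
    await bot.add_cog(QuirkBot.from_config(config=config, bot=bot))

    async with bot:
        await bot.start(os.environ["DISCORD_BOT_TOKEN"])


asyncio.run(main())
```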

2 changes: 2 additions & 0 deletions quirkbot/defaults.py
@@ -0,0 +1,2 @@
BATCH_SIZE = 100
LOOP_INTERVAL = 60
51 changes: 39 additions & 12 deletions quirkbot/embeds.py
@@ -1,30 +1,57 @@
from collections import defaultdict

from discord import Embed
from web3 import Web3

from quirkbot.networks import NETWORKS


async def make_status_embed(w3c, ctx):
    embed = Embed(
        title="Bot Status",
        color=0x00FF00,  # You can choose your own color
        description="Current status of the bot.",
        title=f"{w3c.name} Status",
        color=000000,
        # description="Current status of the bot."
    )
    # Add fields to the embed

    pretty_blocks = ", ".join(f"{NETWORKS.get(k, k)}: {v}" for k, v in w3c.latest_scanned_blocks.items())
    embed.add_field(
        name="Latest Scanned Block", value=w3c.latest_scanned_block, inline=True
        name="Latest Scanned Blocks",
        value=pretty_blocks,
        inline=False
    )
    embed.add_field(name="Events Processed", value=w3c.events_processed, inline=True)
    embed.add_field(name="Processed", value=w3c.events_processed, inline=True)
    embed.add_field(name="Subscribers", value=len(w3c._subscribers), inline=True)
    embed.add_field(name="Events", value=len(w3c.events), inline=True)
    embed.add_field(name="Loop Interval", value=w3c.loop_interval, inline=True)
    embed.add_field(name="Batch Size", value=w3c.batch_size, inline=True)

    raw_uptime = w3c.uptime
    pretty_up_time = (f"{raw_uptime.days}D "
                      f"{raw_uptime.seconds // 3600}H "
                      f"{(raw_uptime.seconds // 60) % 60}M "
                      f"{raw_uptime.seconds % 60}S")
    embed.add_field(name="Uptime", value=pretty_up_time, inline=True)

    contract_events = defaultdict(list)
    for event_type in w3c.events:
        contract_events[event_type.address].append(event_type.name)

    human_readable_events = []
    for address in sorted(contract_events.keys()):  # Sorting contracts by address
        events = sorted(contract_events[address])  # Sorting events within each contract
        event_lines = "\n".join(f"• {event}" for event in events)
        contract_line = f"---\nContract: [0x{address}...](https://etherscan.io/address/{address})\n{event_lines}"
        human_readable_events.append(contract_line)

    human_readable_events = [
        f"{name}: {event_type.address}" for name, event_type in w3c.events.items()
    ]
    embed.add_field(
        name="Tracked Events", value=", ".join(human_readable_events), inline=False
        name="Tracked Events",
        value="\n".join(human_readable_events),
        inline=False,
    )
    embed.set_footer(text="Status requested by: {}".format(ctx.author.display_name))

    embed.set_footer(text=f"Status requested by: {ctx.author.display_name}")
    return embed


def create_event_embed(event_instance):
    embed = Embed(
        title=f"New {event_instance.event_type} Event",
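
The `NETWORKS.get(k, k)` lookup above implies a chain-id-to-name mapping defined in `quirkbot/networks.py`, which is not shown in this diff. Its assumed shape, for illustration only:

```python
# Assumed shape of quirkbot/networks.py (not part of this diff): maps chain IDs
# to human-readable network names, falling back to the raw ID when unknown.
NETWORKS = {
    1: "Ethereum Mainnet",
    137: "Polygon",
    80001: "Polygon Mumbai",
}
```
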
4 changes: 2 additions & 2 deletions quirkbot/events.py
@@ -120,7 +120,7 @@ def _load_web3_event_types(
    config: Dict, providers: Dict[int, HTTPProvider]
) -> Set[EventType]:
    events = set()
    for event in config["publishers"]:
    for event in config["events"]:
        contract_address = event["address"]
        event_names = event["events"]
        chain_id = event["chain_id"]
@@ -133,7 +133,7 @@ def _load_web3_event_types(
            contract = w3.eth.contract(address=contract_address, abi=event_abi)
            event_type = EventType(
                w3_type=contract.events[event_name](),
                description=description
                description=description,
            )

            events.add(event_type)
