Skip to content

Commit

Permalink
Optimize for speed (#14)
Browse files Browse the repository at this point in the history
* feat: second event loop in bg thread

* chore: remove unused return value

* feat: in_process_calls

* fix: race condition

* feat: type hints

* chore: remove useless property

* fix: missing import

* fix: exception handling when exception has an exception in args[0]

* chore: remove useless var

* fix: race condition

* feat: run execute_multicall in second thread to reduce event loop congestion

* chore: remove unused func

* chore: remove unused property

* feat: queue_is_full

* feat: improved loop timer logic

* fix: debug logging statement

* chore: increase default loop interval to 0.01s

* chore: remove semaphore

* chore: bump eth_retry version

* chore: refactor
  • Loading branch information
BobTheBuidler authored Jul 21, 2022
1 parent 86cd4be commit d13205e
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 58 deletions.
3 changes: 1 addition & 2 deletions dank_mids/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@
import asyncio
import os

SEMAPHORE = asyncio.Semaphore(int(os.environ.get("DANKMIDSSEMAPHORE", 100_000)))
LOOP_INTERVAL = float(os.environ.get("DANKMIDSLOOPINTERVAL", 0.001))
LOOP_INTERVAL = float(os.environ.get("DANKMIDSLOOPINTERVAL", 0.01))
111 changes: 59 additions & 52 deletions dank_mids/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from collections import defaultdict
from functools import lru_cache
from time import time
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Dict, List, Literal, Optional, Set, Union

import multicall
from aiohttp import RequestInfo
Expand Down Expand Up @@ -34,6 +34,8 @@ def _err_msg(e: Exception) -> str:
""" Extract an error message from `e` to use in a spoof rpc response. """
if isinstance(e.args[0], str) or isinstance(e.args[0], RequestInfo):
err_msg = f"DankMidsError: {type(e)} {e.args}"
elif isinstance(e.args[0], Exception):
err_msg = f"DankMidsError: {type(e.args[0])} {e.args[0].args}"
elif "message" in e.args[0]:
err_msg = e.args[0]["message"]
elif "error" in e.args[0] and "message" in e.args[0]["error"]:
Expand All @@ -42,33 +44,41 @@ def _err_msg(e: Exception) -> str:
raise e
return err_msg

def start_caller_event_loop(loop: asyncio.BaseEventLoop) -> None:
    """
    Run `loop` forever on the calling thread.

    Meant to be used as the target of a background thread: it gives dank_mids
    a second event loop so work can be moved off the main event loop, which
    reduces congestion there while the node is under heavy load.

    This call blocks until `loop.stop()` is invoked from elsewhere.
    """
    # Register `loop` as the current event loop for this thread before
    # starting it.
    asyncio.set_event_loop(loop)
    loop.run_forever()


class DankMiddlewareController:
def __init__(self, w3: Web3) -> None:
assert w3.eth.is_async and isinstance(w3.provider, AsyncBaseProvider), "Dank Middleware can only be applied to an asycnhronous Web3 instance."
self.w3 = w3
self.sync_w3 = Web3(provider = HTTPProvider(self.w3.provider.endpoint_uri))
self.w3: Web3 = w3
self.sync_w3: Web3 = Web3(provider = HTTPProvider(self.w3.provider.endpoint_uri))
# Can't pickle middlewares to send to process executor
self.sync_w3.middleware_onion.clear()
self.sync_w3.provider.middlewares = tuple()
self.DO_NOT_BATCH = set()
self.pending_calls = defaultdict(dict)
self.completed_calls = defaultdict(dict)
self.DO_NOT_BATCH: Set[str] = set()
self.pending_calls: defaultdict = defaultdict(dict)
self.in_process_calls: defaultdict = defaultdict(dict)
self.completed_calls: defaultdict = defaultdict(dict)
self.batcher = multicall.multicall.batcher
self.process_executor = multicall.utils.process_pool_executor
self._bid = 0 # batch id
self._mid = 0 # multicall id
self._cid = 0 # call id
self._bid_lock = threading.Lock()
self._mid_lock = threading.Lock()
self._cid_lock = threading.Lock()
self._last_seen_cid = 0
self._initializing = False
self._is_configured = False
self._is_executing = False
self._pools_closed = False
self._checkpoint = time()
self._instance = len(instances)
self.caller_event_loop = asyncio.new_event_loop()
threading.Thread(target=lambda: start_caller_event_loop(self.caller_event_loop)).start()
self._bid: int = 0 # batch id
self._mid: int = 0 # multicall id
self._cid: int = 0 # call id
self._bid_lock: threading.Lock = threading.Lock()
self._mid_lock: threading.Lock = threading.Lock()
self._cid_lock: threading.Lock = threading.Lock()
self._initializing: bool = False
self._is_configured: bool = False
self._pools_closed: bool = False
self._checkpoint: float = time()
self._instance: int = len(instances)
instances.append(self)

def __repr__(self) -> str:
Expand All @@ -94,6 +104,7 @@ def next_mid(self) -> int:
@property
def next_cid(self) -> int:
    """
    Returns the next unique call id.

    Reading this property also resets `self._checkpoint` to the current
    time, so the loop timer restarts every time a new call id is issued.
    """
    # Refresh the timer first, then hand out the id.
    self._checkpoint = time()
    call_id = self._increment('cid')
    return call_id

@sort_lazy_logger
Expand All @@ -119,50 +130,51 @@ async def add_to_queue(self, params: Any) -> int:

async def await_response(self, block: str, cid: int) -> RPCResponse:
while cid not in self.completed_calls[block]:
assert cid in self.pending_calls[block], f"Something went wrong, call{cid} is missing from `pending_calls`."
if self.queue_is_ready() and self.loop_is_ready():
await self.execute_multicall()
if cid in self.in_process_calls[block]:
while cid not in self.completed_calls[block]:
await asyncio.sleep(0)
return self.fetch_response(block,cid)
assert cid in self.pending_calls[block] or cid in self.in_process_calls[block], f"Something went wrong, call{cid} is missing from `pending_calls`."
if self.loop_is_ready() or self.queue_is_full(block):
asyncio.run_coroutine_threadsafe(self.execute_multicall(), self.caller_event_loop).result()
await asyncio.sleep(LOOP_INTERVAL)
self._last_seen_cid = self._cid
return self.fetch_response(block,cid)

def queue_is_ready(self) -> bool:
    """
    Report whether the pending queue should be executed now.

    True only when no multicall is currently executing, no new call id has
    been issued since `_last_seen_cid` was recorded, and `pending_calls`
    is non-empty.
    """
    if self._is_executing:
        return False
    if self._cid != self._last_seen_cid:
        return False
    return len(self.pending_calls) > 0

def loop_is_ready(self) -> bool:
    """
    Report whether more than `LOOP_INTERVAL` seconds have elapsed since
    `self._checkpoint` was last set.
    """
    elapsed = time() - self._checkpoint
    return elapsed > LOOP_INTERVAL

def queue_is_full(self, block: str) -> bool:
    """
    Report whether `block` has accumulated enough pending calls to be
    executed without waiting for the loop timer.

    Args:
        block: The block identifier used as the key into `pending_calls`.

    Returns:
        True when the number of calls pending for `block` is at least
        `self.batcher.step * 25`, False otherwise (including when nothing
        is pending for `block`).
    """
    # `pending_calls` is a defaultdict: indexing it with an unseen block
    # would insert an empty entry as a side effect of this read-only check.
    # `.get` keeps the predicate free of mutations.
    queued = self.pending_calls.get(block, ())
    return len(queued) >= self.batcher.step * 25

def fetch_response(self, block: str, cid: int) -> RPCResponse:
response = self.completed_calls[block].pop(cid)
if not self.completed_calls[block]:
self.completed_calls.pop(block)
return response
return self.completed_calls[block].pop(cid)

async def execute_multicall(self) -> None:
i = 0
while self._cid_lock.locked():
if i // 50 == int(i // 50):
if i // 500 == int(i // 500):
main_logger.debug('lock is locked')
i += 1
await asyncio.sleep(.1)
if self._is_executing:
return
demo_logger.info(f'executing multicall (current cid: {self._cid})')
self._is_executing = True
self._pools_closed = True
with self._cid_lock:
pending_calls = list(self.pending_calls.items())
calls_to_exec = []
blocks = list(self.pending_calls.keys())
for block in blocks:
calls = self.pending_calls.pop(block)
for cid, params in calls.items():
self.in_process_calls[block][cid] = params
calls_to_exec.append((block, calls))
self._pools_closed = False
await gather([self.process_block(block, calls) for block, calls in pending_calls])
self._is_executing = False
demo_logger.info(f'executing multicall (current cid: {self._cid})')
await gather([self.process_block(block, calls) for block, calls in calls_to_exec])
demo_logger.info('multicall complete')

async def process_block(self, block: str, calls: Dict[int,List]) -> None:
calls = [[cid, [params[0]['to'], HexBytes(params[0]['data'])]] for cid, params in calls.items()]
demo_logger.info(f'executing {len(calls)} calls for block {block}')
batches = self.batcher.batch_calls(calls, self.batcher.step)
batches = await gather([self.process_batch(batch,block) for batch in batches])
await gather([self.process_batch(batch,block) for batch in batches])

async def process_batch(self, batch: List, block: str, bid: Optional[int] = None) -> None:
if bid is None:
Expand Down Expand Up @@ -217,7 +229,7 @@ def _multicall_for_block(self, block: str) -> multicall.Call:
state_override_code=OVERRIDE_CODE,
)

async def spoof_response(self, cid: int, block: str, params: List, data: Optional[Union[str,Exception]]) -> RPCResponse:
async def spoof_response(self, cid: int, block: str, params: List, data: Optional[Union[str,Exception]]) -> None:
""" Creates a spoof rpc response for call # `cid` using data returned from multicall2. """
if (
# If multicall failed, try single call to get detailed error.
Expand All @@ -240,15 +252,10 @@ async def spoof_response(self, cid: int, block: str, params: List, data: Optiona
main_logger.debug(f"spoof: {spoof}")
self.completed_calls[block][cid] = spoof

# Pop the call from pending calls
# TODO figure out why cids can be missing from pending_calls when they haven't been popped yet
if cid in self.pending_calls[block]:
self.pending_calls[block].pop(cid)

# Pop the block from pending calls if empty
if not self.pending_calls[block]:
self.pending_calls.pop(block)
return data
# Pop the call from in_process_calls
# TODO figure out why cids can be missing from in_process_calls when they haven't been popped yet
if cid in self.in_process_calls[block]:
self.in_process_calls[block].pop(cid)

async def _setup(self) -> None:
if self._initializing:
Expand Down
4 changes: 1 addition & 3 deletions dank_mids/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
from web3 import Web3
from web3.types import RPCEndpoint, RPCResponse

from dank_mids._config import SEMAPHORE
from dank_mids.controller import DankMiddlewareController
from dank_mids.types import AsyncMiddleware

Expand All @@ -18,7 +17,6 @@ async def middleware(method: RPCEndpoint, params: Any) -> RPCResponse:
if not dank_mids._is_configured and dank_mids.should_batch(method, params):
await dank_mids._setup()
if dank_mids.should_batch(method, params):
async with SEMAPHORE:
return await dank_mids(params)
return await dank_mids(params)
return await make_request(method, params)
return middleware
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
bobs_lazy_logging>=0.0.2
eth_retry>=0.1.1
eth_retry>=0.1.6,<0.2
multicall>=0.5.1

0 comments on commit d13205e

Please sign in to comment.