Skip to content

Commit

Permalink
Fix err handler (#67)
Browse files Browse the repository at this point in the history
* fix: err handling edge case

* chore: refactor with partial

* feat: patch provider

* fix: session

* feat: log internal  errs
  • Loading branch information
BobTheBuidler authored Apr 24, 2023
1 parent 7e39ca8 commit bb37ccc
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 76 deletions.
4 changes: 2 additions & 2 deletions dank_mids/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

from dank_mids._config import LOOP_INTERVAL
from dank_mids._demo_mode import demo_logger
from dank_mids.helpers import _session
from dank_mids.loggers import main_logger, sort_lazy_logger
from dank_mids.requests import JSONRPCBatch, Multicall, RPCRequest, eth_call
from dank_mids.types import BlockId, ChainId
Expand All @@ -27,8 +28,7 @@

def _sync_w3_from_async(w3: Web3) -> Web3:
assert w3.eth.is_async and isinstance(w3.provider, AsyncBaseProvider), "Dank Middleware can only be applied to an asycnhronous Web3 instance."
sync_provider = HTTPProvider(w3.provider.endpoint_uri)
sync_w3: Web3 = Web3(provider = sync_provider)
sync_w3: Web3 = Web3(provider = HTTPProvider(w3.provider.endpoint_uri, {'timeout': 600}, session=_session.get_session()))
# We can't pickle middlewares to send to process executor.
# The call has already passed thru all middlewares on the user's Web3 instance.
sync_w3.middleware_onion.clear()
Expand Down
16 changes: 16 additions & 0 deletions dank_mids/helpers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import asyncio as _asyncio
from typing import Awaitable as _Awaitable
from typing import Iterable as _Iterable

from tqdm.asyncio import tqdm_asyncio as _tqdm_asyncio

from dank_mids.helpers.w3 import setup_dank_w3, setup_dank_w3_from_sync


async def await_all(futs: _Iterable[_Awaitable], verbose: bool = False) -> None:
# NOTE: 'verbose' is mainly for debugging but feel free to have fun
if not isinstance(verbose, bool):
raise NotImplementedError(verbose)
as_completed = _tqdm_asyncio.as_completed if verbose else _asyncio.as_completed
for fut in as_completed(futs if isinstance(futs, list) else [*futs]):
await fut
6 changes: 6 additions & 0 deletions dank_mids/helpers/_middleware/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

from dank_mids.helpers._middleware.poa import geth_poa_middleware

"""
Everything in this module is in web3.py now, but dank_mids wants to support versions that predates them.
"""
Original file line number Diff line number Diff line change
@@ -1,81 +1,19 @@

import asyncio
from typing import (Any, Awaitable, Callable, Coroutine, Iterable, List,
Literal, Optional)

from eth_utils.curried import (apply_formatter_if, apply_formatters_to_dict,
apply_key_map, is_null)
from eth_utils.toolz import assoc, complement, compose, merge
from hexbytes import HexBytes
from multicall.utils import get_async_w3
from tqdm.asyncio import tqdm_asyncio
from web3 import Web3
from web3._utils.rpc_abi import RPC
from web3.providers.async_base import AsyncBaseProvider
from web3.providers.base import BaseProvider
from typing import Any, Callable, Coroutine, Literal, Optional

from eth_utils.toolz import assoc, merge
from web3.types import Formatters, FormattersDict, RPCEndpoint, RPCResponse

from dank_mids.middleware import dank_middleware
from dank_mids.types import AsyncMiddleware

dank_w3s: List[Web3] = []

def setup_dank_w3(async_w3: Web3) -> Web3:
""" Injects Dank Middleware into an async Web3 instance. """
assert async_w3.eth.is_async and isinstance(async_w3.provider, AsyncBaseProvider)
# NOTE: We use this lookup to prevent errs where 2 project dependencies both depend on dank_mids and eth-brownie.
if async_w3 not in dank_w3s:
async_w3.middleware_onion.inject(dank_middleware, layer=0)
async_w3.middleware_onion.add(geth_poa_middleware)
dank_w3s.append(async_w3)
return async_w3

def setup_dank_w3_from_sync(sync_w3: Web3) -> Web3:
""" Creates a new async Web3 instance from `w3.provider.endpoint_uri` and injects it with Dank Middleware. """
assert not sync_w3.eth.is_async and isinstance(sync_w3.provider, BaseProvider)
return setup_dank_w3(get_async_w3(sync_w3))

async def await_all(futs: Iterable[Awaitable], verbose: bool = False) -> None:
# NOTE: 'verbose' is mainly for debugging but feel free to have fun
if not isinstance(verbose, bool):
raise NotImplementedError(verbose)
as_completed = tqdm_asyncio.as_completed if verbose else asyncio.as_completed
for fut in as_completed(futs if isinstance(futs, list) else [*futs]):
await fut

# Everything below is in web3.py now, but dank_mids currently needs a version that predates them.

is_not_null = complement(is_null)
from web3 import Web3

FORMATTER_DEFAULTS: FormattersDict = {
"request_formatters": {},
"result_formatters": {},
"error_formatters": {},
}

remap_geth_poa_fields = apply_key_map(
{
"extraData": "proofOfAuthorityData",
}
)

pythonic_geth_poa = apply_formatters_to_dict(
{
"proofOfAuthorityData": HexBytes,
}
)

geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields)

async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], w3: Web3) -> AsyncMiddleware:
middleware = await async_construct_formatting_middleware(
result_formatters={
RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup),
RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup),
},
)
return await middleware(make_request, w3) # type: ignore [arg-type, return-value]

async def async_construct_formatting_middleware(
request_formatters: Optional[Formatters] = None,
result_formatters: Optional[Formatters] = None,
Expand Down Expand Up @@ -146,4 +84,4 @@ def _format_response(
elif "error" in response and method in error_formatters:
return _format_response("error", error_formatters[method])
else:
return response
return response
40 changes: 40 additions & 0 deletions dank_mids/helpers/_middleware/poa.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@


from typing import Any, Callable

from eth_utils.curried import (apply_formatter_if, apply_formatters_to_dict,
apply_key_map, is_null)
from eth_utils.toolz import complement, compose
from hexbytes import HexBytes
from web3 import Web3
from web3._utils.rpc_abi import RPC
from web3.types import RPCEndpoint

from dank_mids.helpers._middleware.formatting import \
async_construct_formatting_middleware
from dank_mids.types import AsyncMiddleware

is_not_null = complement(is_null)

remap_geth_poa_fields = apply_key_map(
{
"extraData": "proofOfAuthorityData",
}
)

pythonic_geth_poa = apply_formatters_to_dict(
{
"proofOfAuthorityData": HexBytes,
}
)

geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields)

async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], w3: Web3) -> AsyncMiddleware:
middleware = await async_construct_formatting_middleware(
result_formatters={
RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup),
RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup),
},
)
return await middleware(make_request, w3) # type: ignore [arg-type, return-value]
14 changes: 14 additions & 0 deletions dank_mids/helpers/_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

from functools import lru_cache

from requests import Session
from requests.adapters import HTTPAdapter


@lru_cache(maxsize=1)
def get_session() -> Session:
adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100)
session = Session()
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
27 changes: 27 additions & 0 deletions dank_mids/helpers/w3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

from typing import List

from multicall.utils import get_async_w3
from web3 import Web3
from web3.providers.async_base import AsyncBaseProvider
from web3.providers.base import BaseProvider

from dank_mids.helpers._middleware import geth_poa_middleware
from dank_mids.middleware import dank_middleware

dank_w3s: List[Web3] = []

def setup_dank_w3(async_w3: Web3) -> Web3:
""" Injects Dank Middleware into an async Web3 instance. """
assert async_w3.eth.is_async and isinstance(async_w3.provider, AsyncBaseProvider)
# NOTE: We use this lookup to prevent errs where 2 project dependencies both depend on dank_mids and eth-brownie.
if async_w3 not in dank_w3s:
async_w3.middleware_onion.inject(dank_middleware, layer=0)
async_w3.middleware_onion.add(geth_poa_middleware)
dank_w3s.append(async_w3)
return async_w3

def setup_dank_w3_from_sync(sync_w3: Web3) -> Web3:
""" Creates a new async Web3 instance from `w3.provider.endpoint_uri` and injects it with Dank Middleware. """
assert not sync_w3.eth.is_async and isinstance(sync_w3.provider, BaseProvider)
return setup_dank_w3(get_async_w3(sync_w3))
22 changes: 15 additions & 7 deletions dank_mids/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import asyncio
import threading
from collections import defaultdict
from functools import partial
from time import time
from typing import (TYPE_CHECKING, Any, DefaultDict, Dict, Generator, Generic,
Iterable, Iterator, List, Optional, Tuple, TypeVar, Union)
Expand All @@ -25,7 +26,7 @@
MAX_JSONRPC_BATCH_SIZE)
from dank_mids._demo_mode import demo_logger
from dank_mids.constants import BAD_HEXES, OVERRIDE_CODE
from dank_mids.helpers import await_all
from dank_mids.helpers import _session, await_all
from dank_mids.loggers import main_logger
from dank_mids.types import BatchId, BlockId, JsonrpcParams, RpcCallJson, T

Expand Down Expand Up @@ -326,8 +327,8 @@ class Multicall(_Batch[eth_call]):
""" Runs in worker thread. One-time use."""
method = "eth_call"
fourbyte = function_signature_to_4byte_selector("tryBlockAndAggregate(bool,(address,bytes)[])")
input_types = "(bool,(address,bytes)[])"
output_types = "(uint256,uint256,(bool,bytes)[])"
encode_args = partial(encode_single, "(bool,(address,bytes)[])")
decode_response = partial(run_in_subprocess, partial(decode_single, "(uint256,uint256,(bool,bytes)[])"))

def __init__(
self,
Expand All @@ -347,7 +348,7 @@ def block(self) -> BlockId:

@property
def calldata(self) -> str:
return (self.fourbyte + encode_single(self.input_types, [False, [[call.target, call.calldata] for call in self.calls]])).hex()
return (self.fourbyte + Multicall.encode_args([False, [[call.target, call.calldata] for call in self.calls]])).hex()

@property
def target(self) -> ChecksumAddress:
Expand Down Expand Up @@ -398,7 +399,7 @@ async def set_response(self, data: Union[bytes, str, Exception], wakeup_main_loo
await await_all(call.set_response(data, wakeup_main_loop=False) for call in self.calls)
else:
decoded: List[Tuple[bool, bytes]]
_, _, decoded = await run_in_subprocess(decode_single, self.output_types, to_bytes(data))
_, _, decoded = await Multicall.decode_response(to_bytes(data))
await await_all(call.set_response(data, wakeup_main_loop=False) for call, (_, data) in zip(self.calls, decoded))
if wakeup_main_loop:
self.set_done()
Expand All @@ -410,12 +411,18 @@ def _post_ensure_future_cleanup(self) -> None:
self.controller.pending_eth_calls.pop(self.block)


session = _session.get_session()

def post_sync(endpoint, data) -> Union[bytes, Tuple[str, Exception]]:
response = requests.post(endpoint, json=data)
response = session.post(endpoint, json=data)
try:
response.raise_for_status()
return response.json()
except requests.HTTPError as e:
return e.args[0], e
except Exception as e:
return response._content.decode(), e
# This really shouldn't be running but just in case
return response.reason, e

class JSONRPCBatch(_Batch[Union[Multicall, RPCRequest]]):
def __init__(self, worker: "DankWorker", calls: List[Union[Multicall, RPCRequest]] = [], jid: Optional[BatchId] = None) -> None:
Expand Down Expand Up @@ -479,6 +486,7 @@ async def post_sync(self) -> Union[Dict, List[bytes]]:
elif isinstance(response, Tuple):
decoded, e = response
counts = self.method_counts
main_logger.error(f"{e.__class__.__name__}: {e}")
main_logger.info(f"json batch id: {self.jid} | len: {len(self)} | total calls: {self.total_calls}", )
main_logger.info(f"methods called: {counts}")
if 'content length too large' in decoded or decoded == "":
Expand Down
1 change: 1 addition & 0 deletions examples/dank_brownie_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import asyncio

from brownie import Contract, web3

from dank_mids.brownie_patch import patch_contract
from dank_mids.helpers import setup_dank_w3_from_sync

Expand Down

0 comments on commit bb37ccc

Please sign in to comment.