From bb37ccc7f6ad8eaa14cfffc797879cac8073db3c Mon Sep 17 00:00:00 2001 From: BobTheBuidler <70677534+BobTheBuidler@users.noreply.github.com> Date: Mon, 24 Apr 2023 16:58:52 -0400 Subject: [PATCH] Fix err handler (#67) * fix: err handling edge case * chore: refactor with partial * feat: patch provider * fix: session * feat: log internal errs --- dank_mids/controller.py | 4 +- dank_mids/helpers/__init__.py | 16 +++++ dank_mids/helpers/_middleware/__init__.py | 6 ++ .../_middleware/formatting.py} | 72 ++----------------- dank_mids/helpers/_middleware/poa.py | 40 +++++++++++ dank_mids/helpers/_session.py | 14 ++++ dank_mids/helpers/w3.py | 27 +++++++ dank_mids/requests.py | 22 ++++-- examples/dank_brownie_example.py | 1 + 9 files changed, 126 insertions(+), 76 deletions(-) create mode 100644 dank_mids/helpers/__init__.py create mode 100644 dank_mids/helpers/_middleware/__init__.py rename dank_mids/{helpers.py => helpers/_middleware/formatting.py} (50%) create mode 100644 dank_mids/helpers/_middleware/poa.py create mode 100644 dank_mids/helpers/_session.py create mode 100644 dank_mids/helpers/w3.py diff --git a/dank_mids/controller.py b/dank_mids/controller.py index 14f9fdd2..b78e3bb3 100644 --- a/dank_mids/controller.py +++ b/dank_mids/controller.py @@ -15,6 +15,7 @@ from dank_mids._config import LOOP_INTERVAL from dank_mids._demo_mode import demo_logger +from dank_mids.helpers import _session from dank_mids.loggers import main_logger, sort_lazy_logger from dank_mids.requests import JSONRPCBatch, Multicall, RPCRequest, eth_call from dank_mids.types import BlockId, ChainId @@ -27,8 +28,7 @@ def _sync_w3_from_async(w3: Web3) -> Web3: assert w3.eth.is_async and isinstance(w3.provider, AsyncBaseProvider), "Dank Middleware can only be applied to an asycnhronous Web3 instance." - sync_provider = HTTPProvider(w3.provider.endpoint_uri) - sync_w3: Web3 = Web3(provider = sync_provider) + sync_w3: Web3 = Web3(provider = HTTPProvider(w3.provider.endpoint_uri, {'timeout': 600}, session=_session.get_session())) # We can't pickle middlewares to send to process executor. # The call has already passed thru all middlewares on the user's Web3 instance. sync_w3.middleware_onion.clear() diff --git a/dank_mids/helpers/__init__.py b/dank_mids/helpers/__init__.py new file mode 100644 index 00000000..238931e8 --- /dev/null +++ b/dank_mids/helpers/__init__.py @@ -0,0 +1,16 @@ +import asyncio as _asyncio +from typing import Awaitable as _Awaitable +from typing import Iterable as _Iterable + +from tqdm.asyncio import tqdm_asyncio as _tqdm_asyncio + +from dank_mids.helpers.w3 import setup_dank_w3, setup_dank_w3_from_sync + + +async def await_all(futs: _Iterable[_Awaitable], verbose: bool = False) -> None: + # NOTE: 'verbose' is mainly for debugging but feel free to have fun + if not isinstance(verbose, bool): + raise NotImplementedError(verbose) + as_completed = _tqdm_asyncio.as_completed if verbose else _asyncio.as_completed + for fut in as_completed(futs if isinstance(futs, list) else [*futs]): + await fut \ No newline at end of file diff --git a/dank_mids/helpers/_middleware/__init__.py b/dank_mids/helpers/_middleware/__init__.py new file mode 100644 index 00000000..86f56ae4 --- /dev/null +++ b/dank_mids/helpers/_middleware/__init__.py @@ -0,0 +1,6 @@ + +from dank_mids.helpers._middleware.poa import geth_poa_middleware + +""" +Everything in this module is in web3.py now, but dank_mids wants to support versions that predates them. +""" diff --git a/dank_mids/helpers.py b/dank_mids/helpers/_middleware/formatting.py similarity index 50% rename from dank_mids/helpers.py rename to dank_mids/helpers/_middleware/formatting.py index b41fc7b5..25b4d319 100644 --- a/dank_mids/helpers.py +++ b/dank_mids/helpers/_middleware/formatting.py @@ -1,51 +1,12 @@ -import asyncio -from typing import (Any, Awaitable, Callable, Coroutine, Iterable, List, - Literal, Optional) -from eth_utils.curried import (apply_formatter_if, apply_formatters_to_dict, - apply_key_map, is_null) -from eth_utils.toolz import assoc, complement, compose, merge -from hexbytes import HexBytes -from multicall.utils import get_async_w3 -from tqdm.asyncio import tqdm_asyncio -from web3 import Web3 -from web3._utils.rpc_abi import RPC -from web3.providers.async_base import AsyncBaseProvider -from web3.providers.base import BaseProvider +from typing import Any, Callable, Coroutine, Literal, Optional + +from eth_utils.toolz import assoc, merge from web3.types import Formatters, FormattersDict, RPCEndpoint, RPCResponse -from dank_mids.middleware import dank_middleware from dank_mids.types import AsyncMiddleware - -dank_w3s: List[Web3] = [] - -def setup_dank_w3(async_w3: Web3) -> Web3: - """ Injects Dank Middleware into an async Web3 instance. """ - assert async_w3.eth.is_async and isinstance(async_w3.provider, AsyncBaseProvider) - # NOTE: We use this lookup to prevent errs where 2 project dependencies both depend on dank_mids and eth-brownie. - if async_w3 not in dank_w3s: - async_w3.middleware_onion.inject(dank_middleware, layer=0) - async_w3.middleware_onion.add(geth_poa_middleware) - dank_w3s.append(async_w3) - return async_w3 - -def setup_dank_w3_from_sync(sync_w3: Web3) -> Web3: - """ Creates a new async Web3 instance from `w3.provider.endpoint_uri` and injects it with Dank Middleware. """ - assert not sync_w3.eth.is_async and isinstance(sync_w3.provider, BaseProvider) - return setup_dank_w3(get_async_w3(sync_w3)) - -async def await_all(futs: Iterable[Awaitable], verbose: bool = False) -> None: - # NOTE: 'verbose' is mainly for debugging but feel free to have fun - if not isinstance(verbose, bool): - raise NotImplementedError(verbose) - as_completed = tqdm_asyncio.as_completed if verbose else asyncio.as_completed - for fut in as_completed(futs if isinstance(futs, list) else [*futs]): - await fut - -# Everything below is in web3.py now, but dank_mids currently needs a version that predates them. - -is_not_null = complement(is_null) +from web3 import Web3 FORMATTER_DEFAULTS: FormattersDict = { "request_formatters": {}, @@ -53,29 +14,6 @@ async def await_all(futs: Iterable[Awaitable], verbose: bool = False) -> None: "error_formatters": {}, } -remap_geth_poa_fields = apply_key_map( - { - "extraData": "proofOfAuthorityData", - } -) - -pythonic_geth_poa = apply_formatters_to_dict( - { - "proofOfAuthorityData": HexBytes, - } -) - -geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields) - -async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], w3: Web3) -> AsyncMiddleware: - middleware = await async_construct_formatting_middleware( - result_formatters={ - RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup), - RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup), - }, - ) - return await middleware(make_request, w3) # type: ignore [arg-type, return-value] - async def async_construct_formatting_middleware( request_formatters: Optional[Formatters] = None, result_formatters: Optional[Formatters] = None, @@ -146,4 +84,4 @@ def _format_response( elif "error" in response and method in error_formatters: return _format_response("error", error_formatters[method]) else: - return response + return response \ No newline at end of file diff --git a/dank_mids/helpers/_middleware/poa.py b/dank_mids/helpers/_middleware/poa.py new file mode 100644 index 00000000..939608d8 --- /dev/null +++ b/dank_mids/helpers/_middleware/poa.py @@ -0,0 +1,40 @@ + + +from typing import Any, Callable + +from eth_utils.curried import (apply_formatter_if, apply_formatters_to_dict, + apply_key_map, is_null) +from eth_utils.toolz import complement, compose +from hexbytes import HexBytes +from web3 import Web3 +from web3._utils.rpc_abi import RPC +from web3.types import RPCEndpoint + +from dank_mids.helpers._middleware.formatting import \ + async_construct_formatting_middleware +from dank_mids.types import AsyncMiddleware + +is_not_null = complement(is_null) + +remap_geth_poa_fields = apply_key_map( + { + "extraData": "proofOfAuthorityData", + } +) + +pythonic_geth_poa = apply_formatters_to_dict( + { + "proofOfAuthorityData": HexBytes, + } +) + +geth_poa_cleanup = compose(pythonic_geth_poa, remap_geth_poa_fields) + +async def geth_poa_middleware(make_request: Callable[[RPCEndpoint, Any], Any], w3: Web3) -> AsyncMiddleware: + middleware = await async_construct_formatting_middleware( + result_formatters={ + RPC.eth_getBlockByHash: apply_formatter_if(is_not_null, geth_poa_cleanup), + RPC.eth_getBlockByNumber: apply_formatter_if(is_not_null, geth_poa_cleanup), + }, + ) + return await middleware(make_request, w3) # type: ignore [arg-type, return-value] \ No newline at end of file diff --git a/dank_mids/helpers/_session.py b/dank_mids/helpers/_session.py new file mode 100644 index 00000000..c0de0158 --- /dev/null +++ b/dank_mids/helpers/_session.py @@ -0,0 +1,14 @@ + +from functools import lru_cache + +from requests import Session +from requests.adapters import HTTPAdapter + + +@lru_cache(maxsize=1) +def get_session() -> Session: + adapter = HTTPAdapter(pool_connections=100, pool_maxsize=100) + session = Session() + session.mount("http://", adapter) + session.mount("https://", adapter) + return session \ No newline at end of file diff --git a/dank_mids/helpers/w3.py b/dank_mids/helpers/w3.py new file mode 100644 index 00000000..2fc6ee3d --- /dev/null +++ b/dank_mids/helpers/w3.py @@ -0,0 +1,27 @@ + +from typing import List + +from multicall.utils import get_async_w3 +from web3 import Web3 +from web3.providers.async_base import AsyncBaseProvider +from web3.providers.base import BaseProvider + +from dank_mids.helpers._middleware import geth_poa_middleware +from dank_mids.middleware import dank_middleware + +dank_w3s: List[Web3] = [] + +def setup_dank_w3(async_w3: Web3) -> Web3: + """ Injects Dank Middleware into an async Web3 instance. """ + assert async_w3.eth.is_async and isinstance(async_w3.provider, AsyncBaseProvider) + # NOTE: We use this lookup to prevent errs where 2 project dependencies both depend on dank_mids and eth-brownie. + if async_w3 not in dank_w3s: + async_w3.middleware_onion.inject(dank_middleware, layer=0) + async_w3.middleware_onion.add(geth_poa_middleware) + dank_w3s.append(async_w3) + return async_w3 + +def setup_dank_w3_from_sync(sync_w3: Web3) -> Web3: + """ Creates a new async Web3 instance from `w3.provider.endpoint_uri` and injects it with Dank Middleware. """ + assert not sync_w3.eth.is_async and isinstance(sync_w3.provider, BaseProvider) + return setup_dank_w3(get_async_w3(sync_w3)) diff --git a/dank_mids/requests.py b/dank_mids/requests.py index f5f09900..24f9ce6b 100644 --- a/dank_mids/requests.py +++ b/dank_mids/requests.py @@ -3,6 +3,7 @@ import asyncio import threading from collections import defaultdict +from functools import partial from time import time from typing import (TYPE_CHECKING, Any, DefaultDict, Dict, Generator, Generic, Iterable, Iterator, List, Optional, Tuple, TypeVar, Union) @@ -25,7 +26,7 @@ MAX_JSONRPC_BATCH_SIZE) from dank_mids._demo_mode import demo_logger from dank_mids.constants import BAD_HEXES, OVERRIDE_CODE -from dank_mids.helpers import await_all +from dank_mids.helpers import _session, await_all from dank_mids.loggers import main_logger from dank_mids.types import BatchId, BlockId, JsonrpcParams, RpcCallJson, T @@ -326,8 +327,8 @@ class Multicall(_Batch[eth_call]): """ Runs in worker thread. One-time use.""" method = "eth_call" fourbyte = function_signature_to_4byte_selector("tryBlockAndAggregate(bool,(address,bytes)[])") - input_types = "(bool,(address,bytes)[])" - output_types = "(uint256,uint256,(bool,bytes)[])" + encode_args = partial(encode_single, "(bool,(address,bytes)[])") + decode_response = partial(run_in_subprocess, partial(decode_single, "(uint256,uint256,(bool,bytes)[])")) def __init__( self, @@ -347,7 +348,7 @@ def block(self) -> BlockId: @property def calldata(self) -> str: - return (self.fourbyte + encode_single(self.input_types, [False, [[call.target, call.calldata] for call in self.calls]])).hex() + return (self.fourbyte + Multicall.encode_args([False, [[call.target, call.calldata] for call in self.calls]])).hex() @property def target(self) -> ChecksumAddress: @@ -398,7 +399,7 @@ async def set_response(self, data: Union[bytes, str, Exception], wakeup_main_loo await await_all(call.set_response(data, wakeup_main_loop=False) for call in self.calls) else: decoded: List[Tuple[bool, bytes]] - _, _, decoded = await run_in_subprocess(decode_single, self.output_types, to_bytes(data)) + _, _, decoded = await Multicall.decode_response(to_bytes(data)) await await_all(call.set_response(data, wakeup_main_loop=False) for call, (_, data) in zip(self.calls, decoded)) if wakeup_main_loop: self.set_done() @@ -410,12 +411,18 @@ def _post_ensure_future_cleanup(self) -> None: self.controller.pending_eth_calls.pop(self.block) +session = _session.get_session() + def post_sync(endpoint, data) -> Union[bytes, Tuple[str, Exception]]: - response = requests.post(endpoint, json=data) + response = session.post(endpoint, json=data) try: + response.raise_for_status() return response.json() + except requests.HTTPError as e: + return e.args[0], e except Exception as e: - return response._content.decode(), e + # This really shouldn't be running but just in case + return response.reason, e class JSONRPCBatch(_Batch[Union[Multicall, RPCRequest]]): def __init__(self, worker: "DankWorker", calls: List[Union[Multicall, RPCRequest]] = [], jid: Optional[BatchId] = None) -> None: @@ -479,6 +486,7 @@ async def post_sync(self) -> Union[Dict, List[bytes]]: elif isinstance(response, Tuple): decoded, e = response counts = self.method_counts + main_logger.error(f"{e.__class__.__name__}: {e}") main_logger.info(f"json batch id: {self.jid} | len: {len(self)} | total calls: {self.total_calls}", ) main_logger.info(f"methods called: {counts}") if 'content length too large' in decoded or decoded == "": diff --git a/examples/dank_brownie_example.py b/examples/dank_brownie_example.py index b6aaaf1a..2bc1c6a4 100644 --- a/examples/dank_brownie_example.py +++ b/examples/dank_brownie_example.py @@ -10,6 +10,7 @@ import asyncio from brownie import Contract, web3 + from dank_mids.brownie_patch import patch_contract from dank_mids.helpers import setup_dank_w3_from_sync