diff --git a/dank_mids/ENVIRONMENT_VARIABLES.py b/dank_mids/ENVIRONMENT_VARIABLES.py index 2636d718..53d07352 100644 --- a/dank_mids/ENVIRONMENT_VARIABLES.py +++ b/dank_mids/ENVIRONMENT_VARIABLES.py @@ -13,7 +13,9 @@ logger = logging.getLogger("dank_mids.envs") if not typed_envs.logger.disabled: - logger.info("For your information, you can tweak your configuration for optimal performance using any of the envs below:") + logger.info( + "For your information, you can tweak your configuration for optimal performance using any of the envs below:" + ) ############### # ENVIRONMENT # @@ -34,19 +36,29 @@ DEMO_MODE = _envs.create_env("DEMO_MODE", bool, default=demo_mode, verbose=False) # Are you calling a ganache fork? Can't use state override code -ganache_fork = _envs._deprecated_format.create_env("GANACHE_FORK", bool, default=False, verbose=False) +ganache_fork = _envs._deprecated_format.create_env( + "GANACHE_FORK", bool, default=False, verbose=False +) GANACHE_FORK = _envs.create_env("GANACHE_FORK", bool, default=ganache_fork) """Flag indicating whether the current environment is a Ganache fork.""" # We set the default to 20 minutes to account for potentially long event loop times if you're doing serious work. -AIOHTTP_TIMEOUT = _envs.create_env("AIOHTTP_TIMEOUT", int, default=20*60, string_converter=int) +AIOHTTP_TIMEOUT = _envs.create_env("AIOHTTP_TIMEOUT", int, default=20 * 60, string_converter=int) """Timeout value in seconds for aiohttp requests.""" # Brownie call Semaphore # Used because I experienced some OOM errs due to web3 formatters when I was batching an absurd number of brownie calls. # We need a separate semaphore here because the method-specific semaphores are too late in the code to prevent this OOM issue. -brownie_semaphore = _envs._deprecated_format.create_env("BROWNIE_CALL_SEMAPHORE", int, default=100_000, string_converter=int, verbose=False) -BROWNIE_CALL_SEMAPHORE = _envs.create_env("BROWNIE_CALL_SEMAPHORE", BlockSemaphore, default=brownie_semaphore, string_converter=int, verbose=not OPERATION_MODE.infura) +brownie_semaphore = _envs._deprecated_format.create_env( + "BROWNIE_CALL_SEMAPHORE", int, default=100_000, string_converter=int, verbose=False +) +BROWNIE_CALL_SEMAPHORE = _envs.create_env( + "BROWNIE_CALL_SEMAPHORE", + BlockSemaphore, + default=brownie_semaphore, + string_converter=int, + verbose=not OPERATION_MODE.infura, +) """ Semaphore for limiting concurrent Brownie calls. @@ -54,7 +66,13 @@ :class:`dank_mids.semaphores.BlockSemaphore`: The semaphore class used for concurrency control. """ -BROWNIE_ENCODER_SEMAPHORE = _envs.create_env("BROWNIE_ENCODER_SEMAPHORE", BlockSemaphore, default=BROWNIE_CALL_SEMAPHORE._default_value * 2, string_converter=int, verbose=not OPERATION_MODE.infura) +BROWNIE_ENCODER_SEMAPHORE = _envs.create_env( + "BROWNIE_ENCODER_SEMAPHORE", + BlockSemaphore, + default=BROWNIE_CALL_SEMAPHORE._default_value * 2, + string_converter=int, + verbose=not OPERATION_MODE.infura, +) """ Semaphore for limiting concurrent Brownie encoding operations. This limits memory consumption. @@ -65,7 +83,13 @@ # Processes for decoding. This determines process pool size, not total subprocess count. # There are 3 pools, each initialized with the same value. # NOTE: Don't stress, these are good for you and will not hog your cpu. You can disable them by setting the var = 0. 
#TODO: lol u cant yet -BROWNIE_ENCODER_PROCESSES = _envs.create_env("BROWNIE_ENCODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura) +BROWNIE_ENCODER_PROCESSES = _envs.create_env( + "BROWNIE_ENCODER_PROCESSES", + AsyncProcessPoolExecutor, + default=0 if OPERATION_MODE.infura else 1, + string_converter=int, + verbose=not OPERATION_MODE.infura, +) """ Process pool for Brownie encoding operations. @@ -73,7 +97,13 @@ :class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes. """ -BROWNIE_DECODER_PROCESSES = _envs.create_env("BROWNIE_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura) +BROWNIE_DECODER_PROCESSES = _envs.create_env( + "BROWNIE_DECODER_PROCESSES", + AsyncProcessPoolExecutor, + default=0 if OPERATION_MODE.infura else 1, + string_converter=int, + verbose=not OPERATION_MODE.infura, +) """ Process pool for Brownie decoding operations. @@ -81,7 +111,13 @@ :class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes. """ -MULTICALL_DECODER_PROCESSES = _envs.create_env("MULTICALL_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura) +MULTICALL_DECODER_PROCESSES = _envs.create_env( + "MULTICALL_DECODER_PROCESSES", + AsyncProcessPoolExecutor, + default=0 if OPERATION_MODE.infura else 1, + string_converter=int, + verbose=not OPERATION_MODE.infura, +) """ Process pool for Multicall decoding operations. @@ -89,7 +125,13 @@ :class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes. """ -COLLECTION_FACTOR = _envs.create_env("COLLECTION_FACTOR", int, default=10 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura) +COLLECTION_FACTOR = _envs.create_env( + "COLLECTION_FACTOR", + int, + default=10 if OPERATION_MODE.infura else 1, + string_converter=int, + verbose=not OPERATION_MODE.infura, +) """Factor determining the size of data collection operations.""" # We use a modified version of the request spec that doesn't contain unnecessary fields, and switch to the full spec if necessary for your node. 
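[Illustrative aside, not part of the patch.] These typed envs are resolved once, at import time, so they must be set before dank_mids is first imported. The variable names below are assumptions assembled from the create_env names above plus the "DANKMIDS" prefix that dank_mids/_envs.py (further down in this diff) passes to EnvVarFactory; DANKMIDS_OPERATION_MODE in particular is inferred from the OPERATION_MODE references, and its valid values come from the MODES list in dank_mids/_mode.py below.

import os

# Must run before `import dank_mids`; the module reads these at import time.
os.environ["DANKMIDS_OPERATION_MODE"] = "application"    # "default", "application", or "infura"
os.environ["DANKMIDS_BROWNIE_CALL_SEMAPHORE"] = "50000"  # cap concurrent brownie calls
os.environ["DANKMIDS_AIOHTTP_TIMEOUT"] = str(10 * 60)    # request timeout, in seconds

import dank_mids  # noqa: E402

Per the logging near the end of this file's diff, setting TYPEDENVS_SHUTUP to any value silences the startup messages these envs produce.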
@@ -102,32 +144,70 @@ # TODO: implement this EXPORT_STATS = _envs.create_env("EXPORT_STATS", bool, default=False, verbose=False) # NOTE: COLLECT_STATS is implemented -COLLECT_STATS = _envs.create_env("COLLECT_STATS", bool, default=EXPORT_STATS, verbose=not EXPORT_STATS) +COLLECT_STATS = _envs.create_env( + "COLLECT_STATS", bool, default=EXPORT_STATS, verbose=not EXPORT_STATS +) # You probably don't need to use this unless you know you need to -STUCK_CALL_TIMEOUT = _envs.create_env("STUCK_CALL_TIMEOUT", int, default=60*60*2) +STUCK_CALL_TIMEOUT = _envs.create_env("STUCK_CALL_TIMEOUT", int, default=60 * 60 * 2) # Method-specific Semaphores method_semaphores: Dict[str, a_sync.Semaphore] = { - "eth_call": _envs.create_env("ETH_CALL_SEMAPHORE", BlockSemaphore, default=BROWNIE_CALL_SEMAPHORE._value, string_converter=int, verbose=False), - "eth_getBlock": _envs.create_env("ETH_GETBLOCK_SEMAPHORE", a_sync.Semaphore, default=1_000, string_converter=int, verbose=False), - "eth_getLogs": _envs.create_env("ETH_GETLOGS_SEMAPHORE", a_sync.Semaphore, default=64, string_converter=int, verbose=False), - "eth_getTransaction": _envs.create_env("ETH_GETTRANSACTION_SEMAPHORE", a_sync.Semaphore, default=1_000, string_converter=int, verbose=False), - "eth_getCode": _envs.create_env("ETH_GETCODE_SEMAPHORE", a_sync.Semaphore, default=5_000, string_converter=int, verbose=False), + "eth_call": _envs.create_env( + "ETH_CALL_SEMAPHORE", + BlockSemaphore, + default=BROWNIE_CALL_SEMAPHORE._value, + string_converter=int, + verbose=False, + ), + "eth_getBlock": _envs.create_env( + "ETH_GETBLOCK_SEMAPHORE", + a_sync.Semaphore, + default=1_000, + string_converter=int, + verbose=False, + ), + "eth_getLogs": _envs.create_env( + "ETH_GETLOGS_SEMAPHORE", a_sync.Semaphore, default=64, string_converter=int, verbose=False + ), + "eth_getTransaction": _envs.create_env( + "ETH_GETTRANSACTION_SEMAPHORE", + a_sync.Semaphore, + default=1_000, + string_converter=int, + verbose=False, + ), + "eth_getCode": _envs.create_env( + "ETH_GETCODE_SEMAPHORE", + a_sync.Semaphore, + default=5_000, + string_converter=int, + verbose=False, + ), } if not typed_envs.logger.disabled: logger.info("More details can be found in dank_mids/ENVIRONMENT_VARIABLES.py") - logger.info("NOTE: You can disable these logs by setting the `TYPEDENVS_SHUTUP` env var to any value.") + logger.info( + "NOTE: You can disable these logs by setting the `TYPEDENVS_SHUTUP` env var to any value." + ) # Validate some stuffs # NOTE: The other modes are (probably) bugging out right now. More investigation needed. For now you use infura mode. if not OPERATION_MODE.infura: - logger.warning("Unless you know what you're doing, dank mids should be run in infura mode for now") + logger.warning( + "Unless you know what you're doing, dank mids should be run in infura mode for now" + ) if OPERATION_MODE.infura: - for process_pool in {MULTICALL_DECODER_PROCESSES, BROWNIE_DECODER_PROCESSES, BROWNIE_ENCODER_PROCESSES}: + for process_pool in { + MULTICALL_DECODER_PROCESSES, + BROWNIE_DECODER_PROCESSES, + BROWNIE_ENCODER_PROCESSES, + }: if process_pool._max_workers: - raise ValueError(f"You cannot set env var {process_pool.name} while running dank in infura mode.") + raise ValueError( + f"You cannot set env var {process_pool.name} while running dank in infura mode." 
+ ) diff --git a/dank_mids/__init__.py b/dank_mids/__init__.py index 434b87c4..12c60a64 100644 --- a/dank_mids/__init__.py +++ b/dank_mids/__init__.py @@ -1,6 +1,11 @@ from contextlib import suppress -from dank_mids.brownie_patch import DankContractCall, DankContractMethod, DankContractTx, DankOverloadedMethod +from dank_mids.brownie_patch import ( + DankContractCall, + DankContractMethod, + DankContractTx, + DankOverloadedMethod, +) from dank_mids.controller import instances from dank_mids.exceptions import BrownieNotConnectedError from dank_mids.helpers import setup_dank_w3, setup_dank_w3_from_sync @@ -14,14 +19,16 @@ def _configure_concurrent_future_work_queue_size(): """ Configures the concurrent futures process pool to allow for a larger number of queued calls. - + This function increases the EXTRA_QUEUED_CALLS value to 50,000, which allows for more concurrent operations to be queued in the process pool. This can significantly improve performance for applications that make heavy use of brownie. """ import concurrent.futures.process as _cfp + _cfp.EXTRA_QUEUED_CALLS = 50_000 + _configure_concurrent_future_work_queue_size() @@ -29,6 +36,7 @@ def _configure_concurrent_future_work_queue_size(): __brownie_objects = ["Contract", "dank_web3", "web3", "dank_eth", "eth", "patch_contract"] with suppress(ImportError): from dank_mids.brownie_patch import Contract, dank_eth, dank_web3, patch_contract + __all__ += __brownie_objects # aliased for cleanliness and convenience web3 = dank_web3 @@ -46,5 +54,3 @@ def __getattr__(name: str): if name in __brownie_objects: raise BrownieNotConnectedError(name) raise AttributeError(f"module '{__name__}' has no attribute '{name}'") - - diff --git a/dank_mids/_batch.py b/dank_mids/_batch.py index 113e56d2..40b1eae0 100644 --- a/dank_mids/_batch.py +++ b/dank_mids/_batch.py @@ -17,6 +17,7 @@ logger = logging.getLogger(__name__) + class DankBatch: """ A batch of JSON-RPC batches. @@ -32,9 +33,14 @@ class DankBatch: :class:`dank_mids._requests.RPCRequest`: The RPCRequest class used in this batch. """ - __slots__ = 'controller', 'multicalls', 'rpc_calls', '_started' + __slots__ = "controller", "multicalls", "rpc_calls", "_started" - def __init__(self, controller: "DankMiddlewareController", multicalls: Multicalls, rpc_calls: List[Union[Multicall, RPCRequest]]): + def __init__( + self, + controller: "DankMiddlewareController", + multicalls: Multicalls, + rpc_calls: List[Union[Multicall, RPCRequest]], + ): self.controller = controller """The controller managing this batch.""" @@ -46,7 +52,7 @@ def __init__(self, controller: "DankMiddlewareController", multicalls: Multicall self._started = False """A flag indicating whether the batch has been started.""" - + def __await__(self) -> Generator[Any, None, Any]: """ Makes the DankBatch awaitable. @@ -59,11 +65,11 @@ def __await__(self) -> Generator[Any, None, Any]: """ self.start() return self._await().__await__() - + async def _await(self) -> None: """ Internal method to await the completion of all coroutines in the batch. - + This method gathers all coroutines in the batch and awaits their completion, logging any exceptions that may occur during execution. It's designed to handle both successful completions and potential errors gracefully. 
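[Illustrative aside, not part of the patch.] The awaitable pattern above -- __await__ starts the batch, then delegates to _await, which gathers with return_exceptions=True and re-raises anything unexpected -- reduced to a minimal standalone class (hypothetical names, not the library's API):

import asyncio

class AwaitableBatch:
    """Toy stand-in for DankBatch's start-then-gather pattern."""

    def __init__(self, coros):
        self._coros = list(coros)
        self._started = False

    def start(self):
        # DankBatch also starts every queued multicall / rpc call here.
        self._started = True

    def __await__(self):
        self.start()
        return self._await().__await__()

    async def _await(self):
        results = await asyncio.gather(*self._coros, return_exceptions=True)
        for result in results:
            if isinstance(result, Exception):
                raise result  # surface the first failure to the awaiter

The split lets callers simply `await batch` while still exposing a synchronous start() hook, which the queue-full / early_start logic shown later in this diff relies on.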
@@ -76,7 +82,10 @@ async def _await(self) -> None: for batch, result in zip(batches, await asyncio.gather(*batches, return_exceptions=True)): if isinstance(result, Exception): if not isinstance(result, DankMidsInternalError): - logger.error(f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n", exc_info=True) + logger.error( + f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n", + exc_info=True, + ) raise result def start(self) -> None: @@ -96,7 +105,7 @@ def start(self) -> None: for call in self.rpc_calls: call.start(self) self._started = True - + @property def coroutines(self) -> Generator[Union["_Batch", Awaitable[RawResponse]], None, None]: # Combine multicalls into one or more jsonrpc batches @@ -116,7 +125,7 @@ def coroutines(self) -> Generator[Union["_Batch", Awaitable[RawResponse]], None, if working_batch.is_full: yield working_batch working_batch = JSONRPCBatch(self.controller) - + rpc_calls_to_batch = self.rpc_calls[:] while rpc_calls_to_batch: if working_batch.is_full: diff --git a/dank_mids/_demo_mode.py b/dank_mids/_demo_mode.py index ecafa56d..0b1a2408 100644 --- a/dank_mids/_demo_mode.py +++ b/dank_mids/_demo_mode.py @@ -21,5 +21,6 @@ def info(self, *args: Any, **kwargs: Any) -> None: **kwargs: Keyword arguments (ignored). """ + # Choose between a real logger and a dummy logger based on the demo mode setting demo_logger: logging.Logger = logging.getLogger("dank_mids.demo") if ENVIRONMENT_VARIABLES.DEMO_MODE else DummyLogger() # type: ignore [attr-defined, assignment] diff --git a/dank_mids/_envs.py b/dank_mids/_envs.py index 73ac0959..1fb5c776 100644 --- a/dank_mids/_envs.py +++ b/dank_mids/_envs.py @@ -1,4 +1,3 @@ - from typed_envs import EnvVarFactory _factory = EnvVarFactory("DANKMIDS") diff --git a/dank_mids/_exceptions.py b/dank_mids/_exceptions.py index 6374fc47..4618475c 100644 --- a/dank_mids/_exceptions.py +++ b/dank_mids/_exceptions.py @@ -12,29 +12,38 @@ logger = logging.getLogger("dank_mids.exceptions") + class BadResponse(ValueError): """Exception raised when an unexpected or invalid response is received.""" + def __init__(self, response: "PartialResponse") -> None: self.response = response super().__init__(response.to_dict()) + class EmptyBatch(ValueError): """Exception raised when attempting to process an empty batch.""" + class ResponseNotReady(ValueError): """Exception raised when trying to access a response that isn't ready yet.""" + class PayloadTooLarge(BadResponse): """Exception raised when the payload for a request is too large.""" + class ExceedsMaxBatchSize(BadResponse): """Exception raised when a batch operation exceeds the maximum allowed size for the RPC.""" + @property def limit(self) -> int: - return int(re.search(r'batch limit (\d+) exceeded', self.response.error.message)[1]) # type: ignore [index, union-attr] + return int(re.search(r"batch limit (\d+) exceeded", self.response.error.message)[1]) # type: ignore [index, union-attr] + class DankMidsClientResponseError(ClientResponseError): """A wrapper around the standard aiohttp ClientResponseError that attaches the request that generated the error.""" + def __init__( self, exc: ClientResponseError, @@ -46,9 +55,15 @@ def __init__( It contains information about the request that led to this error. 
""" - super().__init__(exc.request_info, exc.history, status=exc.status, message=exc.message, headers=exc.headers) + super().__init__( + exc.request_info, + exc.history, + status=exc.status, + message=exc.message, + headers=exc.headers, + ) self.args = (*self.args, request) # type: ignore [has-type] - + self._exception = exc """ The original ClientResponseError that this class is wrapping. @@ -56,7 +71,10 @@ def __init__( """ -internal_err_types = Union[AttributeError, TypeError, UnboundLocalError, NotImplementedError, RuntimeError, SyntaxError] +internal_err_types = Union[ + AttributeError, TypeError, UnboundLocalError, NotImplementedError, RuntimeError, SyntaxError +] + class DankMidsInternalError(Exception): def __init__(self, e: Union[ValueError, internal_err_types]) -> None: @@ -69,13 +87,19 @@ def __init__(self, e: Union[ValueError, internal_err_types]) -> None: """ super().__init__(e.__repr__()) - + class BatchResponseSortError(Exception): """ A `BatchResponseSortError` indicates your RPC needs some special handling to properly handle batch calls / responses. """ - def __init__(self, controller: "DankMiddlewareController", calls: List["RPCRequest"], response: List["RawResponse"]) -> None: + + def __init__( + self, + controller: "DankMiddlewareController", + calls: List["RPCRequest"], + response: List["RawResponse"], + ) -> None: self.calls = calls """ A list of RPCRequest objects representing the calls that were made in the batch. @@ -87,13 +111,17 @@ def __init__(self, controller: "DankMiddlewareController", calls: List["RPCReque A list of Response objects representing the decoded results from the RPC call. This can be used to analyze the responses that couldn't be properly sorted. """ - - super().__init__(f"This will not mess up your run but will make things needlessly slow. Please show this to Bob. endpoint={controller.endpoint} client_version={controller.client_version} calls={[call.uid for call in calls]} response={self.results}") + + super().__init__( + f"This will not mess up your run but will make things needlessly slow. Please show this to Bob. endpoint={controller.endpoint} client_version={controller.client_version} calls={[call.uid for call in calls]} response={self.results}" + ) + class ChainstackRateLimited(BadResponse): """ Chainstack doesn't use 429 for rate limiting, it sends a successful 200 response back to the rpc with an error message so our usual rate-limiting handlers don't work and we need to handle that case with bespoke logic. """ + @property def try_again_in(self) -> float: """ @@ -105,7 +133,7 @@ def try_again_in(self) -> float: Raises: NotImplementedError: If the time format is not recognized. """ - decimal_string = self.response.error.data['try_again_in'] + decimal_string = self.response.error.data["try_again_in"] if "ms" in decimal_string: ms = float(decimal_string[:-2]) logger.warning("rate limited by chainstack, retrying in %sms", ms) diff --git a/dank_mids/_mode.py b/dank_mids/_mode.py index 69da6fed..cc62c59a 100644 --- a/dank_mids/_mode.py +++ b/dank_mids/_mode.py @@ -1,8 +1,8 @@ - from functools import cached_property MODES = ["default", "application", "infura"] + class OperationMode(str): """ Enum-like class to define different operation modes for dank_mids. @@ -10,19 +10,25 @@ class OperationMode(str): These modes can affect how dank_mids optimizes requests and interacts with different Ethereum node providers. 
""" + @cached_property def application(self) -> bool: # This mode keeps the event loop as unblocked as possible so an asyncio application can run as designed return self.mode == "application" + @cached_property def default(self) -> bool: return self.mode == "default" + @cached_property def infura(self) -> bool: # This mode minimizes the total number of calls sent to the node return self.mode == "infura" + @property def mode(self) -> str: if self not in MODES: - raise ValueError(f'dank mids operation mode {self} is invalid', f'valid modes: {MODES}', str(self)) + raise ValueError( + f"dank mids operation mode {self} is invalid", f"valid modes: {MODES}", str(self) + ) return self diff --git a/dank_mids/_requests.py b/dank_mids/_requests.py index 3dd92d80..cfbaba76 100644 --- a/dank_mids/_requests.py +++ b/dank_mids/_requests.py @@ -1,4 +1,3 @@ - import abc import asyncio import logging @@ -9,9 +8,23 @@ from contextlib import suppress from functools import cached_property, lru_cache from itertools import chain -from typing import (TYPE_CHECKING, Any, DefaultDict, Dict, Generator, Generic, - Iterable, Iterator, List, NoReturn, Optional, Tuple, TypeVar, - Union, overload) +from typing import ( + TYPE_CHECKING, + Any, + DefaultDict, + Dict, + Generator, + Generic, + Iterable, + Iterator, + List, + NoReturn, + Optional, + Tuple, + TypeVar, + Union, + overload, +) import a_sync import eth_retry @@ -28,36 +41,57 @@ from dank_mids import ENVIRONMENT_VARIABLES as ENVS from dank_mids import _debugging, constants, stats from dank_mids._demo_mode import demo_logger -from dank_mids._exceptions import (BadResponse, BatchResponseSortError, ChainstackRateLimited, - DankMidsClientResponseError, DankMidsInternalError, - EmptyBatch, ExceedsMaxBatchSize, PayloadTooLarge, - ResponseNotReady, internal_err_types) +from dank_mids._exceptions import ( + BadResponse, + BatchResponseSortError, + ChainstackRateLimited, + DankMidsClientResponseError, + DankMidsInternalError, + EmptyBatch, + ExceedsMaxBatchSize, + PayloadTooLarge, + ResponseNotReady, + internal_err_types, +) from dank_mids._uid import _AlertingRLock from dank_mids.helpers import _codec, _session from dank_mids.helpers._helpers import set_done -from dank_mids.types import (BatchId, BlockId, JSONRPCBatchResponse, - JsonrpcParams, PartialRequest, PartialResponse, - RawResponse, Request, Response) +from dank_mids.types import ( + BatchId, + BlockId, + JSONRPCBatchResponse, + JsonrpcParams, + PartialRequest, + PartialResponse, + RawResponse, + Request, + Response, +) if TYPE_CHECKING: from dank_mids._batch import DankBatch - from dank_mids.controller import (DankMiddlewareController, - _MulticallContract) + from dank_mids.controller import DankMiddlewareController, _MulticallContract logger = logging.getLogger(__name__) -_Response = TypeVar("_Response", Response, List[Response], RPCResponse, List[RPCResponse], RawResponse) +_Response = TypeVar( + "_Response", Response, List[Response], RPCResponse, List[RPCResponse], RawResponse +) + class RPCError(_RPCError, total=False): dankmids_added_context: Dict[str, Any] + class _RequestEvent(a_sync.Event): def __init__(self, owner: "_RequestMeta") -> None: super().__init__(debug_daemon_interval=300) self._owner = weakref.proxy(owner) + def __repr__(self) -> str: return f"<{self.__class__.__name__} object at {hex(id(self))} [{'set' if self.is_set() else 'unset'}, waiter:{self._owner}>" + def set(self): # Make sure we wake up the _RequestEvent's event loop if its in another thread if asyncio.get_running_loop() == 
self._loop: @@ -65,18 +99,20 @@ def set(self): else: self._loop.call_soon_threadsafe(super().set) + class _RequestMeta(Generic[_Response], metaclass=abc.ABCMeta): - __slots__ = 'controller', 'uid', '_response', '_done', '_start', '_batch', '__weakref__' + __slots__ = "controller", "uid", "_response", "_done", "_start", "_batch", "__weakref__" controller: "DankMiddlewareController" + def __init__(self) -> None: self.uid = self.controller.call_uid.next self._response: Union[_Response, RPCResponse, Exception, None] = None self._done = _RequestEvent(self) self._start = time.time() - + def __await__(self) -> Generator[Any, None, _Response]: return self.get_response().__await__() - + @abc.abstractmethod def __len__(self) -> int: pass @@ -90,7 +126,7 @@ def response(self) -> _Response: @abc.abstractmethod async def get_response(self) -> _Response: pass - + @abc.abstractmethod def start(self, batch: Union["_Batch", "DankBatch", None] = None) -> None: pass @@ -101,7 +137,7 @@ async def _debug_daemon(self) -> None: await asyncio.sleep(60) if not self._done.is_set(): logger.debug(f"{self} has not received data after {time.time() - self._start}s") - + ### Single requests: @@ -111,13 +147,22 @@ async def _debug_daemon(self) -> None: These methods are typically handled separately or have special requirements. """ + @lru_cache(maxsize=None) def _should_batch_method(method: str) -> bool: return all(bypass not in method for bypass in BYPASS_METHODS) + class RPCRequest(_RequestMeta[RawResponse]): - __slots__ = 'method', 'params', 'should_batch', '_started', '_retry', '_daemon' - def __init__(self, controller: "DankMiddlewareController", method: RPCEndpoint, params: Any, retry: bool = False): + __slots__ = "method", "params", "should_batch", "_started", "_retry", "_daemon" + + def __init__( + self, + controller: "DankMiddlewareController", + method: RPCEndpoint, + params: Any, + retry: bool = False, + ): self.controller = controller """The DankMiddlewareController that created this request.""" self.method = method @@ -137,30 +182,34 @@ def __init__(self, controller: "DankMiddlewareController", method: RPCEndpoint, raise NotImplementedError("we should not get here", self) else: self.controller.pending_rpc_calls.append(self) - demo_logger.info(f'added to queue (cid: {self.uid})') # type: ignore + demo_logger.info(f"added to queue (cid: {self.uid})") # type: ignore if logger.isEnabledFor(logging.DEBUG): self._daemon = asyncio.create_task(self._debug_daemon()) - + def __eq__(self, __o: object) -> bool: - return self.uid == __o.uid if isinstance(__o, self.__class__) else False - + return self.uid == __o.uid if isinstance(__o, self.__class__) else False + def __len__(self) -> int: # we dont need to consider this for v small batch sizes if ENVS.MAX_JSONRPC_BATCH_SIZE > 50: # type: ignore [operator] # NOTE: These are totally arbitrary if self.method == "eth_getTransactionReceipt": return 10 - elif any(m in self.method for m in ["eth_getCode" "eth_getBlockBy", "eth_getTransaction"]): + elif any( + m in self.method for m in ["eth_getCode" "eth_getBlockBy", "eth_getTransaction"] + ): return 6 return 1 - + def __repr__(self) -> str: - return f"<{self.__class__.__name__} uid={self.uid} method={self.method} params={self.params}>" + return ( + f"<{self.__class__.__name__} uid={self.uid} method={self.method} params={self.params}>" + ) @property def request(self) -> Union[Request, PartialRequest]: return self.controller.request_type(method=self.method, params=self.params, id=self.uid) - + def start(self, batch: 
Union["_Batch", "DankBatch"]) -> None: # type: ignore [override] self._started = True self._batch = batch @@ -170,7 +219,7 @@ async def get_response(self) -> RPCResponse: # type: ignore [override] if not self.should_batch: logger.debug(f"bypassed, method is {self.method}") return await self.get_response_unbatched() - + if self._started and not self._batch._started: # NOTE: If we're already started, we filled a batch. Let's await it now so we can send something to the node. await self._batch @@ -188,7 +237,7 @@ async def get_response(self) -> RPCResponse: # type: ignore [override] ) except asyncio.TimeoutError: return await self.create_duplicate() - + try: await asyncio.wait_for(self._done.wait(), timeout=ENVS.STUCK_CALL_TIMEOUT) # type: ignore [arg-type] except asyncio.TimeoutError: @@ -198,23 +247,25 @@ async def get_response(self) -> RPCResponse: # type: ignore [override] if isinstance(self.response, RawResponse): response = self.response.decode(partial=True).to_dict(self.method) error: Optional[RPCError] - if error := response.get('error'): # type: ignore [assignment] - if error['message'].lower() in ['invalid request', 'parse error']: + if error := response.get("error"): # type: ignore [assignment] + if error["message"].lower() in ["invalid request", "parse error"]: if self.controller._time_of_request_type_change == 0: self.controller.request_type = Request self.controller._time_of_request_type_change = time.time() if time.time() - self.controller._time_of_request_type_change <= 600: - logger.debug("your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead") + logger.debug( + "your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead" + ) return await self.controller(self.method, self.params) - response['error'] = dict(error) - response['error']['dankmids_added_context'] = self.request + response["error"] = dict(error) + response["error"]["dankmids_added_context"] = self.request # I'm 99.99999% sure that any errd call has no result and we only get this field from mscspec object defs # But I'll check it anyway to be safe - if result := response.pop('result', None): - response['result'] = result + if result := response.pop("result", None): + response["result"] = result logger.debug("error response for %s: %s", self, response) return response - + # If we have an Exception here it came from the goofy sync_call thing I need to get rid of. 
# We raise it here so it traces back up to the caller if isinstance(self.response, Exception): @@ -222,7 +273,7 @@ async def get_response(self) -> RPCResponse: # type: ignore [override] # Less optimal decoding # TODO: refactor this out return self.response - + @set_done async def get_response_unbatched(self) -> RPCResponse: # type: ignore [override] task = asyncio.create_task(self.make_request()) # type: ignore [arg-type] @@ -231,13 +282,15 @@ async def get_response_unbatched(self) -> RPCResponse: # type: ignore [override await asyncio.wait_for(shielded, timeout=ENVS.STUCK_CALL_TIMEOUT) # type: ignore [arg-type] except asyncio.TimeoutError: # looks like its stuck for some reason, let's try another one - done, pending = await asyncio.wait([task, self.create_duplicate()], return_when=asyncio.FIRST_COMPLETED) + done, pending = await asyncio.wait( + [task, self.create_duplicate()], return_when=asyncio.FIRST_COMPLETED + ) for t in pending: t.cancel() for task in done: return await task return self.response.decode(partial=True).to_dict(self.method) - + @set_done async def spoof_response(self, data: Union[RawResponse, bytes, Exception]) -> None: # sourcery skip: merge-duplicate-blocks @@ -246,17 +299,19 @@ async def spoof_response(self, data: Union[RawResponse, bytes, Exception]) -> No `bytes` type data comes for individual eth_calls that were batched into multicalls and already decoded `Exception` type data comes from failed calls """ - + # New handler if isinstance(data, RawResponse): self._response = data elif isinstance(data, BadResponse): - if data.response.error.message.lower() in ['invalid request', 'parse error']: # type: ignore [union-attr] + if data.response.error.message.lower() in ["invalid request", "parse error"]: # type: ignore [union-attr] if self.controller._time_of_request_type_change == 0: self.controller.request_type = Request self.controller._time_of_request_type_change = int(time.time()) if time.time() - self.controller._time_of_request_type_change <= 600: - logger.debug("your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead") + logger.debug( + "your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead" + ) self._response = await self.create_duplicate() return self._response = {"error": __format_error(self.request, data.response)} @@ -268,23 +323,29 @@ async def spoof_response(self, data: Union[RawResponse, bytes, Exception]) -> No elif isinstance(data, bytes): self._response = {"result": data} else: - raise NotImplementedError(f'type {type(data)} not supported for spoofing.', type(data), data) - + raise NotImplementedError( + f"type {type(data)} not supported for spoofing.", type(data), data + ) + @set_done async def make_request(self) -> RawResponse: """Used to execute the request with no batching.""" self._started = True - self._response = await self.controller.make_request(self.method, self.params, request_id=self.uid) + self._response = await self.controller.make_request( + self.method, self.params, request_id=self.uid + ) return self._response - + @property def semaphore(self) -> a_sync.Semaphore: # NOTE: We cannot cache this property so the semaphore control pattern in the `duplicate` fn will work as intended - if self.method == 'eth_call': + if self.method == "eth_call": return self.controller.eth_call_semaphores[self.params[1]] return self.controller.method_semaphores[self.method] # type: ignore [return-value] - - async def create_duplicate(self) -> RPCResponse: # Not 
actually self, but for typing purposes it is. + + async def create_duplicate( + self, + ) -> RPCResponse: # Not actually self, but for typing purposes it is. # We need to make room since the stalled call is still holding the semaphore self.semaphore.release() # We need to check the semaphore again to ensure we have the right context manager, soon but not right away. @@ -300,12 +361,13 @@ async def create_duplicate(self) -> RPCResponse: # Not actually self, but for ty "invalid opcode", "missing trie node", "resource not found", - "invalid ether transfer", - "error processing call revert", + "invalid ether transfer", + "error processing call revert", } - + revert_threads = PruningThreadPoolExecutor(4) + def _is_call_revert(e: BadResponse) -> bool: """ Determine if a BadResponse was caused by a revert in one of the individual calls within a multicall. @@ -319,6 +381,7 @@ def _is_call_revert(e: BadResponse) -> bool: stre = f"{e}" return any(s in stre for s in INDIVIDUAL_CALL_REVERT_STRINGS) + def _needs_full_request_spec(e: BadResponse): """ Determine if a BadResponse indicates that the node requires the full request specification. @@ -332,21 +395,24 @@ def _needs_full_request_spec(e: BadResponse): class eth_call(RPCRequest): - __slots__ = 'block' - def __init__(self, controller: "DankMiddlewareController", params: Any, retry: bool = False) -> None: - """ Adds a call to the DankMiddlewareContoller's `pending_eth_calls`. """ + __slots__ = "block" + + def __init__( + self, controller: "DankMiddlewareController", params: Any, retry: bool = False + ) -> None: + """Adds a call to the DankMiddlewareContoller's `pending_eth_calls`.""" self.block: BlockId = params[1] """The block height at which the contract will be called.""" super().__init__(controller, "eth_call", params) # type: ignore - + def __repr__(self) -> str: return f"<{self.__class__.__name__} uid={self.uid} params={self.params}>" - + @property def calldata(self) -> HexBytes: """The calldata for the call.""" - return HexBytes(self.params[0]['data']) - + return HexBytes(self.params[0]["data"]) + @property def multicall_compatible(self) -> bool: """True if this contract is multicall compatible, False if not.""" @@ -358,11 +424,13 @@ def target(self) -> ChecksumAddress: return self.params[0]["to"] async def spoof_response(self, data: Union[bytes, Exception, RawResponse]) -> None: # type: ignore - """ Sets and returns a spoof rpc response for this BatchedCall instance using data provided by the worker. """ + """Sets and returns a spoof rpc response for this BatchedCall instance using data provided by the worker.""" # NOTE: If `type(data)` is `bytes`, it is a result from a multicall. If not, `data` comes from a jsonrpc batch. # If this if clause is True, it means the call reverted inside of a multicall but returned a result, without causing the multicall to revert. - if isinstance(data, bytes) and any(data.startswith(selector) for selector in constants.REVERT_SELECTORS): + if isinstance(data, bytes) and any( + data.startswith(selector) for selector in constants.REVERT_SELECTORS + ): # TODO figure out how to include method selector in no_multicall key try: # NOTE: If call response from multicall indicates failure, make sync call to get either: @@ -370,7 +438,12 @@ async def spoof_response(self, data: Union[bytes, Exception, RawResponse]) -> No # - revert details from exception # If we get a successful response, most likely the target contract does not support multicall2. 
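# [Illustrative aside, not part of the patch.] The startswith() check above
# spots calls that reverted *inside* a multicall by the 4-byte prefix of the
# returned bytes. constants.REVERT_SELECTORS is not shown in this diff; the
# well-known Solidity revert prefixes are Error(string) -> 0x08c379a0 and
# Panic(uint256) -> 0x4e487b71:

ERROR_SELECTOR = bytes.fromhex("08c379a0")  # Error(string)
PANIC_SELECTOR = bytes.fromhex("4e487b71")  # Panic(uint256)

def looks_like_revert(returndata: bytes) -> bool:
    # Reverted calls return ABI-encoded Error/Panic payloads, not call results.
    return returndata.startswith(ERROR_SELECTOR) or returndata.startswith(PANIC_SELECTOR)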
# TODO: Get rid of the sync executor and just use `make_request` - data = await asyncio.get_event_loop().run_in_executor(revert_threads, self.controller.sync_w3.eth.call, {"to": self.target, "data": self.calldata}, self.block) + data = await asyncio.get_event_loop().run_in_executor( + revert_threads, + self.controller.sync_w3.eth.call, + {"to": self.target, "data": self.calldata}, + self.block, + ) # The single call was successful. We don't want to include this contract in more multicalls self.controller.no_multicall.add(self.target) except Exception as e: @@ -378,7 +451,7 @@ async def spoof_response(self, data: Union[bytes, Exception, RawResponse]) -> No data = e # The above revert catching logic fails to account for pre-decoding RawResponse objects. await super().spoof_response(data) - + @property def semaphore(self) -> a_sync.Semaphore: """ @@ -391,38 +464,39 @@ def semaphore(self) -> a_sync.Semaphore: This property is not cached to ensure the semaphore control pattern in the `duplicate` function works as intended. """ return self.controller.eth_call_semaphores[self.block] - - + ### Batch requests: _Request = TypeVar("_Request", bound=_RequestMeta) + class _Batch(_RequestMeta[List[_Response]], Iterable[_Request]): - __slots__ = '_calls', '_lock', '_daemon' + __slots__ = "_calls", "_lock", "_daemon" _calls: List["weakref.ref[_Request]"] + def __init__(self, controller: "DankMiddlewareController", calls: Iterable[_Request]): self.controller = controller self._calls = [weakref.ref(call) for call in calls] self._lock = _AlertingRLock(name=self.__class__.__name__) super().__init__() - + def __bool__(self) -> bool: return bool(self.calls) - + @overload - def __getitem__(self, ix: int) -> _Request:... + def __getitem__(self, ix: int) -> _Request: ... @overload - def __getitem__(self, ix: slice) -> List[_Request]:... + def __getitem__(self, ix: slice) -> List[_Request]: ... def __getitem__(self, ix: Union[int, slice]) -> Union[_Request, List[_Request]]: return self.calls[ix] def __iter__(self) -> Iterator[_Request]: return iter(self.calls) - + def __len__(self) -> int: return len(self.calls) - + @property def calls(self) -> List[_Request]: "Returns a list of calls. Creates a temporary strong reference to each call in the batch, if it still exists." @@ -431,34 +505,34 @@ def calls(self) -> List[_Request]: @property def halfpoint(self) -> int: return len(self) // 2 - + @property def bisected(self) -> Generator[List[_Request], None, None]: yield self.chunk0 yield self.chunk1 - + @property def chunk0(self) -> List[_Request]: - return self.calls[:self.halfpoint] - + return self.calls[: self.halfpoint] + @property def chunk1(self) -> List[_Request]: - return self.calls[self.halfpoint:] + return self.calls[self.halfpoint :] @property def is_full(self) -> bool: raise NotImplementedError(type(self).__name__) - + def append(self, call: _Request, skip_check: bool = False) -> None: with self._lock: self._calls.append(weakref.ref(call)) - #self._len += 1 + # self._len += 1 if not skip_check: if self.is_full: self.start() elif self.controller.queue_is_full: self.controller.early_start() - + def extend(self, calls: Iterable[_Request], skip_check: bool = False) -> None: with self._lock: self._calls.extend(weakref.ref(call) for call in calls) @@ -481,26 +555,31 @@ def should_retry(self, e: Exception) -> bool: """Should the _Batch be retried based on `e`?""" if "out of gas" in f"{e}": # TODO Remember which contracts/calls are gas guzzlers - logger.debug('out of gas. 
cut in half, trying again') + logger.debug("out of gas. cut in half, trying again") elif any(err in f"{e}".lower() for err in constants.RETRY_ERRS): # TODO: use these exceptions to optimize for the user's node - logger.debug('Dank too loud. Bisecting batch and retrying.') + logger.debug("Dank too loud. Bisecting batch and retrying.") elif isinstance(e, BadResponse) and (_needs_full_request_spec(e) or _is_call_revert(e)): pass - elif "429" not in f"{e}" and all(err not in f"{e}".lower() for err in constants.TOO_MUCH_DATA_ERRS): + elif "429" not in f"{e}" and all( + err not in f"{e}".lower() for err in constants.TOO_MUCH_DATA_ERRS + ): logger.warning(f"unexpected {e.__class__.__name__}: {e}") return len(self) > 1 - + def _post_future_cleanup(self) -> None: raise NotImplementedError + mcall_encoder = abi.default_codec._registry.get_encoder("(bool,(address,bytes)[])") mcall_decoder = abi.default_codec._registry.get_decoder("(uint256,uint256,(bool,bytes)[])") + def mcall_encode(data: List[Tuple[bool, bytes]]) -> bytes: return mcall_encoder([False, data]) -def mcall_decode(data: PartialResponse) -> Union[List[Tuple[bool, bytes]], Exception]: + +def mcall_decode(data: PartialResponse) -> Union[List[Tuple[bool, bytes]], Exception]: try: decoded = data.decode_result("eth_call")[2:] # type: ignore [arg-type] decoded = bytes.fromhex(decoded) @@ -509,52 +588,63 @@ def mcall_decode(data: PartialResponse) -> Union[List[Tuple[bool, bytes]], Excep # NOTE: We need to safely bring any Exceptions back out of the ProcessPool try: # We do this goofy thing since we can't `return Exc() from e` - raise e.__class__(*e.args, data.decode_result() if isinstance(data, PartialResponse) else data) from e + raise e.__class__( + *e.args, data.decode_result() if isinstance(data, PartialResponse) else data + ) from e except Exception as new_e: return new_e - + class Multicall(_Batch[RPCResponse, eth_call]): - __slots__ = 'bid', '_started', '__dict__' # We need to specify __dict__ for the cached properties to work + __slots__ = ( + "bid", + "_started", + "__dict__", + ) # We need to specify __dict__ for the cached properties to work method = "eth_call" - fourbyte = function_signature_to_4byte_selector("tryBlockAndAggregate(bool,(address,bytes)[])") + fourbyte = function_signature_to_4byte_selector("tryBlockAndAggregate(bool,(address,bytes)[])") - def __init__(self, controller: "DankMiddlewareController", calls: List[eth_call] = [], bid: Optional[BatchId] = None): + def __init__( + self, + controller: "DankMiddlewareController", + calls: List[eth_call] = [], + bid: Optional[BatchId] = None, + ): # sourcery skip: default-mutable-arg super().__init__(controller, calls) self.bid = bid or self.controller.multicall_uid.next self._started = False - + def __repr__(self) -> str: return f"" - + @cached_property def block(self) -> BlockId: return self.calls[0].block - + @property def calldata(self) -> str: return (self.fourbyte + mcall_encode([[call.target, call.calldata] for call in self.calls])).hex() # type: ignore [misc] - + @property def target(self) -> ChecksumAddress: return self.mcall.address - + @cached_property def mcall(self) -> "_MulticallContract": return self.controller._select_mcall_target_for_block(self.block) - + @property def params(self) -> JsonrpcParams: - params = [{'to': self.target, 'data': f'0x{self.calldata}'}, self.block] + params = [{"to": self.target, "data": f"0x{self.calldata}"}, self.block] if self.needs_override_code and not self.controller.state_override_not_supported: - params.append({self.target: 
{'code': self.mcall.bytecode}}) + params.append({self.target: {"code": self.mcall.bytecode}}) return params # type: ignore [return-value] - + @property def request(self) -> Union[Request, PartialRequest]: return self.controller.request_type(method=self.method, params=self.params, id=self.uid) - + @property def is_full(self) -> bool: return len(self) >= self.controller.batcher.step @@ -562,53 +652,83 @@ def is_full(self) -> bool: @property def needs_override_code(self) -> bool: return self.mcall.needs_override_code_for_block(self.block) - + async def get_response(self) -> None: # type: ignore [override] if self._started: - logger.error(f'{self} early exit') + logger.error(f"{self} early exit") return self._started = True if (l := len(self)) == 1: return await self._exec_single_call() - #elif l < 50: # TODO play with later + # elif l < 50: # TODO play with later # return await JSONRPCBatch(self.controller, self.calls) rid = self.controller.request_uid.next - demo_logger.info(f'request {rid} for multicall {self.bid} starting') # type: ignore + demo_logger.info(f"request {rid} for multicall {self.bid} starting") # type: ignore try: # create a strong ref to all calls we will execute so they cant get gced mid execution and mess up response ordering calls = self.calls - await self.spoof_response(await self.controller.make_request(self.method, self.params, request_id=self.uid), calls) + await self.spoof_response( + await self.controller.make_request(self.method, self.params, request_id=self.uid), + calls, + ) except internal_err_types.__args__ as e: # type: ignore [attr-defined] - raise e if 'invalid argument' in str(e) else DankMidsInternalError(e) from e + raise e if "invalid argument" in str(e) else DankMidsInternalError(e) from e except ClientResponseError as e: if e.message == "Payload Too Large": logger.info("Payload too large. response headers: %s", e.headers) self.controller.reduce_multicall_size(len(self)) - _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.request.data) + _debugging.failures.record( + self.controller.chain_id, + e, + type(self).__name__, + self.uid, + len(self), + self.request.data, + ) else: if _log_exception(e): - _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.request.data) + _debugging.failures.record( + self.controller.chain_id, + e, + type(self).__name__, + self.uid, + len(self), + self.request.data, + ) await (self.bisect_and_retry(e) if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc] except Exception as e: if _log_exception(e): - _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.request.data) + _debugging.failures.record( + self.controller.chain_id, + e, + type(self).__name__, + self.uid, + len(self), + self.request.data, + ) await (self.bisect_and_retry(e) if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc] - demo_logger.info(f'request {rid} for multicall {self.bid} complete') # type: ignore - + demo_logger.info(f"request {rid} for multicall {self.bid} complete") # type: ignore + def should_retry(self, e: Exception) -> bool: """Should the Multicall be retried based on `e`?""" if any(err in f"{e}".lower() for err in constants.RETRY_ERRS): - logger.debug('dank too loud, trying again') + logger.debug("dank too loud, trying again") return True - elif "No state available for block" in f"{e}": # NOTE: While it might look weird, f-string is faster than `str(e)`. 
- e.args[0]["dankmids_note"] = "You're not using an archive node, and you need one for the application you are attempting to run." + elif ( + "No state available for block" in f"{e}" + ): # NOTE: While it might look weird, f-string is faster than `str(e)`. + e.args[0][ + "dankmids_note" + ] = "You're not using an archive node, and you need one for the application you are attempting to run." return False elif super().should_retry(e): return True return len(self) > 1 - + @set_done - async def spoof_response(self, data: Union[RawResponse, Exception], calls: Optional[List[eth_call]] = None) -> None: + async def spoof_response( + self, data: Union[RawResponse, Exception], calls: Optional[List[eth_call]] = None + ) -> None: # NOTE: we pass in the calls to create a strong reference so when we zip up the results everything gets to the right place if calls is None: calls = self.calls @@ -622,14 +742,21 @@ async def spoof_response(self, data: Union[RawResponse, Exception], calls: Optio elif isinstance(data, RawResponse): response = data.decode(partial=True) if response.error: - logger.debug("%s received an 'error' response from the rpc: %s", self, response.exception) + logger.debug( + "%s received an 'error' response from the rpc: %s", self, response.exception + ) # NOTE: We raise the exception which will be caught, call will be broken up and retried raise response.exception logger.debug("%s received valid bytes from the rpc", self) - await asyncio.gather(*(call.spoof_response(data) for call, (_, data) in zip(calls, await self.decode(response)))) + await asyncio.gather( + *( + call.spoof_response(data) + for call, (_, data) in zip(calls, await self.decode(response)) + ) + ) else: raise NotImplementedError(f"type {type(data)} not supported.", data) - + async def decode(self, data: PartialResponse) -> List[Tuple[bool, bytes]]: start = time.time() if ENVS.OPERATION_MODE.infura: # type: ignore [attr-defined] @@ -650,12 +777,12 @@ async def decode(self, data: PartialResponse) -> List[Tuple[bool, bytes]]: # Raise any Exceptions that may have come out of the process pool. if isinstance(retval, Exception): raise retval.__class__( - *retval.args, - self.request, - f"response: {data}", + *retval.args, + self.request, + f"response: {data}", ) return retval - + @set_done async def bisect_and_retry(self, e: Exception) -> List[RPCResponse]: """ @@ -663,13 +790,19 @@ async def bisect_and_retry(self, e: Exception) -> List[RPCResponse]: Calls `self._done.set()` when finished. """ logger.debug("%s had exception %s, bisecting and retrying", self, e) - batches = [Multicall(self.controller, chunk, f"{self.bid}_{i}") for i, chunk in enumerate(self.bisected)] + batches = [ + Multicall(self.controller, chunk, f"{self.bid}_{i}") + for i, chunk in enumerate(self.bisected) + ] for batch in batches: batch.start(cleanup=False) for batch, result in zip(batches, await asyncio.gather(*batches, return_exceptions=True)): if isinstance(result, Exception): if not isinstance(result, DankMidsInternalError): - logger.error(f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n", exc_info=True) + logger.error( + f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n", + exc_info=True, + ) raise result @set_done @@ -692,13 +825,13 @@ class JSONRPCBatch(_Batch[RPCResponse, Union[Multicall, RPCRequest]]): the number of separate network calls. 
""" - __slots__ = 'jid', '_started' + __slots__ = "jid", "_started" def __init__( self, controller: "DankMiddlewareController", - calls: List[Union[Multicall, RPCRequest]] = [], - jid: Optional[BatchId] = None + calls: List[Union[Multicall, RPCRequest]] = [], + jid: Optional[BatchId] = None, ) -> None: # sourcery skip: default-mutable-arg """ Initialize a new JSONRPCBatch. @@ -714,7 +847,7 @@ def __init__( def __repr__(self) -> str: return f"" - + @property def data(self) -> bytes: if not self: @@ -729,7 +862,7 @@ def data(self) -> bytes: except TypeError as e: raise TypeError(e, call.request) from None raise - + @property def is_multicalls_only(self) -> bool: with self._lock: @@ -756,7 +889,7 @@ def method_counts(self) -> Dict[RPCEndpoint, int]: else: counts[call.method] += 1 return dict(counts) - + @property def total_calls(self) -> int: """ @@ -783,11 +916,11 @@ async def get_response(self) -> None: # type: ignore [override] rid = self.controller.request_uid.next if ENVS.DEMO_MODE: # type: ignore [attr-defined] # When demo mode is disabled, we can save some CPU time by skipping this sum - demo_logger.info(f'request {rid} for jsonrpc batch {self.jid} ({sum(len(batch) for batch in self)} calls) starting') # type: ignore + demo_logger.info(f"request {rid} for jsonrpc batch {self.jid} ({sum(len(batch) for batch in self)} calls) starting") # type: ignore try: while True: try: - # NOTE: We do this inline so we never have to allocate the response to memory + # NOTE: We do this inline so we never have to allocate the response to memory await self.spoof_response(*await self.post()) break except ChainstackRateLimited as e: @@ -797,9 +930,11 @@ async def get_response(self) -> None: # type: ignore [override] # I want to see these asap when working on the lib. except internal_err_types.__args__ as e: # type: ignore [attr-defined] - raise e if 'invalid argument' in str(e) else DankMidsInternalError(e) from e + raise e if "invalid argument" in str(e) else DankMidsInternalError(e) from e except EmptyBatch as e: - logger.warning("These EmptyBatch exceptions shouldn't actually happen and this except clause can probably be removed soon.") + logger.warning( + "These EmptyBatch exceptions shouldn't actually happen and this except clause can probably be removed soon." + ) except ExceedsMaxBatchSize as e: logger.warning("exceeded max batch size for your node") self.controller.set_batch_size_limit(e.limit) @@ -810,7 +945,9 @@ async def get_response(self) -> None: # type: ignore [override] await self.bisect_and_retry(e) except Exception as e: if _log_exception(e): - _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.data) + _debugging.failures.record( + self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.data + ) stats.log_errd_batch(self) if self.should_retry(e): await self.bisect_and_retry(e) @@ -818,16 +955,20 @@ async def get_response(self) -> None: # type: ignore [override] # I include this elif clause as a failsafe. This is rare and should not impact performance. 
elif not self.is_single_multicall: # Just to force my IDE to resolve types correctly - calls : List[RPCRequest] = self.calls # type: ignore [assignment] - logger.debug("%s had exception %s, aborting and setting Exception as call._response", self, e) + calls: List[RPCRequest] = self.calls # type: ignore [assignment] + logger.debug( + "%s had exception %s, aborting and setting Exception as call._response", self, e + ) # NOTE: This means an exception occurred during the post request # AND that the json batch is made of just one rpc request that is not a multicall. - logger.info('does this ever actually run? pretty sure a single-multicall json batch is not possible. can I delete this?') + logger.info( + "does this ever actually run? pretty sure a single-multicall json batch is not possible. can I delete this?" + ) await asyncio.gather(*[call.spoof_response(e) for call in calls]) else: - raise NotImplementedError('and you may ask yourself, well, how did I get here?') - demo_logger.info(f'request {rid} for jsonrpc batch {self.jid} complete') # type: ignore - + raise NotImplementedError("and you may ask yourself, well, how did I get here?") + demo_logger.info(f"request {rid} for jsonrpc batch {self.jid} complete") # type: ignore + @eth_retry.auto_retry async def post(self) -> Tuple[List[RawResponse], List[Union[Multicall, RPCRequest]]]: """ @@ -849,42 +990,57 @@ async def post(self) -> Tuple[List[RawResponse], List[Union[Multicall, RPCReques calls = self.calls # for the multicalls too mcall_calls_strong_refs = [call.calls for call in filter(lambda x: isinstance(x, Multicall), calls)] # type: ignore [union-attr] - response: JSONRPCBatchResponse = await _session.post(self.controller.endpoint, data=self.data, loads=_codec.decode_jsonrpc_batch) + response: JSONRPCBatchResponse = await _session.post( + self.controller.endpoint, data=self.data, loads=_codec.decode_jsonrpc_batch + ) except ClientResponseError as e: if e.message == "Payload Too Large": logger.warning("Payload too large: %s", self.method_counts) self.adjust_batch_size() - elif 'broken pipe' in str(e).lower(): + elif "broken pipe" in str(e).lower(): logger.warning("This is what broke the pipe: %s", self.method_counts) logger.debug("caught %s for %s, reraising", e, self) if ENVS.DEBUG: # type: ignore [attr-defined] - _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.data) + _debugging.failures.record( + self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.data + ) raise except Exception as e: - if 'broken pipe' in str(e).lower(): + if "broken pipe" in str(e).lower(): logger.warning("This is what broke the pipe: %s", self.method_counts) if ENVS.DEBUG: # type: ignore [attr-defined] - _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.data) + _debugging.failures.record( + self.controller.chain_id, e, type(self).__name__, self.uid, len(self), self.data + ) raise # NOTE: A successful response will be a list of `RawResponse` objects. # A single `PartialResponse` implies an error. if isinstance(response, list): return response, calls # Oops, we failed. - if response.error.message.lower() in ['invalid request', 'parse error']: # type: ignore [union-attr] + if response.error.message.lower() in ["invalid request", "parse error"]: # type: ignore [union-attr] # NOT SURE IF THIS ACTUALLY RUNS, CAN WE RECEIVE THIS TYPE RESPONSE FOR A JSON BATCH? 
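# [Illustrative aside, not part of the patch.] This "invalid request" /
# "parse error" fallback appears three times in this file: on the first
# rejection of the trimmed request spec, record the time and switch the
# controller to the full JSON-RPC spec, then keep using it for a 600-second
# window. A standalone sketch of the same idea, with assumed names:

import time

from dank_mids.types import Request  # the full-spec request type imported above

FULL_SPEC_WINDOW = 600  # seconds; matches the hardcoded 600 used above

def note_partial_spec_rejected(controller) -> None:
    if controller._time_of_request_type_change == 0:
        controller.request_type = Request  # use the full spec from now on
        controller._time_of_request_type_change = time.time()

def still_in_fallback_window(controller) -> bool:
    return time.time() - controller._time_of_request_type_change <= FULL_SPEC_WINDOW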
if self.controller._time_of_request_type_change == 0: self.controller.request_type = Request self.controller._time_of_request_type_change = time.time() if time.time() - self.controller._time_of_request_type_change <= 600: - logger.debug("your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead") + logger.debug( + "your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead" + ) elif response.error.message == "batch limit exceeded": # type: ignore [union-attr] self.adjust_batch_size() - + if ENVS.DEBUG: - _debugging.failures.record(self.controller.chain_id, response.exception, type(self).__name__, self.uid, len(self), self.data) + _debugging.failures.record( + self.controller.chain_id, + response.exception, + type(self).__name__, + self.uid, + len(self), + self.data, + ) raise response.exception - + def should_retry(self, e: Exception) -> bool: """ Determine whether the JSONRPCBatch should be retried based on the exception. @@ -897,12 +1053,14 @@ def should_retry(self, e: Exception) -> bool: """ # While it might look weird, f-string is faster than `str(e)`. if "No state available for block" in f"{e}": - logger.debug('No state available for one of the blocks queried. Bisecting batch and retrying.') + logger.debug( + "No state available for one of the blocks queried. Bisecting batch and retrying." + ) return True elif super().should_retry(e): return True return self.is_single_multicall - + @set_done async def spoof_response(self, response: List[RawResponse], calls: List[RPCRequest]) -> None: """ @@ -917,14 +1075,14 @@ async def spoof_response(self, response: List[RawResponse], calls: List[RPCReque calls: A list of RPCRequest objects that were included in the batch. """ # Reaching this point means we made a batch call and we got results. That doesn't mean they're good, but we got 'em. 
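# [Illustrative aside, not part of the patch.] The sorting below re-pairs
# out-of-order batch responses with their requests via the JSON-RPC id:
# sort requests by uid, sort decoded responses by id, then verify while
# zipping. The same idea as a standalone helper (assumed attribute names,
# mirroring call.uid and raw.decode().id from the code below):

def pair_batch_responses(calls, raw_responses):
    calls = sorted(calls, key=lambda call: call.uid)
    decoded = sorted((raw.decode() for raw in raw_responses), key=lambda resp: resp.id)
    for call, resp in zip(calls, decoded):
        if call.uid != resp.id:
            raise ValueError("batch response ids do not match request uids")
    return list(zip(calls, decoded))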
-
+
         if self.controller._sort_calls:
             # NOTE: these providers don't always return batch results in the correct ordering
-            # NOTE: is it maybe because they 
+            # NOTE: is it maybe because they
             calls.sort(key=lambda call: call.uid)

         if self.controller._sort_response:
-            response.sort(key = lambda raw: raw.decode().id)
+            response.sort(key=lambda raw: raw.decode().id)
         else:
             for call, raw in zip(calls, response):
                 # TODO: make sure this doesn't ever raise and then delete it
@@ -932,14 +1090,17 @@ async def spoof_response(self, response: List[RawResponse], calls: List[RPCReque
                 if call.uid != decoded.id:
                     # Not sure why it works this way
                     raise BatchResponseSortError(self.controller, calls, response)
-
+
-        for r in await asyncio.gather(*[call.spoof_response(raw) for call, raw in zip(calls, response)], return_exceptions=True):
+        for r in await asyncio.gather(
+            *[call.spoof_response(raw) for call, raw in zip(calls, response)],
+            return_exceptions=True,
+        ):
             # NOTE: By doing this with the exceptions we allow any successful calls to get their results sooner
             # without being interrupted by the first exc in the gather and having to wait for the bisect and retry process
             # TODO: stop retrying ones that succeed, that's wasteful
             if isinstance(r, Exception):
                 raise r
-
+
     @set_done
     async def bisect_and_retry(self, e: Exception) -> None:
         """
@@ -956,9 +1117,11 @@ async def bisect_and_retry(self, e: Exception) -> None:
         """
         logger.debug("%s had exception %s, retrying", self, e)
         batches = [
-            Multicall(self.controller, chunk[0].calls, f"json{self.jid}_{i}")  # type: ignore [misc]
-            if len(chunk) == 1 and isinstance(chunk[0], Multicall)
-            else JSONRPCBatch(self.controller, chunk, f"{self.jid}_{i}")
+            (
+                Multicall(self.controller, chunk[0].calls, f"json{self.jid}_{i}")  # type: ignore [misc]
+                if len(chunk) == 1 and isinstance(chunk[0], Multicall)
+                else JSONRPCBatch(self.controller, chunk, f"{self.jid}_{i}")
+            )
             for i, chunk in enumerate(self.bisected)
             if chunk
         ]
@@ -967,9 +1130,12 @@ async def bisect_and_retry(self, e: Exception) -> None:
         for batch, result in zip(batches, await asyncio.gather(*batches, return_exceptions=True)):
             if isinstance(result, Exception):
                 if not isinstance(result, DankMidsInternalError):
-                    logger.error(f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n", exc_info=True)
+                    logger.error(
+                        f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n",
+                        exc_info=True,
+                    )
                 raise result
-
+
     def adjust_batch_size(self) -> None:
         """
         Adjust the batch size on the controller based on the type of calls in the batch.
@@ -979,10 +1145,12 @@ def adjust_batch_size(self) -> None:
         updates the batch size accordingly.
         """
         if self.is_multicalls_only:
-            logger.info('checking if we should reduce multicall batch size... (%s calls)', self.total_calls)
+            logger.info(
+                "checking if we should reduce multicall batch size... (%s calls)", self.total_calls
+            )
             self.controller.reduce_multicall_size(self.total_calls)
         else:
-            logger.info('checking if we should reduce json batch size... (%s requests)', len(self))
+            logger.info("checking if we should reduce json batch size... (%s requests)", len(self))
             self.controller.reduce_batch_size(len(self))
             stats.logger.devhint(
                 "We still need some better logic for catching these errors and using them to better optimize the batching process"
@@ -992,11 +1160,12 @@ def _post_future_cleanup(self) -> None:
         with self.controller.pools_closed_lock:
             self.controller.pending_rpc_calls = JSONRPCBatch(self.controller)
 
+
 def _log_exception(e: Exception) -> bool:
     """
     Log exceptions that occur during a multicall or batch.
 
-    This function logs exceptions that are unexpected and considered errors, 
+    This function logs exceptions that are unexpected and considered errors,
     allowing for better debugging and monitoring.
 
     Args:
@@ -1007,26 +1176,38 @@ def _log_exception(e: Exception) -> bool:
     """
     # NOTE: These errors are expected during normal use and are not indicative of any problem(s). No need to log them.
     # TODO: Better filter what we choose to log here
-    dont_need_to_see_errs = ['out of gas', 'non_empty_data', 'exceeding --rpc.returndata.limit', "'code': 429"]
-
+    dont_need_to_see_errs = [
+        "out of gas",
+        "non_empty_data",
+        "exceeding --rpc.returndata.limit",
+        "'code': 429",
+    ]
+
     dont_need_to_see_errs += [
         # We catch and correct these
         "invalid request",
         # We pass these down to the call they originated from
-        *INDIVIDUAL_CALL_REVERT_STRINGS
+        *INDIVIDUAL_CALL_REVERT_STRINGS,
     ]
     stre = str(e).lower()
-    if all(err not in stre for err in chain(dont_need_to_see_errs, constants.RETRY_ERRS, constants.TOO_MUCH_DATA_ERRS)):
-        logger.warning("The following exception is being logged for informational purposes and does not indicate failure:")
+    if all(
+        err not in stre
+        for err in chain(dont_need_to_see_errs, constants.RETRY_ERRS, constants.TOO_MUCH_DATA_ERRS)
+    ):
+        logger.warning(
+            "The following exception is being logged for informational purposes and does not indicate failure:"
+        )
         logger.warning(e, exc_info=True)
     return ENVS.DEBUG  # type: ignore [attr-defined,return-value]
 
+
 def __format_error(request: PartialRequest, response: PartialResponse) -> AttributeDict:
     error = dict(response.error)  # type: ignore [arg-type]
-    error['dankmids_added_context'] = request
+    error["dankmids_added_context"] = request
     return AttributeDict.recursive(error)
 
+
 def __raise_more_detailed_exc(request: PartialRequest, exc: Exception) -> NoReturn:
     if isinstance(exc, ClientResponseError):
         raise DankMidsClientResponseError(exc, request) from exc
diff --git a/dank_mids/_uid.py b/dank_mids/_uid.py
index 26295410..fc9ad1d7 100644
--- a/dank_mids/_uid.py
+++ b/dank_mids/_uid.py
@@ -1,30 +1,31 @@
-
 import logging
 import threading
 
 logger = logging.getLogger(__name__)
 
+
 class UIDGenerator:
     def __init__(self) -> None:
         self._value: int = -1
-        self.lock = _AlertingRLock(name='uid')
+        self.lock = _AlertingRLock(name="uid")
         """
         A custom reentrant lock used to ensure thread-safe access to the UID generator.
         """
-
+
     @property
     def latest(self) -> int:
-        """ Returns the latest unique id. """
+        """Returns the latest unique id."""
         return self._value
-
+
     @property
     def next(self) -> int:
-        """ Returns the next unique id. """
+        """Returns the next unique id."""
         with self.lock:
             new: int = self._value + 1
             self._value = new
             return new
 
+
 class _AlertingRLock(threading._RLock):  # type: ignore [misc]
     def __init__(self, name: str) -> None:
         super().__init__()
@@ -32,6 +33,7 @@ def __init__(self, name: str) -> None:
         """
         A string identifier for the lock, used for debugging and logging purposes.
         """
+
     def acquire(self, blocking: bool = True, timeout: int = -1) -> bool:  # type: ignore [override]
         acquired = super().acquire(blocking=False, timeout=5)
         if not acquired:
diff --git a/dank_mids/constants.py b/dank_mids/constants.py
index 5de41edf..89382d6e 100644
--- a/dank_mids/constants.py
+++ b/dank_mids/constants.py
@@ -5,14 +5,25 @@
 from eth_typing import BlockNumber
 from multicall.constants import Network
 
-TOO_MUCH_DATA_ERRS = ["payload too large", "content length too large", "request entity too large", "batch limit exceeded"]
+TOO_MUCH_DATA_ERRS = [
+    "payload too large",
+    "content length too large",
+    "request entity too large",
+    "batch limit exceeded",
+]
 """
 A list of error messages indicating that the request sent to the RPC was too large and must be split up.
 
 These error messages are used to identify when a request needs to be broken into smaller chunks.
 """
 
-RETRY_ERRS = ["connection reset by peer", "server disconnected", "execution aborted (timeout =", "batch limit exceeded", "request timed out"]
+RETRY_ERRS = [
+    "connection reset by peer",
+    "server disconnected",
+    "execution aborted (timeout =",
+    "batch limit exceeded",
+    "request timed out",
+]
 """
 A list of error messages that are expected during normal use and are not indicative of any problem(s).
@@ -61,7 +72,7 @@
 """A dictionary mapping networks to the block numbers where Multicall3 was deployed."""
 
 # When you get these call responses back from the multicall, we know there was some problem with execution.
-# If you make the exact same calls without multicall, you will get an Exception not a response. 
+# If you make the exact same calls without multicall, you will get an Exception not a response.
 # TODO: Delete these
 BAD_HEXES = [
     # Chainlink feeds no access
@@ -89,7 +100,7 @@
 # Not sure why yet but sometimes a multicall will succeed but one of the results will be a failure for one call that doesn't interrupt the rest of the mcall.
 # NOTE: we leave off the '0x' so we can compare raw bytes
 # NOTE: The 2nd one here needs to be converted to the first format but I need to encounter one in the wild before I can do that
-REVERT_SELECTORS = [b'\x08\xc3y\xa0', b"4e487b71"]
+REVERT_SELECTORS = [b"\x08\xc3y\xa0", b"4e487b71"]
 """
 A list of byte strings representing revert selectors.
 
diff --git a/dank_mids/controller.py b/dank_mids/controller.py
index 73421caa..4cb81359 100644
--- a/dank_mids/controller.py
+++ b/dank_mids/controller.py
@@ -1,4 +1,3 @@
-
 import logging
 from collections import defaultdict
 from functools import lru_cache
@@ -22,8 +21,7 @@ from dank_mids._uid import UIDGenerator, _AlertingRLock
 from dank_mids.helpers import _codec, _helpers, _session
 from dank_mids.semaphores import _MethodQueues, _MethodSemaphores, BlockSemaphore
 
-from dank_mids.types import (BlockId, ChainId, PartialRequest, RawResponse,
-                             Request)
+from dank_mids.types import BlockId, ChainId, PartialRequest, RawResponse, Request
 
 try:
     from multicall.constants import MULTICALL3_ADDRESSES
@@ -34,6 +32,7 @@
 
 instances: DefaultDict[ChainId, List["DankMiddlewareController"]] = defaultdict(list)
 
+
 class DankMiddlewareController:
     """
     Controller for managing Dank Middleware operations.
@@ -52,8 +51,8 @@ def __init__(self, w3: Web3) -> None:
         Args:
             w3: The Web3 instance used to make RPC requests.
         """
-        logger.info('Dank Middleware initializing... Strap on your rocket boots...')
-
+        logger.info("Dank Middleware initializing... Strap on your rocket boots...")
+
         self.w3: Web3 = w3
         """The Web3 instance used to make rpc requests."""
 
@@ -67,7 +66,9 @@ def __init__(self, w3: Web3) -> None:
         """The client version for the currently connected rpc."""
 
         # NOTE: We need this mutable for node types that require the full jsonrpc spec
-        self.request_type = Request if ENVS.USE_FULL_REQUEST or "reth" in self.client_version else PartialRequest
+        self.request_type = (
+            Request if ENVS.USE_FULL_REQUEST or "reth" in self.client_version else PartialRequest
+        )
         """The Struct class the controller will use to encode requests."""
 
         self._time_of_request_type_change: Union[int, float] = 0
@@ -89,7 +90,7 @@ def __init__(self, w3: Web3) -> None:
         if "tenderly" in self.endpoint and ENVS.MAX_JSONRPC_BATCH_SIZE > 10:  # type: ignore [operator]
             logger.info("max jsonrpc batch size for tenderly is 10, overriding existing max")
             self.set_batch_size_limit(10)
-
+
         self._instance: int = sum(len(_instances) for _instances in instances.values())
         instances[self.chain_id].append(self)  # type: ignore
 
@@ -97,22 +98,32 @@ def __init__(self, w3: Web3) -> None:
         multicall2 = MULTICALL2_ADDRESSES.get(self.chain_id)
         multicall3 = MULTICALL3_ADDRESSES.get(self.chain_id)
         if multicall2 is None and multicall3 is None:
-            raise NotImplementedError("Dank Mids currently does not support this network.\nTo add support, you just need to submit a PR adding the appropriate multicall contract addresses to this file:\nhttps://github.com/banteg/multicall.py/blob/master/multicall/constants.py")
-
-        self.mc2 = _MulticallContract(
-            address = to_checksum_address(multicall2),
-            # TODO: copypasta deploy block dict
-            deploy_block = constants.MULTICALL3_DEPLOY_BLOCKS.get(self.chain_id),
-            bytecode = constants.MULTICALL2_OVERRIDE_CODE,
-        ) if multicall2 else None
-
-        self.mc3 = _MulticallContract(
-            address = to_checksum_address(multicall3),
-            # TODO: copypasta deploy block dict
-            deploy_block = constants.MULTICALL3_DEPLOY_BLOCKS.get(self.chain_id),
-            bytecode = constants.MULTICALL3_OVERRIDE_CODE,
-        ) if multicall3 else None
-
+            raise NotImplementedError(
+                "Dank Mids currently does not support this network.\nTo add support, you just need to submit a PR adding the appropriate multicall contract addresses to this file:\nhttps://github.com/banteg/multicall.py/blob/master/multicall/constants.py"
+            )
+
+        self.mc2 = (
+            _MulticallContract(
+                address=to_checksum_address(multicall2),
+                # TODO: copypasta deploy block dict
+                deploy_block=constants.MULTICALL3_DEPLOY_BLOCKS.get(self.chain_id),
+                bytecode=constants.MULTICALL2_OVERRIDE_CODE,
+            )
+            if multicall2
+            else None
+        )
+
+        self.mc3 = (
+            _MulticallContract(
+                address=to_checksum_address(multicall3),
+                # TODO: copypasta deploy block dict
+                deploy_block=constants.MULTICALL3_DEPLOY_BLOCKS.get(self.chain_id),
+                bytecode=constants.MULTICALL3_OVERRIDE_CODE,
+            )
+            if multicall3
+            else None
+        )
+
         self.no_multicall: Set[ChecksumAddress] = set()
         """A set of addresses that have issues when called from the multicall contract.
Calls to these contracts will not be batched in multicalls.""" @@ -122,7 +133,7 @@ def __init__(self, w3: Web3) -> None: self.no_multicall.add(self.mc2.address) if self.mc3: self.no_multicall.add(self.mc3.address) - + self.method_semaphores = _MethodSemaphores(self) # TODO: refactor this out self.eth_call_semaphores: BlockSemaphore = self.method_semaphores["eth_call"] # type: ignore [assignment] """Used for managing concurrency of eth_calls.""" @@ -146,14 +157,16 @@ def __init__(self, w3: Web3) -> None: """Unique identifier generator for RPC requests.""" self.jsonrpc_batch_uid: UIDGenerator = UIDGenerator() - self.pools_closed_lock = _AlertingRLock(name='pools closed') + self.pools_closed_lock = _AlertingRLock(name="pools closed") - self.pending_eth_calls: DefaultDict[BlockId, Multicall] = defaultdict(lambda: Multicall(self)) + self.pending_eth_calls: DefaultDict[BlockId, Multicall] = defaultdict( + lambda: Multicall(self) + ) """A dictionary of pending Multicalls by block. The Multicalls hold all pending eth_calls.""" self.pending_rpc_calls = JSONRPCBatch(self) """A JSONRPCBatch containing all pending rpc requests.""" - + def __repr__(self) -> str: """ Returns a string representation of the DankMiddlewareController instance. @@ -177,35 +190,37 @@ async def __call__(self, method: RPCEndpoint, params: Any) -> RPCResponse: Returns: The response from the RPC call. """ - + # eth_call go thru a specialized Semaphore and other methods pass thru unblocked if method == "eth_call": async with self.eth_call_semaphores[params[1]]: # create a strong ref to the call that will be held until the caller completes or is cancelled - logger.debug(f'making {self.request_type.__name__} {method} with params {params}') + logger.debug(f"making {self.request_type.__name__} {method} with params {params}") if params[0]["to"] in self.no_multicall: return await RPCRequest(self, method, params) return await eth_call(self, params) # some methods go thru a SmartProcessingQueue, we check those next queue = self.method_queues[method] - logger.debug(f'making {self.request_type.__name__} {method} with params {params}') + logger.debug(f"making {self.request_type.__name__} {method} with params {params}") # no queue, we can make the request normally if queue is None: return await RPCRequest(self, method, params) - + # queue found, queue up the call and await the future try: - # NOTE: is this a strong enough ref? + # NOTE: is this a strong enough ref? return await queue(self, method, params) except TypeError as e: if "unhashable type" not in str(e): raise return await queue(self, method, _helpers._make_hashable(params)) - + @eth_retry.auto_retry - async def make_request(self, method: str, params: List[Any], request_id: Optional[int] = None) -> RawResponse: + async def make_request( + self, method: str, params: List[Any], request_id: Optional[int] = None + ) -> RawResponse: """ Makes an RPC request to the Ethereum node. @@ -223,12 +238,21 @@ async def make_request(self, method: str, params: List[Any], request_id: Optiona Raises: Exception: If DEBUG environment variable is set, any exception that occurs during the request is logged and re-raised. 
""" - request = self.request_type(method=method, params=params, id=request_id or self.call_uid.next) + request = self.request_type( + method=method, params=params, id=request_id or self.call_uid.next + ) try: return await _session.post(self.endpoint, data=request, loads=_codec.decode_raw) except Exception as e: if ENVS.DEBUG: - _debugging.failures.record(self.chain_id, e, "eth_call" if method == "eth_call" else "RPCRequest", "unknown", "unknown", request.data) + _debugging.failures.record( + self.chain_id, + e, + "eth_call" if method == "eth_call" else "RPCRequest", + "unknown", + "unknown", + request.data, + ) raise async def execute_batch(self) -> None: @@ -244,10 +268,10 @@ async def execute_batch(self) -> None: self.num_pending_eth_calls = 0 rpc_calls = self.pending_rpc_calls[:] self.pending_rpc_calls = JSONRPCBatch(self) - demo_logger.info(f'executing dank batch (current cid: {self.call_uid.latest})') # type: ignore + demo_logger.info(f"executing dank batch (current cid: {self.call_uid.latest})") # type: ignore batch = DankBatch(self, multicalls, rpc_calls) await batch - demo_logger.info(f'{batch} done') + demo_logger.info(f"{batch} done") @property def queue_is_full(self) -> bool: @@ -263,7 +287,7 @@ def queue_is_full(self) -> bool: eth_calls = sum(len(calls) for calls in self.pending_eth_calls.values()) other_calls = sum(len(call) for call in self.pending_rpc_calls) return eth_calls + other_calls >= self.batcher.step - + def early_start(self): """ Initiate processing of queued calls when a full batch is available. @@ -277,7 +301,7 @@ def early_start(self): self.pending_rpc_calls.extend(self.pending_eth_calls.values(), skip_check=True) self.pending_eth_calls.clear() self.pending_rpc_calls.start() - + def reduce_multicall_size(self, num_calls: int) -> None: """ Decrease the size of multicall batches in response to failures. @@ -289,7 +313,7 @@ def reduce_multicall_size(self, num_calls: int) -> None: num_calls: The number of calls in the failed multicall operation. """ self._reduce_chunk_size(num_calls, "multicall") - + def reduce_batch_size(self, num_calls: int) -> None: """ Decrease the size of JSON-RPC batches in response to failures. @@ -301,7 +325,7 @@ def reduce_batch_size(self, num_calls: int) -> None: num_calls: The number of calls in the failed batch operation. """ self._reduce_chunk_size(num_calls, "batch") - + def _reduce_chunk_size(self, num_calls: int, chunk_name: Literal["multicall", "batch"]) -> None: """ Internal method to reduce the size of processing chunks. @@ -318,7 +342,9 @@ def _reduce_chunk_size(self, num_calls: int, chunk_name: Literal["multicall", "b """ new_chunk_size = round(num_calls * 0.99) if num_calls >= 100 else num_calls - 1 if new_chunk_size < 30: - logger.warning(f"your {chunk_name} batch size is really low, did you have some connection issue earlier? You might want to restart your script. {chunk_name} chunk size will not be further lowered.") + logger.warning( + f"your {chunk_name} batch size is really low, did you have some connection issue earlier? You might want to restart your script. {chunk_name} chunk size will not be further lowered." 
+ ) return # NOTE: We need the 2nd check because one of the other calls in a batch might have already reduced the chunk size if chunk_name == "batch": @@ -330,7 +356,7 @@ def _reduce_chunk_size(self, num_calls: int, chunk_name: Literal["multicall", "b logger.info("The failed multicall had %s calls", num_calls) return raise DankMidsInternalError(ValueError(f"chunk name {chunk_name} is invalid")) - + def set_multicall_size_limit(self, new_limit: int) -> None: """ Set a new limit for the multicall size. @@ -343,8 +369,12 @@ def set_multicall_size_limit(self, new_limit: int) -> None: self.batcher.step = new_limit logger.warning("multicall size limit reduced from %s to %s", existing_limit, new_limit) else: - logger.info("new multicall size limit %s is not lower than existing limit %s", new_limit, existing_limit) - + logger.info( + "new multicall size limit %s is not lower than existing limit %s", + new_limit, + existing_limit, + ) + def set_batch_size_limit(self, new_limit: int) -> None: """ Set a new limit for the JSON-RPC batch size. @@ -355,10 +385,12 @@ def set_batch_size_limit(self, new_limit: int) -> None: existing_limit = ENVS.MAX_JSONRPC_BATCH_SIZE # type: ignore [attr-defined] if new_limit < existing_limit: # type: ignore [operator] ENVS.MAX_JSONRPC_BATCH_SIZE = new_limit # type: ignore [attr-defined,assignment] - logger.warning("jsonrpc batch size limit reduced from %s to %s", existing_limit, new_limit) + logger.warning( + "jsonrpc batch size limit reduced from %s to %s", existing_limit, new_limit + ) else: logger.info("new jsonrpc batch size limit %s is not lower than existing limit %s", new_limit, int(existing_limit)) # type: ignore [call-overload] - + @lru_cache(maxsize=1024) def _select_mcall_target_for_block(self, block) -> "_MulticallContract": """ @@ -370,7 +402,7 @@ def _select_mcall_target_for_block(self, block) -> "_MulticallContract": Returns: The selected multicall contract. """ - if block == 'latest': + if block == "latest": return self.mc3 or self.mc2 # type: ignore [return-value] if self.mc3 and not self.mc3.needs_override_code_for_block(block): return self.mc3 @@ -387,7 +419,7 @@ class _MulticallContract(Struct): """ The Ethereum address of the multicall contract. """ - + deploy_block: Optional[BlockNumber] """ The block number at which the multicall contract was deployed. @@ -399,7 +431,7 @@ class _MulticallContract(Struct): The bytecode of the multicall contract. This is used for state override if necessary. """ - + @lru_cache(maxsize=1024) def needs_override_code_for_block(self, block: BlockNumber) -> bool: """ @@ -411,18 +443,18 @@ def needs_override_code_for_block(self, block: BlockNumber) -> bool: Returns: True if override code is needed, False otherwise. """ - if block == 'latest': + if block == "latest": return False if self.deploy_block is None: return True if isinstance(block, str): block = int(block, 16) return block < self.deploy_block - + def __hash__(self) -> int: """ Generates a hash for the _MulticallContract instance. - + Returns: A hash value based on the contract's address. """ diff --git a/dank_mids/exceptions.py b/dank_mids/exceptions.py index b0c56d12..9d620788 100644 --- a/dank_mids/exceptions.py +++ b/dank_mids/exceptions.py @@ -1,16 +1,18 @@ +class Revert(ValueError): ... -class Revert(ValueError): - ... class BrownieNotConnectedError(RuntimeError): """ :class:`RuntimeError` raised when brownie is not connected but needs to be in order to access functionality within :mod:`dank_mids.brownie_patch`. 
""" + def __init__(self, obj_name: str): """ Initializes a new BrownieNotConnectedError. - + Params: obj_name: The name of the object the user attempted to access, which requires brownie to be connected. """ - super().__init__(f"Brownie is not connected to a network. You must be connected to a network in order to access `dank_mids.{obj_name}`.") + super().__init__( + f"Brownie is not connected to a network. You must be connected to a network in order to access `dank_mids.{obj_name}`." + ) diff --git a/dank_mids/middleware.py b/dank_mids/middleware.py index 54a1d23b..b1ed1db2 100644 --- a/dank_mids/middleware.py +++ b/dank_mids/middleware.py @@ -12,9 +12,9 @@ logger = logging.getLogger(__name__) + async def dank_middleware( - make_request: Callable[[RPCEndpoint, Any], Any], - web3: Web3 + make_request: Callable[[RPCEndpoint, Any], Any], web3: Web3 ) -> AsyncMiddleware: """ Create and return a :class:`DankMiddlewareController` instance for an asynchronous :class:`~Web3`. diff --git a/dank_mids/semaphores.py b/dank_mids/semaphores.py index 219e892b..8d3bd386 100644 --- a/dank_mids/semaphores.py +++ b/dank_mids/semaphores.py @@ -5,7 +5,9 @@ import a_sync from a_sync.primitives import DummySemaphore, ThreadsafeSemaphore from a_sync.primitives.locks.prio_semaphore import ( - _AbstractPrioritySemaphore, _PrioritySemaphoreContextManager) + _AbstractPrioritySemaphore, + _PrioritySemaphoreContextManager, +) from web3.types import RPCEndpoint if TYPE_CHECKING: @@ -15,7 +17,7 @@ class _BlockSemaphoreContextManager(_PrioritySemaphoreContextManager): """ A context manager for block-specific semaphores. - + This class is used internally to manage concurrency for operations related to specific blockchain blocks. """ @@ -23,11 +25,17 @@ class _BlockSemaphoreContextManager(_PrioritySemaphoreContextManager): _priority_name = "block" """The noun that describes the priority, set to "block".""" - def __init__(self, parent: "BlockSemaphore", priority: Union[int, float, Decimal], name: Optional[str] = None) -> None: + def __init__( + self, + parent: "BlockSemaphore", + priority: Union[int, float, Decimal], + name: Optional[str] = None, + ) -> None: if not isinstance(priority, (int, float, Decimal)): raise TypeError(priority) super().__init__(parent, priority, name) - + + class BlockSemaphore(_AbstractPrioritySemaphore[str, _BlockSemaphoreContextManager]): # type: ignore [type-var] """A semaphore for managing concurrency based on block numbers. 
@@ -46,14 +54,28 @@ class BlockSemaphore(_AbstractPrioritySemaphore[str, _BlockSemaphoreContextManag _top_priority: int = -1 # type: ignore [assignment] """The highest priority value, set to -1.""" - + def __getitem__(self, block: Union[int, str, Literal["latest", None]]) -> "_BlockSemaphoreContextManager": # type: ignore [override] return super().__getitem__( # type: ignore [return-value] - block if isinstance(block, int) # type: ignore [index] - else int(block.hex(), 16) if isinstance(block, bytes) # type: ignore [union-attr] - else int(block, 16) if isinstance(block, str) and "0x" in block - else block if block not in [None, 'latest'] # NOTE: We do this to generate an err if an unsuitable value was provided - else self._top_priority + block + if isinstance(block, int) # type: ignore [index] + else ( + int(block.hex(), 16) + if isinstance(block, bytes) # type: ignore [union-attr] + else ( + int(block, 16) + if isinstance(block, str) and "0x" in block + else ( + block + if block + not in [ + None, + "latest", + ] # NOTE: We do this to generate an err if an unsuitable value was provided + else self._top_priority + ) + ) + ) ) # type: ignore [index] @@ -68,9 +90,13 @@ class _MethodSemaphores: def __init__(self, controller: "DankMiddlewareController") -> None: # TODO: refactor this out, just use BlockSemaphore for eth_call and SmartProcessingQueue to limit other methods from dank_mids import ENVIRONMENT_VARIABLES + self.controller = controller + semaphore_types = {"eth_call": BlockSemaphore} self.method_semaphores = { - method: (BlockSemaphore if method == "eth_call" else ThreadsafeSemaphore)(sem._value, name=f"{method} {controller}") + method: semaphore_types.get(method, ThreadsafeSemaphore)( + sem._value, name=f"{method} {controller}" + ) for method, sem in ENVIRONMENT_VARIABLES.method_semaphores.items() } self.keys = self.method_semaphores.keys() @@ -89,6 +115,7 @@ def __getitem__(self, method: RPCEndpoint) -> Union[ThreadsafeSemaphore, DummySe """ return next((self.method_semaphores[key] for key in self.keys if key in method), self.dummy) + class _MethodQueues: """ A class that manages queues for different RPC methods. @@ -100,14 +127,16 @@ class _MethodQueues: def __init__(self, controller: "DankMiddlewareController") -> None: from dank_mids import ENVIRONMENT_VARIABLES from dank_mids._requests import RPCRequest - + self.controller = controller """ A reference to the DankMiddlewareController instance that this _MethodQueues is associated with. 
""" self.method_queues = { - method: a_sync.SmartProcessingQueue(RPCRequest, num_workers=sem._value, name=f"{method} {controller}") + method: a_sync.SmartProcessingQueue( + RPCRequest, num_workers=sem._value, name=f"{method} {controller}" + ) for method, sem in ENVIRONMENT_VARIABLES.method_semaphores.items() if method != "eth_call" } diff --git a/dank_mids/stats.py b/dank_mids/stats.py index da4cb071..a2b9ceec 100644 --- a/dank_mids/stats.py +++ b/dank_mids/stats.py @@ -1,4 +1,3 @@ - """ stats.py @@ -22,8 +21,7 @@ from collections import defaultdict, deque from concurrent.futures import ProcessPoolExecutor from time import time -from typing import (TYPE_CHECKING, Any, Callable, DefaultDict, Deque, Set, - Type, TypeVar) +from typing import TYPE_CHECKING, Any, Callable, DefaultDict, Deque, Set, Type, TypeVar import msgspec from typed_envs.registry import _ENVIRONMENT_VARIABLES_SET_BY_USER @@ -40,7 +38,7 @@ T = TypeVar("T") # New logging levels: -# DEBUG=10, INFO=20, +# DEBUG=10, INFO=20, STATS = 13 """Custom logging level for statistics, between DEBUG and INFO.""" @@ -52,6 +50,7 @@ # if you're both collecting data and logging something, put the function here: + def log_errd_batch(batch: "JSONRPCBatch") -> None: """ Log information about a failed JSON-RPC batch. @@ -60,8 +59,11 @@ def log_errd_batch(batch: "JSONRPCBatch") -> None: batch: The failed batch to log. """ collector.errd_batches.append(batch) - logger.devhint(f"jsonrpc batch failed\njson batch id: {batch.jid} | len: {len(batch)} | total calls: {batch.total_calls}\n" - + f"methods called: {batch.method_counts}") + logger.devhint( + f"jsonrpc batch failed\njson batch id: {batch.jid} | len: {len(batch)} | total calls: {batch.total_calls}\n" + + f"methods called: {batch.method_counts}" + ) + def log_duration(work_descriptor: str, start: float, *, level: _LogLevel = STATS) -> None: """ @@ -85,7 +87,7 @@ def log_duration(work_descriptor: str, start: float, *, level: _LogLevel = STATS class _StatsLogger(logging.Logger): """ A custom logger class for collecting and logging statistics about RPC method calls and responses. - + This logger extends the standard Python logging.Logger with methods for tracking validation errors, response types, and other statistics related to RPC interactions. """ @@ -95,7 +97,7 @@ def enabled(self) -> bool: """Returns `True` if level is set to `STATS` (`11`) or below.""" self._ensure_daemon() return self.isEnabledFor(STATS) - + # New logging levels def stats(self, msg, *args, **kwargs) -> None: @@ -106,7 +108,7 @@ def devhint(self, msg, *args, **kwargs) -> None: self._log(DEVHINT, msg, args, **kwargs) # Functions to print stats to your logs. 
- + def log_brownie_stats(self, *, level: _LogLevel = STATS) -> None: self._log_fn_result(level, _Writer.brownie) @@ -116,25 +118,27 @@ def log_event_loop_stats(self, *, level: _LogLevel = STATS) -> None: def log_subprocess_stats(self, *, level: _LogLevel = STATS) -> None: for pool in {ENVS.BROWNIE_ENCODER_PROCESSES, ENVS.BROWNIE_DECODER_PROCESSES, ENVS.MULTICALL_DECODER_PROCESSES}: # type: ignore [attr-defined] self._log_fn_result(level, _Writer.queue, pool) - + # Internal helpers def _log(self, level: _LogLevel, *args, **kwargs) -> None: # Saves us having to do this ourselves for each custom message if self.isEnabledFor(level): return self._log_nocheck(level, *args, **kwargs) - + def _log_nocheck(self, level: _LogLevel, *args, **kwargs) -> None: try: return super()._log(level, args[0], args[1:], **kwargs) except IndexError: raise ValueError("Both a level and a message are required.") from None - def _log_fn_result(self, level: _LogLevel, callable: Callable[[T], str], *callable_args: T, **logging_kwargs) -> None: + def _log_fn_result( + self, level: _LogLevel, callable: Callable[[T], str], *callable_args: T, **logging_kwargs + ) -> None: """If `self.isEnabledFor(level)` is True, will call `callable` with your args and log the output.""" if self.isEnabledFor(level): return self._log_nocheck(level, callable(*callable_args), (), **logging_kwargs) - + # Daemon def _ensure_daemon(self) -> None: @@ -147,7 +151,7 @@ def _ensure_daemon(self) -> None: self._daemon = asyncio.create_task(self._stats_daemon()) elif self._daemon.done(): raise self._daemon.exception() # type: ignore [misc] - + async def _stats_daemon(self) -> None: """ The main loop of the stats daemon. It continuously collects event loop times @@ -173,7 +177,7 @@ async def _stats_daemon(self) -> None: def log_validation_error(self, method: RPCEndpoint, e: msgspec.ValidationError) -> None: """ Log a validation error for a specific RPC method. - + Args: method: The RPC method that encountered the error. e: The validation error that occurred. @@ -182,12 +186,17 @@ def log_validation_error(self, method: RPCEndpoint, e: msgspec.ValidationError) if COLLECT_STATS or enabled: collector.validation_errors[method].append(e) if enabled: - self._log(DEVHINT, f"ValidationError when decoding response for {method}", ("This *should* not impact your script. If it does, you'll know."), e) + self._log( + DEVHINT, + f"ValidationError when decoding response for {method}", + ("This *should* not impact your script. If it does, you'll know."), + e, + ) def log_types(self, method: RPCEndpoint, decoded: Any) -> None: """ Log the types of decoded values for a specific RPC method. - + This method analyzes the decoded response data from an RPC call and logs information about the data types encountered. It's useful for understanding the structure of responses from different RPC methods. @@ -198,7 +207,7 @@ def log_types(self, method: RPCEndpoint, decoded: Any) -> None: """ # TODO fix this, use enabled check types = {type(v) for v in decoded.values()} - self.devhint(f'my method and types: {method} {types}') + self.devhint(f"my method and types: {method} {types}") if list in types: self._log_list_types(decoded.values()) collector.types.update(types) @@ -206,7 +215,7 @@ def log_types(self, method: RPCEndpoint, decoded: Any) -> None: def _log_list_types(self, values, level: _LogLevel = DEVHINT) -> None: """ Log the types of items in a list. - + This internal method is used to analyze and log the types of elements found in list structures within RPC responses. 
It's particularly useful for understanding complex, nested data structures. @@ -223,8 +232,10 @@ def _log_list_types(self, values, level: _LogLevel = DEVHINT) -> None: _Times = Deque[float] + class _Collector: """Handles the collection and computation of stats-related data.""" + def __init__(self): self.errd_batches: Deque["JSONRPCBatch"] = deque(maxlen=500) """ @@ -252,7 +263,9 @@ def __init__(self): """ # not implemented - self.validation_errors: DefaultDict[RPCEndpoint, Deque[msgspec.ValidationError]] = defaultdict(lambda: deque(maxlen=100)) + self.validation_errors: DefaultDict[RPCEndpoint, Deque[msgspec.ValidationError]] = ( + defaultdict(lambda: deque(maxlen=100)) + ) """ A default dictionary that stores validation errors encountered for each RPC endpoint. The keys are RPC endpoints, and the values are deques of ValidationError objects. @@ -268,6 +281,7 @@ def avg_loop_time(self) -> float: The average time taken for event loop iterations. """ return sum(collector.event_loop_times) / len(collector.event_loop_times) + @property def count_active_brownie_calls(self) -> int: """ @@ -277,6 +291,7 @@ def count_active_brownie_calls(self) -> int: The count of active Brownie calls. """ return ENVS.BROWNIE_CALL_SEMAPHORE.default_value - ENVS.BROWNIE_CALL_SEMAPHORE.semaphore._value # type: ignore [attr-defined] + @property def count_queued_brownie_calls(self) -> int: """ @@ -286,6 +301,7 @@ def count_queued_brownie_calls(self) -> int: The count of queued Brownie calls. """ return len(ENVS.BROWNIE_CALL_SEMAPHORE.semaphore._waiters) # type: ignore [attr-defined] + @property def encoder_queue_len(self) -> int: """ @@ -295,6 +311,7 @@ def encoder_queue_len(self) -> int: The number of items in the encoder queue. """ return ENVS.BROWNIE_ENCODER_PROCESSES._queue_count # type: ignore [attr-defined] + @property def decoder_queue_len(self) -> int: """ @@ -304,6 +321,7 @@ def decoder_queue_len(self) -> int: The number of items in the decoder queue. """ return ENVS.BROWNIE_DECODER_PROCESSES._queue_count # type: ignore [attr-defined] + @property def mcall_decoder_queue_len(self) -> int: """ @@ -313,42 +331,45 @@ def mcall_decoder_queue_len(self) -> int: The number of items in the multicall decoder queue. """ return ENVS.MULTICALL_DECODER_PROCESSES._queue_count # type: ignore [attr-defined] - + class _Writer: """ `Writer` is used to turn `Collector` stats into human readable on a as-needed, JIT basis without wasting compute or cluttering `Collector` or `StatsLogger` class definitions. """ + def event_loop(self) -> str: """ Generates a human-readable string describing the average event loop time. - + Returns: A string containing the average event loop time. """ return f"Average event loop time: {collector.avg_loop_time}" + def brownie(self) -> str: """ Generates a human-readable string describing the current state of Brownie calls. - + Returns: A string containing the number of active and queued Brownie calls. """ return f"{collector.count_active_brownie_calls} brownie calls are processing, {collector.count_queued_brownie_calls} are queued in {ENVS.BROWNIE_CALL_SEMAPHORE}." + def queue(self, pool: ProcessPoolExecutor) -> str: """ Generates a human-readable string describing the state of a process pool's queue. - + Args: pool: The ProcessPoolExecutor to describe. - + Returns: A string containing the number of items in the pool's queue. 
""" return f"{pool} has {pool._queue_count} items in its queue" - + class _SentryExporter: """ A class for exporting statistics and metrics from the :obj:`metrics` dict to Sentry. @@ -366,9 +387,9 @@ class _SentryExporter: metrics = { "active_eth_calls": "count_active_brownie_calls", "queued_eth_calls": "count_queued_brownie_calls", - "encoder_queue": "encoder_queue_len", - "decoder_queue": "decoder_queue_len", - "loop_time": "avg_loop_time", + "encoder_queue": "encoder_queue_len", + "decoder_queue": "decoder_queue_len", + "loop_time": "avg_loop_time", } units = {"loop_time": "seconds"} @@ -389,10 +410,14 @@ def push_envs(self) -> None: try: self.set_tag(env, value) except Exception as e: - logger.warning(f"Unable to set sentry tag {env} to {value}. See {e.__class__.__name__} below:") + logger.warning( + f"Unable to set sentry tag {env} to {value}. See {e.__class__.__name__} below:" + ) logger.info(e, exc_info=True) + try: import sentry_sdk + set_tag = sentry_sdk.set_tag """ Set a tag for the current scope in Sentry. diff --git a/dank_mids/types.py b/dank_mids/types.py index 4f005572..1a199fb3 100644 --- a/dank_mids/types.py +++ b/dank_mids/types.py @@ -1,10 +1,26 @@ - import logging import re from time import time -from typing import (TYPE_CHECKING, Any, Callable, Coroutine, DefaultDict, Dict, - Iterator, List, Literal, Mapping, NewType, Optional, Set, - Tuple, TypedDict, TypeVar, Union, overload) +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Coroutine, + DefaultDict, + Dict, + Iterator, + List, + Literal, + Mapping, + NewType, + Optional, + Set, + Tuple, + TypedDict, + TypeVar, + Union, + overload, +) import msgspec from eth_typing import ChecksumAddress @@ -12,7 +28,12 @@ from web3.types import RPCEndpoint, RPCResponse from dank_mids import constants, stats -from dank_mids._exceptions import BadResponse, ChainstackRateLimited, ExceedsMaxBatchSize, PayloadTooLarge +from dank_mids._exceptions import ( + BadResponse, + ChainstackRateLimited, + ExceedsMaxBatchSize, + PayloadTooLarge, +) if TYPE_CHECKING: from dank_mids._requests import Multicall @@ -62,13 +83,14 @@ _nested_dict_of_stuff = Dict[str, Union[str, None, _list_of_stuff, _dict_of_stuff]] """A type alias for a nested dictionary structure.""" + class _DictStruct(msgspec.Struct): """A base class enhancing :class:`~msgspec.Struct` with additional dictionary-like functionality.""" def __bool__(self) -> bool: """A Struct will always exist.""" return True - + def __getitem__(self, attr: str) -> Any: """ Allow dictionary-style access to attributes. @@ -83,17 +105,17 @@ def __getitem__(self, attr: str) -> Any: return getattr(self, attr) except AttributeError: raise KeyError(attr) from None - + def __getattr__(self, attr: str) -> Any: """ Get the value of an attribute, raising AttributeError if the value is :obj:`msgspec.UNSET`. - + Parameters: attr: The name of the attribute to fetch. Raises: AttributeError: If the value is :obj:`~msgspec.UNSET`. - + Returns: The value of the attribute. """ @@ -112,11 +134,11 @@ def __iter__(self) -> Iterator[str]: for field in self.__struct_fields__: if getattr(self, field, msgspec.UNSET) is not msgspec.UNSET: yield field - + def __len__(self) -> int: """ The number of keys in the Struct. - + Returns: The number of keys. 
""" @@ -131,7 +153,7 @@ def items(self) -> Iterator[Tuple[str, Any]]: yield key, getattr(self, key) except AttributeError: continue - + def values(self) -> Iterator[Any]: for key in self.__struct_fields__: try: @@ -142,10 +164,10 @@ def values(self) -> Iterator[Any]: class PartialRequest(_DictStruct, frozen=True): # type: ignore [call-arg] """ - Represents a partial JSON-RPC request. - + Represents a partial JSON-RPC request. + While technially part of a request, we can successfully make requests to many nodes without including the `jsonrpc` field. - + This class leaves off the `jsonrpc` field reduce encoding burden and web traffic. This works with many but not all nodes. @@ -164,16 +186,18 @@ class PartialRequest(_DictStruct, frozen=True): # type: ignore [call-arg] def data(self) -> bytes: return msgspec.json.encode(self) + class Request(PartialRequest): """ Represents a complete JSON-RPC request. - + Inherits from PartialRequest and adds the JSON-RPC version. """ jsonrpc: Literal["2.0"] = "2.0" """The JSON-RPC version, always set to "2.0".""" + class Error(_DictStruct, frozen=True): # type: ignore [call-arg] """ Represents an error in a JSON-RPC response. @@ -188,6 +212,7 @@ class Error(_DictStruct, frozen=True): # type: ignore [call-arg] data: Optional[Any] = msgspec.UNSET """Additional error data, if any.""" + # some devving tools that will go away eventually _dict_responses: Set[str] = set() _str_responses: Set[str] = set() @@ -204,10 +229,12 @@ class Log(_DictStruct, frozen=True): # type: ignore [call-arg] data: Optional[str] topics: Optional[List[str]] + class AccessListEntry(_DictStruct, frozen=True): # type: ignore [call-arg] address: str storageKeys: List[str] + AccessList = List[AccessListEntry] # TODO: use the types from snek @@ -216,13 +243,16 @@ class AccessListEntry(_DictStruct, frozen=True): # type: ignore [call-arg] class FeeStats(_DictStruct, frozen=True): # type: ignore [call-arg] """Arbitrum includes this in the `feeStats` field of a tx receipt.""" + l1Calldata: str l2Storage: str l1Transaction: str l2Computation: str + class ArbitrumFeeStats(_DictStruct, frozen=True): # type: ignore [call-arg] """Arbitrum includes these with a tx receipt.""" + paid: FeeStats """ The breakdown of gas paid for the transaction. @@ -235,6 +265,7 @@ class ArbitrumFeeStats(_DictStruct, frozen=True): # type: ignore [call-arg] prices: FeeStats = msgspec.UNSET """The breakdown of gas prices for the transaction.""" + class TransactionReceipt(_DictStruct, frozen=True, omit_defaults=True): # type: ignore [call-arg] transactionHash: str blockHash: str @@ -257,7 +288,7 @@ class TransactionReceipt(_DictStruct, frozen=True, omit_defaults=True): # type: feeStats: ArbitrumFeeStats = msgspec.UNSET """This field is only present on Arbitrum.""" - + class Block(_DictStruct, frozen=True): parentHash: str sha3Uncles: str @@ -276,7 +307,7 @@ class Block(_DictStruct, frozen=True): size: str uncles: List[str] transactions: List[Union[str, Transaction]] - + _RETURN_TYPES = { "eth_call": str, @@ -289,7 +320,7 @@ class Block(_DictStruct, frozen=True): "eth_getBlockByNumber": Block, "eth_getTransactionCount": str, "eth_getTransactionByHash": Transaction, - "eth_getTransactionReceipt": TransactionReceipt, + "eth_getTransactionReceipt": TransactionReceipt, "erigon_getHeaderByNumber": Dict[str, Union[str, int, bool, None]], } """ @@ -297,14 +328,15 @@ class Block(_DictStruct, frozen=True): Used to enable more efficient decoding and validation of RPC responses. 
""" -decoder_logger = logging.getLogger('dank_mids.decoder') +decoder_logger = logging.getLogger("dank_mids.decoder") _chainstack_429_msg = "You've exceeded the RPS limit available on the current plan." + class PartialResponse(_DictStruct, frozen=True): """ - Represents a partial JSON-RPC response. - + Represents a partial JSON-RPC response. + We use these to more efficiently decode responses from the node. """ @@ -320,18 +352,31 @@ def exception(self) -> Exception: if self.error is None: raise AttributeError(f"{self} did not error.") return ( - PayloadTooLarge(self) if self.payload_too_large - else ExceedsMaxBatchSize(self) if re.search(r'batch limit (\d+) exceeded', self.error.message) - else TypeError(self.error.message, "DANKMIDS NOTE: You're probably passing what should be an integer type as a string type. The usual culprit is a block number.") if self.error.message == 'invalid argument 1: hex string without 0x prefix' - # chainstack doesnt return a response with status code 429 when we reach rate limits, so we need to handle it specifically here instead of in the usual place - else ChainstackRateLimited(self) if _chainstack_429_msg in self.error.message - else BadResponse(self) + PayloadTooLarge(self) + if self.payload_too_large + else ( + ExceedsMaxBatchSize(self) + if re.search(r"batch limit (\d+) exceeded", self.error.message) + else ( + TypeError( + self.error.message, + "DANKMIDS NOTE: You're probably passing what should be an integer type as a string type. The usual culprit is a block number.", + ) + if self.error.message == "invalid argument 1: hex string without 0x prefix" + # chainstack doesnt return a response with status code 429 when we reach rate limits, so we need to handle it specifically here instead of in the usual place + else ( + ChainstackRateLimited(self) + if _chainstack_429_msg in self.error.message + else BadResponse(self) + ) + ) + ) ) - + @property def payload_too_large(self) -> bool: return any(err in self.error.message for err in constants.TOO_MUCH_DATA_ERRS) # type: ignore [union-attr] - + def to_dict(self, method: Optional[RPCEndpoint] = None) -> RPCResponse: # type: ignore [override] """Returns a complete dictionary representation of this response ``Struct``.""" data: RPCResponse = {} @@ -343,22 +388,34 @@ def to_dict(self, method: Optional[RPCEndpoint] = None) -> RPCResponse: # type: data[field] = AttributeDict(attr) if isinstance(attr, Mapping) else attr # type: ignore [literal-required] return data - def decode_result(self, method: Optional[RPCEndpoint] = None, _caller = None) -> Union[str, AttributeDict]: + def decode_result( + self, method: Optional[RPCEndpoint] = None, _caller=None + ) -> Union[str, AttributeDict]: # NOTE: These must be added to the `_RETURN_TYPES` constant above manually if method and (typ := _RETURN_TYPES.get(method)): - if method in ["eth_call", "eth_blockNumber", "eth_getCode", "eth_getBlockByNumber", "eth_getTransactionReceipt", "eth_getTransactionCount", "eth_getBalance", "eth_chainId", "erigon_getHeaderByNumber"]: + if method in [ + "eth_call", + "eth_blockNumber", + "eth_getCode", + "eth_getBlockByNumber", + "eth_getTransactionReceipt", + "eth_getTransactionCount", + "eth_getBalance", + "eth_chainId", + "erigon_getHeaderByNumber", + ]: try: return msgspec.json.decode(self.result, type=typ) except (msgspec.ValidationError, TypeError) as e: raise ValueError( e, - f'method: {method} result: {msgspec.json.decode(self.result)}', + f"method: {method} result: {msgspec.json.decode(self.result)}", ).with_traceback(e.__traceback__) 
from e try: start = time() decoded = msgspec.json.decode(self.result, type=typ) if _caller: - stats.log_duration(f'decoding {type(_caller)} {method}', start) + stats.log_duration(f"decoding {type(_caller)} {method}", start) return AttributeDict(decoded) if isinstance(decoded, dict) else decoded except (msgspec.ValidationError, TypeError) as e: stats.logger.log_validation_error(self, e) @@ -367,12 +424,14 @@ def decode_result(self, method: Optional[RPCEndpoint] = None, _caller = None) -> if method: try: if method in _dict_responses: - decoded = AttributeDict(msgspec.json.decode(self.result, type=_nested_dict_of_stuff)) + decoded = AttributeDict( + msgspec.json.decode(self.result, type=_nested_dict_of_stuff) + ) stats.logger.log_types(method, decoded) return decoded elif method in _str_responses: # TODO: finish adding methods and get rid of this - stats.logger.devhint(f'Must add `{method}: str` to `_RETURN_TYPES`') + stats.logger.devhint(f"Must add `{method}: str` to `_RETURN_TYPES`") return msgspec.json.decode(self.result, type=str) except (msgspec.ValidationError, TypeError) as e: stats.logger.log_validation_error(method, e) @@ -390,13 +449,15 @@ def decode_result(self, method: Optional[RPCEndpoint] = None, _caller = None) -> elif isinstance(decoded, list): if method is None: return decoded - raise TypeError(f"type {type(decoded)} is not supported. method: {method} decoded: {decoded}") - + raise TypeError( + f"type {type(decoded)} is not supported. method: {method} decoded: {decoded}" + ) + class Response(PartialResponse): """ Represents a complete JSON-RPC response. - + Inherits from PartialResponse and adds the response identifier and JSON-RPC version. """ @@ -406,19 +467,22 @@ class Response(PartialResponse): jsonrpc: Literal["2.0"] = "2.0" """The JSON-RPC version, always set to "2.0".""" + class RawResponse: """ Wraps a Raw object that we know represents a Response with a `decode` helper method. A `RawResponse` is a properly shaped response for one rpc call, received back from a jsonrpc batch request. They represent either a successful or a failed response, stored as pre-decoded bytes. """ + def __init__(self, raw: msgspec.Raw) -> None: self._raw = raw """The `msgspec.Raw` object wrapped by this wrapper.""" + @overload - def decode(self, partial: Literal[True]) -> PartialResponse:... + def decode(self, partial: Literal[True]) -> PartialResponse: ... @overload - def decode(self, partial: Literal[False] = False) -> Response:... + def decode(self, partial: Literal[False] = False) -> Response: ... def decode(self, partial: bool = False) -> Union[Response, PartialResponse]: """Decode the wrapped `msgspec.Raw` object into a `Response` or a `PartialResponse`.""" try: @@ -427,12 +491,14 @@ def decode(self, partial: bool = False) -> Union[Response, PartialResponse]: e.args = (*e.args, f"decoded: {msgspec.json.decode(self._raw)}") raise + JSONRPCBatchRequest = List[Request] # NOTE: A PartialResponse result implies a failure response from the rpc. JSONRPCBatchResponse = Union[List[RawResponse], PartialResponse] # We need this for proper decoding. JSONRPCBatchResponseRaw = Union[List[msgspec.Raw], PartialResponse] + def _encode_hook(obj: Any) -> Any: """ A hook function for encoding objects during JSON serialization. @@ -448,4 +514,4 @@ def _encode_hook(obj: Any) -> Any: """ if isinstance(obj, AttributeDict): return dict(obj) - raise NotImplementedError(type(obj)) \ No newline at end of file + raise NotImplementedError(type(obj))
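Editor's note: the two-stage decode above (raw bytes into PartialResponse/Response, then into a typed result) leans on msgspec's typed JSON decoding. Below is a minimal self-contained sketch of that pattern; it uses simplified stand-in structs rather than the real dank_mids.types classes, so field sets and defaults here are assumptions made only for illustration.

import msgspec
from typing import List, Optional

class Error(msgspec.Struct, frozen=True):
    code: int
    message: str

class Response(msgspec.Struct, frozen=True):
    jsonrpc: str = "2.0"
    id: Optional[int] = None
    result: Optional[msgspec.Raw] = None
    error: Optional[Error] = None

batch = b'[{"jsonrpc":"2.0","id":1,"result":"0x1"},{"jsonrpc":"2.0","id":2,"error":{"code":-32000,"message":"execution reverted"}}]'
for resp in msgspec.json.decode(batch, type=List[Response]):
    if resp.error is not None:
        print(resp.id, "failed:", resp.error.message)
    else:
        # the result stays as raw bytes until a concrete type is chosen,
        # mirroring RawResponse.decode() followed by PartialResponse.decode_result()
        print(resp.id, "->", msgspec.json.decode(resp.result, type=str))

Keeping the `result` field as msgspec.Raw is the design choice that makes the batch decode cheap: the envelope is validated once, while each payload is only decoded when (and if) a caller asks for it with a concrete return type.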