diff --git a/dank_mids/ENVIRONMENT_VARIABLES.py b/dank_mids/ENVIRONMENT_VARIABLES.py
index e13d746d..f8d85bad 100644
--- a/dank_mids/ENVIRONMENT_VARIABLES.py
+++ b/dank_mids/ENVIRONMENT_VARIABLES.py
@@ -58,6 +58,7 @@
 # Set this env var to any value to force the full request spec always
 USE_FULL_REQUEST = _envs.create_env("USE_FULL_REQUEST", bool, default=False, verbose=False)
+DEBUG = _envs.create_env("DEBUG", bool, default=False, verbose=False)
 
 # NOTE: EXPORT_STATS is not implemented
 # TODO: implement this
 EXPORT_STATS = _envs.create_env("EXPORT_STATS", bool, default=False, verbose=False)
diff --git a/dank_mids/_debugging/__init__.py b/dank_mids/_debugging/__init__.py
new file mode 100644
index 00000000..09395aba
--- /dev/null
+++ b/dank_mids/_debugging/__init__.py
@@ -0,0 +1,5 @@
+"""This module is private for now but will eventually be made public when the api ossifies"""
+
+from dank_mids._debugging import failures
+
+__all__ = ['failures']
diff --git a/dank_mids/_debugging/_base.py b/dank_mids/_debugging/_base.py
new file mode 100644
index 00000000..4edc7280
--- /dev/null
+++ b/dank_mids/_debugging/_base.py
@@ -0,0 +1,59 @@
+
+import abc
+import logging
+import os
+from functools import cached_property, lru_cache
+from typing import Any, Iterable
+
+import aiofiles
+from aiofiles.base import AiofilesContextManager
+from aiofiles.threadpool.text import AsyncTextIOWrapper
+from async_lru import alru_cache
+
+logger = logging.getLogger("dank_mids.debugging")
+
+class _FileHelper(metaclass=abc.ABCMeta):
+    path = f"{os.path.expanduser( '~' )}/.dank_mids/debug"
+    def __init__(self, chainid: int):
+        if not isinstance(chainid, int):
+            raise TypeError(f"`chainid` must be an integer. You passed {chainid}") from None
+        self.chainid = chainid
+        self.path = self.path + f"/{self.chainid}"
+        self.ensure_dir()
+    @lru_cache(maxsize=1)
+    def ensure_dir(self) -> None:
+        os.makedirs(self.path, exist_ok=True)
+    def open(self) -> "AiofilesContextManager[None, None, AsyncTextIOWrapper]":
+        logger.info("opening %s with mode %s", self.uri, self.mode)
+        return aiofiles.open(self.uri, self.mode)
+    @abc.abstractproperty
+    def uri(self) -> str:
+        ...
+    @abc.abstractproperty
+    def mode(self) -> str:
+        ...
+
+class _CSVWriter(_FileHelper):
+    mode = "a"
+    @cached_property
+    def uri(self) -> str:
+        return f"{self.path}/{self.filename}"
+    async def write_row(self, *values: Any) -> None:
+        await self._ensure_headers()
+        await self._write_row(*values)
+    @alru_cache(maxsize=None)
+    async def _ensure_headers(self) -> None:
+        await self._write_row(*self.column_names, new_line=False)
+    async def _write_row(self, *values: Any, new_line: bool = True) -> None:
+        row = ','.join(str(obj) for obj in values)
+        if new_line:
+            row = f'\n{row}'
+        async with self.open() as file:
+            logger.debug("writing row %s to file %s", row, file)
+            await file.write(row)
+    @abc.abstractproperty
+    def filename(self) -> str:
+        ...
+    @abc.abstractproperty
+    def column_names(self) -> Iterable[str]:
+        ...
\ No newline at end of file
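For context, `_CSVWriter` above is designed to be subclassed: a concrete writer only needs to supply `filename` and `column_names`, and inherits `mode = "a"` plus the `~/.dank_mids/debug/<chainid>/` path handling from `_FileHelper`. A minimal sketch of that contract, with a `BlockTimeWriter` class, its filename, and its columns all invented purely for illustration:

    # Hypothetical subclass; name, filename, and columns are not part of this PR.
    import asyncio
    from dank_mids._debugging._base import _CSVWriter

    class BlockTimeWriter(_CSVWriter):
        filename = "block_times.csv"           # satisfies the abstract `filename` property
        column_names = ("block", "timestamp")  # satisfies the abstract `column_names` property

    async def main() -> None:
        writer = BlockTimeWriter(1)  # creates ~/.dank_mids/debug/1/ if needed
        # The first write appends the header row, then the data row.
        await writer.write_row(19_000_000, 1_700_000_000)

    asyncio.run(main())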
diff --git a/dank_mids/_debugging/failures.py b/dank_mids/_debugging/failures.py
new file mode 100644
index 00000000..785666da
--- /dev/null
+++ b/dank_mids/_debugging/failures.py
@@ -0,0 +1,31 @@
+
+from datetime import datetime
+from functools import cached_property, lru_cache
+from typing import TYPE_CHECKING, List, Type, Union
+
+from a_sync import ProcessingQueue
+
+from dank_mids._debugging._base import _CSVWriter
+
+if TYPE_CHECKING:
+    from dank_mids.types import PartialRequest, Request
+
+@lru_cache(maxsize=None)
+class FailedRequestWriter(_CSVWriter):
+    column_names = "request_type", "request_uid", "error", "request_data"
+    def __init__(self, chainid: int, failure_type: Type[BaseException]):
+        super().__init__(chainid)
+        if not issubclass(failure_type, BaseException):
+            raise TypeError(f"`failure_type` must be an Exception type. You passed {failure_type}")
+        self.failure_type = failure_type
+        self.record_failure = ProcessingQueue(self._record_failure, num_workers=1, return_data=True)
+    @cached_property
+    def filename(self) -> str:
+        return f"{int(datetime.now().timestamp())}_{self.failure_type.__name__}s.csv"
+    async def _record_failure(self, e: Exception, request_type: str, request_uid: Union[str, int], request_data: Union[List["Request"], List["PartialRequest"], bytes]):
+        if not isinstance(e, self.failure_type):
+            raise TypeError(e, self.failure_type)
+        await self.write_row(request_type, request_uid, e, request_data)
+
+def record(chainid: int, e: Exception, request_type: str, request_uid: Union[int, str], request_data: Union[List["Request"], List["PartialRequest"], bytes]) -> None:
+    FailedRequestWriter(chainid, type(e)).record_failure(e, request_type, request_uid, request_data)
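The module-level `record()` helper is what the rest of the codebase calls. Two design points worth noting: `FailedRequestWriter` is wrapped in `lru_cache`, so each (chainid, exception type) pair gets exactly one writer, and therefore one CSV file, per process; and `record_failure` is a single-worker `ProcessingQueue`, so callers enqueue a row without awaiting the file I/O. A usage sketch, where the chainid, uid, and payload are placeholders:

    import asyncio
    from dank_mids._debugging import failures

    async def main() -> None:
        try:
            raise TimeoutError("request timed out")
        except TimeoutError as e:
            # Enqueues a row for ~/.dank_mids/debug/1/<timestamp>_TimeoutErrors.csv;
            # the queue worker runs on the event loop, so give it a moment to flush.
            failures.record(1, e, "JSONRPCBatch", "demo-uid", b"{}")
            await asyncio.sleep(0.1)

    asyncio.run(main())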
response headers: %s", e.headers) self.controller.reduce_multicall_size(len(self)) + _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.request.data) else: - _log_exception(e) + if _log_exception(e): + _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.request.data) await (self.bisect_and_retry(e) if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc] except Exception as e: - _log_exception(e) + if _log_exception(e): + _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.request.data) await (self.bisect_and_retry(e) if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc] demo_logger.info(f'request {rid} for multicall {self.bid} complete') # type: ignore @@ -697,7 +700,8 @@ async def get_response(self) -> None: self.adjust_batch_size() await self.bisect_and_retry(e) except Exception as e: - _log_exception(e) + if _log_exception(e): + _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.data) stats.log_errd_batch(self) if self.should_retry(e): await self.bisect_and_retry(e) @@ -728,10 +732,14 @@ async def post(self) -> List[RawResponse]: elif 'broken pipe' in str(e).lower(): logger.warning("This is what broke the pipe: %s", self.method_counts) logger.debug("caught %s for %s, reraising", e, self) + if ENVS.DEBUG: + _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.data) raise e except Exception as e: if 'broken pipe' in str(e).lower(): logger.warning("This is what broke the pipe: %s", self.method_counts) + if ENVS.DEBUG: + _debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.data) raise e # NOTE: A successful response will be a list of `RawResponse` objects. # A single `PartialResponse` implies an error. @@ -747,6 +755,9 @@ async def post(self) -> List[RawResponse]: logger.debug("your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead") elif response.error.message == "batch limit exceeded": self.adjust_batch_size() + + if ENVS.DEBUG: + _debugging.failures.record(self.controller.chain_id, response.exception, type(self).__name__, self.uid, self.data) raise response.exception def should_retry(self, e: Exception) -> bool: @@ -819,7 +830,7 @@ def _post_future_cleanup(self) -> None: with self.controller.pools_closed_lock: self.controller.pending_rpc_calls = JSONRPCBatch(self.controller) -def _log_exception(e: Exception) -> None: +def _log_exception(e: Exception) -> bool: # NOTE: These errors are expected during normal use and are not indicative of any problem(s). No need to log them. 
diff --git a/dank_mids/controller.py b/dank_mids/controller.py
index a5a180a4..51f87834 100644
--- a/dank_mids/controller.py
+++ b/dank_mids/controller.py
@@ -16,7 +16,7 @@
 from web3.types import RPCEndpoint, RPCResponse
 
 from dank_mids import ENVIRONMENT_VARIABLES as ENVS
-from dank_mids import constants
+from dank_mids import _debugging, constants
 from dank_mids._batch import DankBatch
 from dank_mids._demo_mode import demo_logger
 from dank_mids._exceptions import DankMidsInternalError
@@ -133,7 +133,12 @@ async def __call__(self, method: RPCEndpoint, params: Any) -> RPCResponse:
     @eth_retry.auto_retry
     async def make_request(self, method: str, params: List[Any], request_id: Optional[int] = None) -> RawResponse:
         request = self.request_type(method=method, params=params, id=request_id or self.call_uid.next)
-        return await _session.post(self.endpoint, data=request, loads=_decode.raw)
+        try:
+            return await _session.post(self.endpoint, data=request, loads=_decode.raw)
+        except Exception as e:
+            if ENVS.DEBUG:
+                _debugging.failures.record(self.chain_id, e, "eth_call" if method == "eth_call" else "RPCRequest", "unknown", request.data)
+            raise
 
     async def execute_batch(self) -> None:
         with self.pools_closed_lock:  # Do we really need this?  # NOTE: yes we do
diff --git a/requirements.txt b/requirements.txt
index ad6d4873..3c7eb796 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,6 @@
+aiofiles
 eth_retry>=0.1.15,<0.2
-ez-a-sync>=0.6.3
+ez-a-sync>=0.19.4
 msgspec
 multicall>=0.6.2,<1
 typed-envs>=0.0.2
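Once `DEBUG` is enabled, the resulting files can be inspected with the standard library. One caveat worth flagging for a follow-up: `_write_row` joins values with bare commas and no quoting, so a `request_data` payload containing commas will not round-trip cleanly through `csv.reader`. A quick inspection sketch, where chainid 1 is just an example:

    import csv
    import glob
    import os

    debug_dir = os.path.expanduser("~/.dank_mids/debug/1")
    for path in glob.glob(f"{debug_dir}/*.csv"):
        with open(path) as f:
            for row in csv.reader(f):  # best-effort; see the quoting caveat above
                print(path, row)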