Skip to content

Commit

Permalink
feat: DANKMIDS_DEBUG env to record failures to csv (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Apr 20, 2024
1 parent eedfb20 commit f2e775a
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 9 deletions.
1 change: 1 addition & 0 deletions dank_mids/ENVIRONMENT_VARIABLES.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
# Set this env var to any value to force the full request spec always
USE_FULL_REQUEST = _envs.create_env("USE_FULL_REQUEST", bool, default=False, verbose=False)

DEBUG = _envs.create_env("DEBUG", bool, default=False, verbose=False)
# NOTE: EXPORT_STATS is not implemented
# TODO: implement this
EXPORT_STATS = _envs.create_env("EXPORT_STATS", bool, default=False, verbose=False)
Expand Down
5 changes: 5 additions & 0 deletions dank_mids/_debugging/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""This module is private for now but will eventually be made public when the api ossifies"""

from dank_mids._debugging import failures

__all__ = ['failures']
59 changes: 59 additions & 0 deletions dank_mids/_debugging/_base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@

import abc
import logging
import os
from functools import cached_property, lru_cache
from typing import Any, Iterable

import aiofiles
from aiofiles.base import AiofilesContextManager
from aiofiles.threadpool.text import AsyncTextIOWrapper
from async_lru import alru_cache

logger = logging.getLogger("dank_mids.debugging")

class _FileHelper(metaclass=abc.ABCMeta):
path = f"{os.path.expanduser( '~' )}/.dank_mids/debug"
def __init__(self, chainid: int):
if not isinstance(chainid, int):
raise TypeError(f"`chainid` must be an integer. You passed {chainid}") from None
self.chainid = chainid
self.path = self.path + f"/{self.chainid}"
self.ensure_dir()
@lru_cache(maxsize=1)
def ensure_dir(self) -> None:
os.makedirs(self.path, exist_ok=True)
def open(self) -> "AiofilesContextManager[None, None, AsyncTextIOWrapper]":
logger.info("opening %s with mode %s", self.uri, self.mode)
return aiofiles.open(self.uri, self.mode)
@abc.abstractproperty
def uri(self) -> str:
...
@abc.abstractproperty
def mode(self) -> str:
...

class _CSVWriter(_FileHelper):
    """Append-only CSV writer built on :class:`_FileHelper`.

    Subclasses supply :attr:`filename` and :attr:`column_names`; the header
    row is written lazily, immediately before the first data row.
    """

    # Always append; the file accumulates rows across calls and runs.
    mode = "a"

    @cached_property
    def uri(self) -> str:
        """Full path of the CSV file: ``<self.path>/<self.filename>``."""
        return f"{self.path}/{self.filename}"

    async def write_row(self, *values: Any) -> None:
        """Write one data row, writing the header row first if needed."""
        await self._ensure_headers()
        await self._write_row(*values)

    @alru_cache(maxsize=None)
    async def _ensure_headers(self) -> None:
        # alru_cache keys on `self`, so the header is written at most once
        # per instance, even under concurrent callers.
        # NOTE(review): caching on a method keeps the instance alive for the
        # cache's lifetime (ruff B019) — acceptable here since writers are
        # already cached for the process lifetime by their callers.
        await self._write_row(*self.column_names, new_line=False)

    async def _write_row(self, *values: Any, new_line: bool = True) -> None:
        # NOTE(review): values are joined naively — a value containing a
        # comma or newline will corrupt the row. Fine for debug output; use
        # the stdlib `csv` module if these files ever need to round-trip.
        row = ','.join(str(obj) for obj in values)
        if new_line:
            row = f'\n{row}'
        async with self.open() as file:
            logger.debug("writing row %s to file %s", row, file)
            await file.write(row)

    @property
    @abc.abstractmethod
    def filename(self) -> str:
        """Base name of the CSV file (no directory component)."""

    @property
    @abc.abstractmethod
    def column_names(self) -> Iterable[str]:
        """Header values for the first row of the file."""
31 changes: 31 additions & 0 deletions dank_mids/_debugging/failures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@

from datetime import datetime
from functools import cached_property, lru_cache
from typing import TYPE_CHECKING, List, Type, Union

from a_sync import ProcessingQueue

from dank_mids._debugging._base import _CSVWriter

if TYPE_CHECKING:
from dank_mids.types import PartialRequest, Request

@lru_cache(maxsize=None)
class FailedRequestWriter(_CSVWriter):
    """Records failed RPC requests of one exception type to a CSV file.

    NOTE(review): decorating the class with ``lru_cache`` memoizes
    construction, so exactly one writer exists per
    ``(chainid, failure_type)`` pair for the life of the process.
    """

    # Header row for the CSV file.
    column_names = "request_type", "request_uid", "error", "request_data"

    def __init__(self, chainid: int, failure_type: Type[BaseException]) -> None:
        """Create a writer for `failure_type` failures on `chainid`.

        Raises:
            TypeError: If `failure_type` is not an exception class.
        """
        super().__init__(chainid)
        if not issubclass(failure_type, BaseException):
            raise TypeError(f"`failure_type` must be an Exception type. You passed {failure_type}")
        self.failure_type = failure_type
        # Serialize writes through a single worker so rows never interleave.
        self.record_failure = ProcessingQueue(self._record_failure, num_workers=1, return_data=True)

    @cached_property
    def filename(self) -> str:
        # Timestamped so each process run gets its own file.
        return f"{int(datetime.now().timestamp())}_{self.failure_type.__name__}s.csv"

    async def _record_failure(self, e: BaseException, request_type: str, request_uid: Union[str, int], request_data: Union[List["Request"], List["PartialRequest"], bytes]) -> None:
        """Append one failure row; rejects exceptions of the wrong type."""
        if not isinstance(e, self.failure_type):
            # Give the mismatch a readable message instead of raw tuple args.
            raise TypeError(f"{e!r} is not an instance of {self.failure_type}")
        await self.write_row(request_type, request_uid, e, request_data)

def record(chainid: int, e: Exception, request_type: str, request_uid: Union[int, str], request_data: Union[List["Request"], List["PartialRequest"], bytes]) -> None:
    """Queue a failed request for asynchronous recording to the csv for `chainid` and `type(e)`."""
    writer = FailedRequestWriter(chainid, type(e))
    writer.record_failure(e, request_type, request_uid, request_data)
26 changes: 20 additions & 6 deletions dank_mids/_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from web3.types import RPCEndpoint, RPCResponse

from dank_mids import ENVIRONMENT_VARIABLES as ENVS
from dank_mids import constants, stats
from dank_mids import _debugging, constants, stats
from dank_mids._demo_mode import demo_logger
from dank_mids._exceptions import (BadResponse, DankMidsClientResponseError,
DankMidsInternalError, EmptyBatch,
Expand Down Expand Up @@ -521,11 +521,14 @@ async def get_response(self) -> None:
if e.message == "Payload Too Large":
logger.info("Payload too large. response headers: %s", e.headers)
self.controller.reduce_multicall_size(len(self))
_debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.request.data)
else:
_log_exception(e)
if _log_exception(e):
_debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.request.data)
await (self.bisect_and_retry(e) if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc]
except Exception as e:
_log_exception(e)
if _log_exception(e):
_debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.request.data)
await (self.bisect_and_retry(e) if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc]
demo_logger.info(f'request {rid} for multicall {self.bid} complete') # type: ignore

Expand Down Expand Up @@ -697,7 +700,8 @@ async def get_response(self) -> None:
self.adjust_batch_size()
await self.bisect_and_retry(e)
except Exception as e:
_log_exception(e)
if _log_exception(e):
_debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.data)
stats.log_errd_batch(self)
if self.should_retry(e):
await self.bisect_and_retry(e)
Expand Down Expand Up @@ -728,10 +732,14 @@ async def post(self) -> List[RawResponse]:
elif 'broken pipe' in str(e).lower():
logger.warning("This is what broke the pipe: %s", self.method_counts)
logger.debug("caught %s for %s, reraising", e, self)
if ENVS.DEBUG:
_debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.data)
raise e
except Exception as e:
if 'broken pipe' in str(e).lower():
logger.warning("This is what broke the pipe: %s", self.method_counts)
if ENVS.DEBUG:
_debugging.failures.record(self.controller.chain_id, e, type(self).__name__, self.uid, self.data)
raise e
# NOTE: A successful response will be a list of `RawResponse` objects.
# A single `PartialResponse` implies an error.
Expand All @@ -747,6 +755,9 @@ async def post(self) -> List[RawResponse]:
logger.debug("your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead")
elif response.error.message == "batch limit exceeded":
self.adjust_batch_size()

if ENVS.DEBUG:
_debugging.failures.record(self.controller.chain_id, response.exception, type(self).__name__, self.uid, self.data)
raise response.exception

def should_retry(self, e: Exception) -> bool:
Expand Down Expand Up @@ -819,7 +830,7 @@ def _post_future_cleanup(self) -> None:
with self.controller.pools_closed_lock:
self.controller.pending_rpc_calls = JSONRPCBatch(self.controller)

def _log_exception(e: Exception) -> None:
def _log_exception(e: Exception) -> bool:
# NOTE: These errors are expected during normal use and are not indicative of any problem(s). No need to log them.
# TODO: Better filter what we choose to log here
dont_need_to_see_errs = constants.RETRY_ERRS + ['out of gas', 'non_empty_data', 'exceeding --rpc.returndata.limit', "'code': 429", 'payload too large']
Expand All @@ -830,9 +841,12 @@ def _log_exception(e: Exception) -> None:
# We pass these down to the call they originated from
*INDIVIDUAL_CALL_REVERT_STRINGS
]



stre = str(e).lower()
if any(err in stre for err in dont_need_to_see_errs):
return
return ENVS.DEBUG
logger.warning("The following exception is being logged for informational purposes and does not indicate failure:")
logger.warning(e, exc_info=True)
return ENVS.DEBUG
9 changes: 7 additions & 2 deletions dank_mids/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from web3.types import RPCEndpoint, RPCResponse

from dank_mids import ENVIRONMENT_VARIABLES as ENVS
from dank_mids import constants
from dank_mids import _debugging, constants
from dank_mids._batch import DankBatch
from dank_mids._demo_mode import demo_logger
from dank_mids._exceptions import DankMidsInternalError
Expand Down Expand Up @@ -133,7 +133,12 @@ async def __call__(self, method: RPCEndpoint, params: Any) -> RPCResponse:
@eth_retry.auto_retry
async def make_request(self, method: str, params: List[Any], request_id: Optional[int] = None) -> RawResponse:
request = self.request_type(method=method, params=params, id=request_id or self.call_uid.next)
return await _session.post(self.endpoint, data=request, loads=_decode.raw)
try:
return await _session.post(self.endpoint, data=request, loads=_decode.raw)
except Exception as e:
if ENVS.DEBUG:
_debugging.failures.record(self.chain_id, e, "eth_call" if method == "eth_call" else "RPCRequest", "unknown", request.data)
raise

async def execute_batch(self) -> None:
with self.pools_closed_lock: # Do we really need this? # NOTE: yes we do
Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
aiofiles
eth_retry>=0.1.15,<0.2
ez-a-sync>=0.6.3
ez-a-sync>=0.19.4
msgspec
multicall>=0.6.2,<1
typed-envs>=0.0.2
Expand Down

0 comments on commit f2e775a

Please sign in to comment.