Commit ae9ca0d
chore: black . (#271)
BobTheBuidler authored Nov 8, 2024
1 parent 5b1b2d4 commit ae9ca0d
Showing 16 changed files with 854 additions and 377 deletions.
122 changes: 101 additions & 21 deletions dank_mids/ENVIRONMENT_VARIABLES.py
@@ -13,7 +13,9 @@
logger = logging.getLogger("dank_mids.envs")

if not typed_envs.logger.disabled:
logger.info("For your information, you can tweak your configuration for optimal performance using any of the envs below:")
logger.info(
"For your information, you can tweak your configuration for optimal performance using any of the envs below:"
)

###############
# ENVIRONMENT #
@@ -34,27 +36,43 @@
DEMO_MODE = _envs.create_env("DEMO_MODE", bool, default=demo_mode, verbose=False)

# Are you calling a ganache fork? Can't use state override code
ganache_fork = _envs._deprecated_format.create_env("GANACHE_FORK", bool, default=False, verbose=False)
ganache_fork = _envs._deprecated_format.create_env(
"GANACHE_FORK", bool, default=False, verbose=False
)
GANACHE_FORK = _envs.create_env("GANACHE_FORK", bool, default=ganache_fork)
"""Flag indicating whether the current environment is a Ganache fork."""

# We set the default to 20 minutes to account for potentially long event loop times if you're doing serious work.
AIOHTTP_TIMEOUT = _envs.create_env("AIOHTTP_TIMEOUT", int, default=20*60, string_converter=int)
AIOHTTP_TIMEOUT = _envs.create_env("AIOHTTP_TIMEOUT", int, default=20 * 60, string_converter=int)
"""Timeout value in seconds for aiohttp requests."""

# Brownie call Semaphore
# Used because I experienced some OOM errs due to web3 formatters when I was batching an absurd number of brownie calls.
# We need a separate semaphore here because the method-specific semaphores are too late in the code to prevent this OOM issue.
brownie_semaphore = _envs._deprecated_format.create_env("BROWNIE_CALL_SEMAPHORE", int, default=100_000, string_converter=int, verbose=False)
BROWNIE_CALL_SEMAPHORE = _envs.create_env("BROWNIE_CALL_SEMAPHORE", BlockSemaphore, default=brownie_semaphore, string_converter=int, verbose=not OPERATION_MODE.infura)
brownie_semaphore = _envs._deprecated_format.create_env(
"BROWNIE_CALL_SEMAPHORE", int, default=100_000, string_converter=int, verbose=False
)
BROWNIE_CALL_SEMAPHORE = _envs.create_env(
"BROWNIE_CALL_SEMAPHORE",
BlockSemaphore,
default=brownie_semaphore,
string_converter=int,
verbose=not OPERATION_MODE.infura,
)
"""
Semaphore for limiting concurrent Brownie calls.
See Also:
:class:`dank_mids.semaphores.BlockSemaphore`: The semaphore class used for concurrency control.
"""

BROWNIE_ENCODER_SEMAPHORE = _envs.create_env("BROWNIE_ENCODER_SEMAPHORE", BlockSemaphore, default=BROWNIE_CALL_SEMAPHORE._default_value * 2, string_converter=int, verbose=not OPERATION_MODE.infura)
BROWNIE_ENCODER_SEMAPHORE = _envs.create_env(
"BROWNIE_ENCODER_SEMAPHORE",
BlockSemaphore,
default=BROWNIE_CALL_SEMAPHORE._default_value * 2,
string_converter=int,
verbose=not OPERATION_MODE.infura,
)
"""
Semaphore for limiting concurrent Brownie encoding operations. This limits memory consumption.
@@ -65,31 +83,55 @@
# Processes for decoding. This determines process pool size, not total subprocess count.
# There are 3 pools, each initialized with the same value.
# NOTE: Don't stress, these are good for you and will not hog your cpu. You can disable them by setting the var = 0. #TODO: lol u cant yet
BROWNIE_ENCODER_PROCESSES = _envs.create_env("BROWNIE_ENCODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
BROWNIE_ENCODER_PROCESSES = _envs.create_env(
"BROWNIE_ENCODER_PROCESSES",
AsyncProcessPoolExecutor,
default=0 if OPERATION_MODE.infura else 1,
string_converter=int,
verbose=not OPERATION_MODE.infura,
)
"""
Process pool for Brownie encoding operations.
See Also:
:class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes.
"""

BROWNIE_DECODER_PROCESSES = _envs.create_env("BROWNIE_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
BROWNIE_DECODER_PROCESSES = _envs.create_env(
"BROWNIE_DECODER_PROCESSES",
AsyncProcessPoolExecutor,
default=0 if OPERATION_MODE.infura else 1,
string_converter=int,
verbose=not OPERATION_MODE.infura,
)
"""
Process pool for Brownie decoding operations.
See Also:
:class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes.
"""

MULTICALL_DECODER_PROCESSES = _envs.create_env("MULTICALL_DECODER_PROCESSES", AsyncProcessPoolExecutor, default=0 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
MULTICALL_DECODER_PROCESSES = _envs.create_env(
"MULTICALL_DECODER_PROCESSES",
AsyncProcessPoolExecutor,
default=0 if OPERATION_MODE.infura else 1,
string_converter=int,
verbose=not OPERATION_MODE.infura,
)
"""
Process pool for Multicall decoding operations.
See Also:
:class:`a_sync.AsyncProcessPoolExecutor`: The executor class used for managing asynchronous processes.
"""

COLLECTION_FACTOR = _envs.create_env("COLLECTION_FACTOR", int, default=10 if OPERATION_MODE.infura else 1, string_converter=int, verbose=not OPERATION_MODE.infura)
COLLECTION_FACTOR = _envs.create_env(
"COLLECTION_FACTOR",
int,
default=10 if OPERATION_MODE.infura else 1,
string_converter=int,
verbose=not OPERATION_MODE.infura,
)
"""Factor determining the size of data collection operations."""

# We use a modified version of the request spec that doesn't contain unnecessary fields, and switch to the full spec if necessary for your node.
@@ -102,32 +144,70 @@
# TODO: implement this
EXPORT_STATS = _envs.create_env("EXPORT_STATS", bool, default=False, verbose=False)
# NOTE: COLLECT_STATS is implemented
COLLECT_STATS = _envs.create_env("COLLECT_STATS", bool, default=EXPORT_STATS, verbose=not EXPORT_STATS)
COLLECT_STATS = _envs.create_env(
"COLLECT_STATS", bool, default=EXPORT_STATS, verbose=not EXPORT_STATS
)

# You probably don't need to use this unless you know you need to
STUCK_CALL_TIMEOUT = _envs.create_env("STUCK_CALL_TIMEOUT", int, default=60*60*2)
STUCK_CALL_TIMEOUT = _envs.create_env("STUCK_CALL_TIMEOUT", int, default=60 * 60 * 2)

# Method-specific Semaphores
method_semaphores: Dict[str, a_sync.Semaphore] = {
"eth_call": _envs.create_env("ETH_CALL_SEMAPHORE", BlockSemaphore, default=BROWNIE_CALL_SEMAPHORE._value, string_converter=int, verbose=False),
"eth_getBlock": _envs.create_env("ETH_GETBLOCK_SEMAPHORE", a_sync.Semaphore, default=1_000, string_converter=int, verbose=False),
"eth_getLogs": _envs.create_env("ETH_GETLOGS_SEMAPHORE", a_sync.Semaphore, default=64, string_converter=int, verbose=False),
"eth_getTransaction": _envs.create_env("ETH_GETTRANSACTION_SEMAPHORE", a_sync.Semaphore, default=1_000, string_converter=int, verbose=False),
"eth_getCode": _envs.create_env("ETH_GETCODE_SEMAPHORE", a_sync.Semaphore, default=5_000, string_converter=int, verbose=False),
"eth_call": _envs.create_env(
"ETH_CALL_SEMAPHORE",
BlockSemaphore,
default=BROWNIE_CALL_SEMAPHORE._value,
string_converter=int,
verbose=False,
),
"eth_getBlock": _envs.create_env(
"ETH_GETBLOCK_SEMAPHORE",
a_sync.Semaphore,
default=1_000,
string_converter=int,
verbose=False,
),
"eth_getLogs": _envs.create_env(
"ETH_GETLOGS_SEMAPHORE", a_sync.Semaphore, default=64, string_converter=int, verbose=False
),
"eth_getTransaction": _envs.create_env(
"ETH_GETTRANSACTION_SEMAPHORE",
a_sync.Semaphore,
default=1_000,
string_converter=int,
verbose=False,
),
"eth_getCode": _envs.create_env(
"ETH_GETCODE_SEMAPHORE",
a_sync.Semaphore,
default=5_000,
string_converter=int,
verbose=False,
),
}
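
A hypothetical sketch of how a mapping like this gets consulted: look up the method's semaphore before dispatching, so each RPC method has its own concurrency ceiling. BlockSemaphore entries are additionally keyed by block, which the sketch ignores, and dispatch/send_request are illustrative names, not the controller's real API:

async def dispatch(method: str, send_request):
    semaphore = method_semaphores.get(method)
    if semaphore is None:
        return await send_request()
    async with semaphore:  # e.g. at most 64 concurrent eth_getLogs requests
        return await send_request()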

if not typed_envs.logger.disabled:
logger.info("More details can be found in dank_mids/ENVIRONMENT_VARIABLES.py")
logger.info("NOTE: You can disable these logs by setting the `TYPEDENVS_SHUTUP` env var to any value.")
logger.info(
"NOTE: You can disable these logs by setting the `TYPEDENVS_SHUTUP` env var to any value."
)


# Validate some stuffs

# NOTE: The other modes are (probably) bugging out right now. More investigation needed. For now you use infura mode.
if not OPERATION_MODE.infura:
logger.warning("Unless you know what you're doing, dank mids should be run in infura mode for now")
logger.warning(
"Unless you know what you're doing, dank mids should be run in infura mode for now"
)

if OPERATION_MODE.infura:
for process_pool in {MULTICALL_DECODER_PROCESSES, BROWNIE_DECODER_PROCESSES, BROWNIE_ENCODER_PROCESSES}:
for process_pool in {
MULTICALL_DECODER_PROCESSES,
BROWNIE_DECODER_PROCESSES,
BROWNIE_ENCODER_PROCESSES,
}:
if process_pool._max_workers:
raise ValueError(f"You cannot set env var {process_pool.name} while running dank in infura mode.")
raise ValueError(
f"You cannot set env var {process_pool.name} while running dank in infura mode."
)
14 changes: 10 additions & 4 deletions dank_mids/__init__.py
@@ -1,6 +1,11 @@
from contextlib import suppress

from dank_mids.brownie_patch import DankContractCall, DankContractMethod, DankContractTx, DankOverloadedMethod
from dank_mids.brownie_patch import (
DankContractCall,
DankContractMethod,
DankContractTx,
DankOverloadedMethod,
)
from dank_mids.controller import instances
from dank_mids.exceptions import BrownieNotConnectedError
from dank_mids.helpers import setup_dank_w3, setup_dank_w3_from_sync
@@ -14,21 +19,24 @@
def _configure_concurrent_future_work_queue_size():
"""
Configures the concurrent futures process pool to allow for a larger number of queued calls.
This function increases the EXTRA_QUEUED_CALLS value to 50,000, which allows for more
concurrent operations to be queued in the process pool. This can significantly improve
performance for applications that make heavy use of brownie.
"""
import concurrent.futures.process as _cfp

_cfp.EXTRA_QUEUED_CALLS = 50_000


_configure_concurrent_future_work_queue_size()
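
For context: CPython sizes the process pool's worker-facing call queue at max_workers + EXTRA_QUEUED_CALLS, with EXTRA_QUEUED_CALLS = 1 by default, so raising it to 50,000 lets far more brownie calls sit in that queue at once. As the module-level call above shows, the override is applied automatically at import time:

import dank_mids  # importing the package runs _configure_concurrent_future_work_queue_size()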


# Import brownie objects
__brownie_objects = ["Contract", "dank_web3", "web3", "dank_eth", "eth", "patch_contract"]
with suppress(ImportError):
from dank_mids.brownie_patch import Contract, dank_eth, dank_web3, patch_contract

__all__ += __brownie_objects
# aliased for cleanliness and convenience
web3 = dank_web3
@@ -46,5 +54,3 @@ def __getattr__(name: str):
if name in __brownie_objects:
raise BrownieNotConnectedError(name)
raise AttributeError(f"module '{__name__}' has no attribute '{name}'")


25 changes: 17 additions & 8 deletions dank_mids/_batch.py
@@ -17,6 +17,7 @@

logger = logging.getLogger(__name__)


class DankBatch:
"""
A batch of JSON-RPC batches.
@@ -32,9 +33,14 @@ class DankBatch:
:class:`dank_mids._requests.RPCRequest`: The RPCRequest class used in this batch.
"""

__slots__ = 'controller', 'multicalls', 'rpc_calls', '_started'
__slots__ = "controller", "multicalls", "rpc_calls", "_started"

def __init__(self, controller: "DankMiddlewareController", multicalls: Multicalls, rpc_calls: List[Union[Multicall, RPCRequest]]):
def __init__(
self,
controller: "DankMiddlewareController",
multicalls: Multicalls,
rpc_calls: List[Union[Multicall, RPCRequest]],
):
self.controller = controller
"""The controller managing this batch."""

@@ -46,7 +52,7 @@ def __init__(self, controller: "DankMiddlewareController", multicalls: Multicall

self._started = False
"""A flag indicating whether the batch has been started."""

def __await__(self) -> Generator[Any, None, Any]:
"""
Makes the DankBatch awaitable.
@@ -59,11 +65,11 @@ def __await__(self) -> Generator[Any, None, Any]:
"""
self.start()
return self._await().__await__()

async def _await(self) -> None:
"""
Internal method to await the completion of all coroutines in the batch.
This method gathers all coroutines in the batch and awaits their completion,
logging any exceptions that may occur during execution. It's designed to
handle both successful completions and potential errors gracefully.
@@ -76,7 +82,10 @@ async def _await(self) -> None:
for batch, result in zip(batches, await asyncio.gather(*batches, return_exceptions=True)):
if isinstance(result, Exception):
if not isinstance(result, DankMidsInternalError):
logger.error(f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n", exc_info=True)
logger.error(
f"That's not good, there was an exception in a {batch.__class__.__name__}. These are supposed to be handled.\n{result}\n",
exc_info=True,
)
raise result

def start(self) -> None:
@@ -96,7 +105,7 @@ def start(self) -> None:
for call in self.rpc_calls:
call.start(self)
self._started = True

@property
def coroutines(self) -> Generator[Union["_Batch", Awaitable[RawResponse]], None, None]:
# Combine multicalls into one or more jsonrpc batches
@@ -116,7 +125,7 @@ def coroutines(self) -> Generator[Union["_Batch", Awaitable[RawResponse]], None,
if working_batch.is_full:
yield working_batch
working_batch = JSONRPCBatch(self.controller)

rpc_calls_to_batch = self.rpc_calls[:]
while rpc_calls_to_batch:
if working_batch.is_full:
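
DankBatch's awaitability follows a common shape: __await__ starts the work synchronously, then delegates to an internal coroutine. A minimal self-contained sketch of that shape (not the real DankBatch, which coalesces multicalls into JSON-RPC batches):

import asyncio
from typing import Any, Generator

class AwaitableBatch:
    def __init__(self, jobs):
        self._jobs = list(jobs)
        self._started = False

    def start(self) -> None:
        if not self._started:
            self._tasks = [asyncio.ensure_future(job) for job in self._jobs]
            self._started = True

    def __await__(self) -> Generator[Any, None, Any]:
        self.start()
        return self._await().__await__()

    async def _await(self) -> None:
        # Like DankBatch._await: let every job finish, then surface the first failure.
        for result in await asyncio.gather(*self._tasks, return_exceptions=True):
            if isinstance(result, Exception):
                raise result

Here `await AwaitableBatch([coro_a(), coro_b()])` runs both jobs to completion and re-raises the first exception, mirroring the error handling in the hunk above.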
1 change: 1 addition & 0 deletions dank_mids/_demo_mode.py
@@ -21,5 +21,6 @@ def info(self, *args: Any, **kwargs: Any) -> None:
**kwargs: Keyword arguments (ignored).
"""


# Choose between a real logger and a dummy logger based on the demo mode setting
demo_logger: logging.Logger = logging.getLogger("dank_mids.demo") if ENVIRONMENT_VARIABLES.DEMO_MODE else DummyLogger() # type: ignore [attr-defined, assignment]
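
DummyLogger is a null object: it exposes the same logging calls the real logger does and silently discards them, so call sites never branch on DEMO_MODE. Hypothetical usage:

from dank_mids._demo_mode import demo_logger

# Logged in demo mode, swallowed by DummyLogger.info otherwise; no conditional needed.
demo_logger.info("submitting batch")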
1 change: 0 additions & 1 deletion dank_mids/_envs.py
@@ -1,4 +1,3 @@

from typed_envs import EnvVarFactory

_factory = EnvVarFactory("DANKMIDS")