Skip to content

Commit

Permalink
feat: optimize _Batch.calls property (#287)
Browse files Browse the repository at this point in the history
* feat: optimize _Batch.calls property

* chore: `black .`

* fix: TypeErrors

* chore: `black .`

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Nov 18, 2024
1 parent 5da57dc commit 0487fa4
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 51 deletions.
2 changes: 1 addition & 1 deletion dank_mids/_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def coroutines(self) -> Generator[Union["_Batch", Awaitable[RawResponse]], None,
yield working_batch
working_batch = JSONRPCBatch(self.controller)

rpc_calls_to_batch = self.rpc_calls[:]
rpc_calls_to_batch = list(self.rpc_calls)
while rpc_calls_to_batch:
if working_batch.is_full:
yield working_batch
Expand Down
92 changes: 42 additions & 50 deletions dank_mids/_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,7 @@ async def get_response(self) -> RPCResponse: # type: ignore [override]
logger.debug(
"your node says the partial request was invalid but its okay, we can use the full jsonrpc spec instead"
)
method = self.method
if self.raw:
method += "_raw"
method = f"{self.method}_raw" if self.raw else self.method
return await self.controller(method, self.params)

error = dict(response.error.items())
Expand Down Expand Up @@ -377,10 +375,8 @@ async def create_duplicate(
# Creating the task before awaiting the new call ensures the new call will grab the semaphore immediately
# and then the task will try to acquire at the very next event loop _run_once cycle
logger.warning("%s got stuck, we're creating a new one", self)
method = self.method
if self.raw:
method += "_raw"
retval = await self.controller(method, self.params)
method = f"{self.method}_raw" if self.raw else self.method
retval = await self.controller(method, self.params) # type: ignore [arg-type]
await self.semaphore.acquire()
assert "result" in retval or "error" in retval, (retval, type(retval))
return retval
Expand Down Expand Up @@ -511,42 +507,37 @@ def __init__(self, controller: "DankMiddlewareController", calls: Iterable[_Requ
super().__init__()

def __bool__(self) -> bool:
return bool(self.calls)
try:
next(self.calls)
return True
except StopIteration:
return False

@overload
def __getitem__(self, ix: int) -> _Request: ...
@overload
def __getitem__(self, ix: slice) -> List[_Request]: ...
def __getitem__(self, ix: Union[int, slice]) -> Union[_Request, List[_Request]]:
return self.calls[ix]
def __getitem__(self, ix: slice) -> Tuple[_Request, ...]: ...
def __getitem__(self, ix: Union[int, slice]) -> Union[_Request, Tuple[_Request, ...]]:
return tuple(self.calls)[ix]

def __iter__(self) -> Iterator[_Request]:
return iter(self.calls)
return self.calls

def __len__(self) -> int:
return len(self.calls)
return sum(1 for _ in self.calls)

@property
def calls(self) -> List[_Request]:
def calls(self) -> Iterator[_Request]:
"Returns a list of calls. Creates a temporary strong reference to each call in the batch, if it still exists."
return [call for ref in self._calls if (call := ref())]
return (call for ref in self._calls if (call := ref()))

@property
def halfpoint(self) -> int:
return len(self) // 2

@property
def bisected(self) -> Generator[List[_Request], None, None]:
yield self.chunk0
yield self.chunk1

@property
def chunk0(self) -> List[_Request]:
return self.calls[: self.halfpoint]

@property
def chunk1(self) -> List[_Request]:
return self.calls[self.halfpoint :]
def bisected(self) -> Generator[Tuple[_Request, ...], None, None]:
# set `self.calls` output to var so its only computed once
calls = tuple(self.calls)
half = len(calls) // 2
yield calls[:half]
yield calls[half:]

@property
def is_full(self) -> bool:
Expand Down Expand Up @@ -642,7 +633,7 @@ def __repr__(self) -> str:

@cached_property
def block(self) -> BlockId:
return self.calls[0].block
return next(self.calls).block

@property
def calldata(self) -> str:
Expand Down Expand Up @@ -688,7 +679,7 @@ async def get_response(self) -> None: # type: ignore [override]
demo_logger.info(f"request {rid} for multicall {self.bid} starting") # type: ignore
try:
# create a strong ref to all calls we will execute so they cant get gced mid execution and mess up response ordering
calls = self.calls
calls = tuple(self.calls)
await self.spoof_response(
await self.controller.make_request(self.method, self.params, request_id=self.uid),
calls,
Expand All @@ -707,16 +698,15 @@ async def get_response(self) -> None: # type: ignore [override]
len(self),
self.request.data,
)
else:
if _log_exception(e):
_debugging.failures.record(
self.controller.chain_id,
e,
type(self).__name__,
self.uid,
len(self),
self.request.data,
)
elif _log_exception(e):
_debugging.failures.record(
self.controller.chain_id,
e,
type(self).__name__,
self.uid,
len(self),
self.request.data,
)
await (self.bisect_and_retry(e) if self.should_retry(e) else self.spoof_response(e)) # type: ignore [misc]
except Exception as e:
if _log_exception(e):
Expand Down Expand Up @@ -753,7 +743,7 @@ async def spoof_response(
) -> None:
# NOTE: we pass in the calls to create a strong reference so when we zip up the results everything gets to the right place
if calls is None:
calls = self.calls
calls = tuple(self.calls)
# This happens if an Exception takes place during a singular Multicall request.
if isinstance(data, Exception):
logger.debug("%s had Exception %s", self, data)
Expand Down Expand Up @@ -829,7 +819,7 @@ async def bisect_and_retry(self, e: Exception) -> List[RPCResponse]:

@set_done
async def _exec_single_call(self) -> None:
await self.calls[0].make_request()
await next(self.calls).make_request()

def _post_future_cleanup(self) -> None:
with suppress(KeyError):
Expand Down Expand Up @@ -977,7 +967,7 @@ async def get_response(self) -> None: # type: ignore [override]
# I include this elif clause as a failsafe. This is rare and should not impact performance.
elif not self.is_single_multicall:
# Just to force my IDE to resolve types correctly
calls: List[RPCRequest] = self.calls # type: ignore [assignment]
calls: Tuple[RPCRequest, ...] = tuple(self.calls) # type: ignore [arg-type]
logger.debug(
"%s had exception %s, aborting and setting Exception as call._response", self, e
)
Expand Down Expand Up @@ -1009,9 +999,9 @@ async def post(self) -> Tuple[List[RawResponse], List[Union[Multicall, RPCReques
"""
try:
# we need strong refs so the results all get to the right place
calls = self.calls
calls = tuple(self.calls)
# for the multicalls too
mcall_calls_strong_refs = [call.calls for call in filter(lambda x: isinstance(x, Multicall), calls)] # type: ignore [union-attr]
mcall_calls_strong_refs = tuple(tuple(call.calls) for call in calls if isinstance(call, Multicall)) # type: ignore [union-attr]
response: JSONRPCBatchResponse = await _session.post(
self.controller.endpoint, data=self.data, loads=_codec.decode_jsonrpc_batch
)
Expand Down Expand Up @@ -1084,7 +1074,9 @@ def should_retry(self, e: Exception) -> bool:
return self.is_single_multicall

@set_done
async def spoof_response(self, response: List[RawResponse], calls: List[RPCRequest]) -> None:
async def spoof_response(
self, response: List[RawResponse], calls: Tuple[RPCRequest, ...]
) -> None:
"""
Process the responses from the Ethereum node and set the results for each call.
Expand All @@ -1101,7 +1093,7 @@ async def spoof_response(self, response: List[RawResponse], calls: List[RPCReque
if self.controller._sort_calls:
# NOTE: these providers don't always return batch results in the correct ordering
# NOTE: is it maybe because they
calls.sort(key=lambda call: call.uid)
calls = sorted(calls, key=lambda call: call.uid) # type: ignore [assignment]

if self.controller._sort_response:
response.sort(key=lambda raw: raw.decode().id)
Expand Down Expand Up @@ -1140,7 +1132,7 @@ async def bisect_and_retry(self, e: Exception) -> None:
logger.debug("%s had exception %s, retrying", self, e)
batches = [
(
Multicall(self.controller, chunk[0].calls, f"json{self.jid}_{i}") # type: ignore [misc]
Multicall(self.controller, tuple(chunk[0].calls), f"json{self.jid}_{i}") # type: ignore [misc]
if len(chunk) == 1 and isinstance(chunk[0], Multicall)
else JSONRPCBatch(self.controller, chunk, f"{self.jid}_{i}")
)
Expand Down

0 comments on commit 0487fa4

Please sign in to comment.