Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit concurrent connections to 32 #131

Merged
merged 3 commits into from
Apr 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions dank_mids/helpers/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@
from itertools import chain
from threading import get_ident
from random import random
from typing import Any, overload
from typing import Any, Callable, List, Optional, overload

import msgspec
from aiohttp import ClientSession as DefaultClientSession
from aiohttp import ClientTimeout
from aiohttp import ClientTimeout, TCPConnector
from aiohttp.client_exceptions import ClientResponseError
from aiohttp.typedefs import JSONDecoder
from aiolimiter import AsyncLimiter
Expand Down Expand Up @@ -75,10 +75,10 @@ def __new__(cls, value, phrase, description=''):
limiter = AsyncLimiter(5, 0.1) # 50 requests/second

@overload
async def post(endpoint: str, *args, loads = decode.raw, **kwargs) -> RawResponse:...
async def post(endpoint: str, *args, loads: Callable[[Any], RawResponse], **kwargs) -> RawResponse:...
@overload
async def post(endpoint: str, *args, loads = decode.jsonrpc_batch, **kwargs) -> JSONRPCBatchResponse:...
async def post(endpoint: str, *args, loads: JSONDecoder = None, **kwargs) -> Any:
async def post(endpoint: str, *args, loads: Callable[[Any], JSONRPCBatchResponse], **kwargs) -> JSONRPCBatchResponse:...
async def post(endpoint: str, *args, loads: Optional[JSONDecoder] = None, **kwargs) -> Any:
"""Returns decoded json data from `endpoint`"""
session = await get_session()
return await session.post(endpoint, *args, loads=loads, **kwargs)
Expand All @@ -87,7 +87,7 @@ async def get_session() -> "ClientSession":
return await _get_session_for_thread(get_ident())

class ClientSession(DefaultClientSession):
async def post(self, endpoint: str, *args, loads: JSONDecoder = None, _retry_after: int = 1, **kwargs) -> bytes:
async def post(self, endpoint: str, *args, loads: Optional[JSONDecoder] = None, _retry_after: int = 1, **kwargs) -> bytes: # type: ignore [override]
# Process input arguments.
if isinstance(kwargs.get('data'), PartialRequest):
logger.debug("making request for %s", kwargs['data'])
Expand Down Expand Up @@ -134,7 +134,11 @@ async def _get_session_for_thread(thread_ident: int) -> ClientSession:
This makes our ClientSession threadsafe just in case.
Most everything should be run in main thread though.
"""
timeout = ClientTimeout(ENVIRONMENT_VARIABLES.AIOHTTP_TIMEOUT)
return ClientSession(headers={'content-type': 'application/json'}, timeout=timeout, raise_for_status=True)
return ClientSession(
connector = TCPConnector(limit=32),
headers = {'content-type': 'application/json'},
timeout = ClientTimeout(ENVIRONMENT_VARIABLES.AIOHTTP_TIMEOUT), # type: ignore [arg-type]
raise_for_status = True,
)

_limited = []
_limited: List[ClientSession] = []
Loading