From 9eed59ab1fa35f339021390a34825f5febe3c537 Mon Sep 17 00:00:00 2001 From: Marrony Neris Date: Thu, 7 Mar 2024 12:58:01 -0800 Subject: [PATCH] Reuse Retryable class --- fauna/client/client.py | 22 ++++++++++++++-------- fauna/client/retryable.py | 1 - 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/fauna/client/client.py b/fauna/client/client.py index 021ce855..eebf08ef 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -393,7 +393,7 @@ def stream(self, fql: Union[StreamToken, Query]) -> "StreamIterator": headers[_Header.Authorization] = self._auth.bearer() return StreamIterator(self._session, headers, self._endpoint + "/stream/1", - token) + self._max_attempts, self._max_backoff, token) def _check_protocol(self, response_json: Any, status_code): # TODO: Logic to validate wire protocol belongs elsewhere. @@ -602,10 +602,14 @@ def _set_endpoint(self, endpoint): class StreamIterator: """A class that mixes a ContextManager and an Iterator so we can detected retryable errors.""" - def __init__(self, http_client, headers, endpoint, token): + def __init__(self, http_client: HTTPClient, headers: Dict[str, + str], endpoint: str, + max_attempts: int, max_backoff: int, token: StreamToken): self.http_client = http_client self.headers = headers self.endpoint = endpoint + self.max_attempts = max_attempts + self.max_backoff = max_backoff self.token = token self.stream = None self.last_ts = None @@ -624,6 +628,11 @@ def __iter__(self): return self def __next__(self): + retryable = Retryable(self.max_attempts, self.max_backoff, + self._next_element) + return retryable.run().response + + def _next_element(self): try: if not self.error and self.stream is not None: event: Any = FaunaDecoder.decode(next(self.stream)) @@ -633,7 +642,9 @@ def __next__(self): raise StopIteration except NetworkError as e: - return self._retry() + self.ctx = self._create_stream() + self.stream = self.ctx.__enter__() + raise RetryableFaunaException from e def _create_stream(self): data: Dict[str, Any] = {"token": self.token.token} @@ -646,11 +657,6 @@ def _create_stream(self): data=data, ) - def _retry(self): - self.ctx = self._create_stream() - self.stream = self.ctx.__enter__() - return self.__next__() - def close(self): if self.stream is not None: self.stream.close() diff --git a/fauna/client/retryable.py b/fauna/client/retryable.py index 3d5e184f..466f970c 100644 --- a/fauna/client/retryable.py +++ b/fauna/client/retryable.py @@ -62,7 +62,6 @@ def run(self) -> RetryableResponse: Returns the number of attempts and the response """ - err: Optional[RetryableFaunaException] = None attempt = 0 while True: sleep_time = 0.0 if attempt == 0 else self._strategy.wait()