Skip to content

Commit

Permalink
fix error handling on retries and abort on error events
Browse files Browse the repository at this point in the history
  • Loading branch information
marrony committed Mar 9, 2024
1 parent c956ed1 commit 8ab264e
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 65 deletions.
60 changes: 33 additions & 27 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from fauna.errors import AuthenticationError, ClientError, ProtocolError, ServiceError, AuthorizationError, \
ServiceInternalError, ServiceTimeoutError, ThrottlingError, QueryTimeoutError, QueryRuntimeError, \
QueryCheckError, ContendedTransactionError, AbortError, InvalidRequestError, RetryableFaunaException, \
NetworkError, StreamTimeout
NetworkError, StreamError
from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
from fauna.http.http_client import HTTPClient
from fauna.query import Query, Page, fql
Expand Down Expand Up @@ -56,12 +56,10 @@ class StreamOptions:
"""
A dataclass representing options available for a stream.
* idle_timeout - Controls the maximum amount of time the driver will wait on stream idling.
* retry_on_timeout - If true, streaming is reconnected on timeouts.
* max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
"""

idle_timeout: Optional[timedelta] = None
retry_on_timeout: bool = True
max_attempts: Optional[int] = None


class Client:
Expand Down Expand Up @@ -635,61 +633,69 @@ def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
self.ctx = self._create_stream()

def __enter__(self):
self.stream = self.ctx.__enter__()
return self

def __exit__(self, exc_type, exc_value, exc_traceback):
if self.stream is not None:
self.stream.close()

self.ctx.__exit__(exc_type, exc_value, exc_traceback)
return True
return False

def __iter__(self):
return self

def __next__(self):
retryable = Retryable[Any](self.max_attempts, self.max_backoff,
if self.opts.max_attempts is not None:
max_attempts = self.opts.max_attempts
else:
max_attempts = self.max_attempts

retryable = Retryable[Any](max_attempts, self.max_backoff,
self._next_element)
return retryable.run().response

def _next_element(self):
try:
if not self.error and self.stream is not None:
if self.stream is None:
try:
self.stream = self.ctx.__enter__()
except Exception as ex:
self._retry_stream(ex)

if self.stream is not None:
event: Any = FaunaDecoder.decode(next(self.stream))
self.last_ts = event["ts"]
self.error = event["type"] == "error"
if event["type"] == "error":
# todo: parse error
raise StreamError
return event

raise StopIteration
except StreamTimeout as e:
if self.opts.retry_on_timeout:
self._retry_stream(e)
raise StopIteration

except NetworkError as e:
self._retry_stream(e)

def _retry_stream(self, e):
if self.stream is not None:
self.stream.close()
self.ctx = self._create_stream()
self.stream = self.ctx.__enter__()

self.stream = None

self.ctx.__exit__(None, None, None)

try:
self.ctx = self._create_stream()
except Exception as ex:
pass
raise RetryableFaunaException from e

def _create_stream(self):
if self.opts.idle_timeout:
timeout = self.opts.idle_timeout.total_seconds()
else:
timeout = None

data: Dict[str, Any] = {"token": self.token.token}
if self.last_ts is not None:
data["start_ts"] = self.last_ts

return self.http_client.stream(
url=self.endpoint,
headers=self.headers,
data=data,
timeout=timeout,
)
url=self.endpoint, headers=self.headers, data=data)

def close(self):
if self.stream is not None:
Expand Down
2 changes: 1 addition & 1 deletion fauna/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from .errors import ProtocolError, ServiceError
from .errors import AuthenticationError, AuthorizationError, QueryCheckError, QueryRuntimeError, \
QueryTimeoutError, ServiceInternalError, ServiceTimeoutError, ThrottlingError, ContendedTransactionError, \
InvalidRequestError, AbortError, RetryableFaunaException, StreamTimeout
InvalidRequestError, AbortError, RetryableFaunaException, StreamError
4 changes: 2 additions & 2 deletions fauna/errors/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class ClientError(FaunaException):
pass


class StreamTimeout(FaunaException):
"""An error representing Straming timeouts."""
class StreamError(FaunaException):
"""An error representing Stream failure."""
pass


Expand Down
1 change: 0 additions & 1 deletion fauna/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ def stream(
url: str,
headers: Mapping[str, str],
data: Mapping[str, Any],
timeout: Optional[float],
) -> Iterator[Any]:
pass

Expand Down
12 changes: 4 additions & 8 deletions fauna/http/httpx_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import httpx

from fauna.errors import ClientError, NetworkError, StreamTimeout
from fauna.errors import ClientError, NetworkError
from fauna.http.http_client import HTTPResponse, HTTPClient


Expand Down Expand Up @@ -100,21 +100,17 @@ def stream(
url: str,
headers: Mapping[str, str],
data: Mapping[str, Any],
timeout: Optional[float] = None,
) -> Iterator[Any]:
stream = self._c.stream(
"POST", url=url, headers=headers, json=data, timeout=timeout)
with stream as response:
with self._c.stream(
"POST", url=url, headers=headers, json=data) as response:
yield self._transform(response)

def _transform(self, response):
try:
for line in response.iter_lines():
yield json.loads(line)
except httpx.StreamError as e:
raise StopIteration
except httpx.ReadTimeout as e:
raise StreamTimeout("Stream timeout") from e
raise NetworkError("Stream timeout") from e
except (httpx.HTTPError, httpx.InvalidURL) as e:
raise NetworkError("Exception re-raised from HTTP request") from e

Expand Down
74 changes: 57 additions & 17 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,32 @@
from datetime import timedelta

import pytest
import httpx
import fauna

from fauna import fql
from fauna.client import StreamOptions
from fauna.client import Client, StreamOptions
from fauna.http.httpx_client import HTTPXClient
from fauna.errors import NetworkError, RetryableFaunaException


def take(stream, count):
i = iter(stream)

while count > 0:
count -= 1
yield next(i)


def test_stream(client, a_collection):

opts = StreamOptions(
idle_timeout=timedelta(seconds=1), retry_on_timeout=False)
events = [[]]

def thread_fn():
stream = client.stream(
fql("${coll}.all().toStream()", coll=a_collection), opts)
stream = client.stream(fql("${coll}.all().toStream()", coll=a_collection))

with stream as iter:
events = [evt["type"] for evt in iter]

assert events == ["start", "add", "remove", "add"]
events[0] = [evt["type"] for evt in take(iter, 4)]

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()
Expand All @@ -29,26 +37,58 @@ def thread_fn():
client.query(fql("${coll}.create({}).id", coll=a_collection))

stream_thread.join()
assert events[0] == ["start", "add", "remove", "add"]


def test_retry_on_timeout(client, a_collection):
def test_close_method(client, a_collection):

opts = StreamOptions(
idle_timeout=timedelta(seconds=0.1), retry_on_timeout=True)
events = []

def thread_fn():
stream = client.stream(
fql("${coll}.all().toStream()", coll=a_collection), opts)
stream = client.stream(fql("${coll}.all().toStream()", coll=a_collection))

events = []
with stream as iter:
for evt in iter:
events.append(evt["type"])
if len(events) == 4:
iter.close()

assert events == ["start", "start", "start", "start"]
# close after 2 events
if len(events) == 2:
iter.close()

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

client.query(fql("${coll}.create({}).id", coll=a_collection)).data
client.query(fql("${coll}.create({}).id", coll=a_collection)).data

stream_thread.join()
assert events == ["start", "add"]


def test_max_retries(a_collection):
httpx_client = httpx.Client(http1=False, http2=True)
client = Client(
http_client=HTTPXClient(httpx_client), max_attempts=3, max_backoff=1)

count = [0]

def stream_func(*args, **kwargs):
count[0] += 1
raise NetworkError('foo')

httpx_client.stream = stream_func

count[0] = 0
with pytest.raises(RetryableFaunaException):
with client.stream(fql("${coll}.all().toStream()",
coll=a_collection)) as iter:
events = [evt["type"] for evt in iter]
assert count[0] == 3

count[0] = 0
with pytest.raises(RetryableFaunaException):
opts = StreamOptions(max_attempts=5)
with client.stream(
fql("${coll}.all().toStream()", coll=a_collection), opts) as iter:
events = [evt["type"] for evt in iter]
assert count[0] == 5
21 changes: 12 additions & 9 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import fauna
from fauna import fql
from fauna.client import Client, Header, QueryOptions, Endpoints
from fauna.errors import QueryCheckError, ProtocolError, QueryRuntimeError, NetworkError
from fauna.errors import QueryCheckError, ProtocolError, QueryRuntimeError, NetworkError, StreamError
from fauna.query.models import StreamToken
from fauna.http import HTTPXClient

Expand Down Expand Up @@ -445,15 +445,15 @@ def test_client_stream(subtests, httpx_mock: HTTPXMock):


def test_client_close_stream(subtests, httpx_mock: HTTPXMock):
response = [b'{"@int": "10"}\n', b'{"@long": "20"}\n']
response = [b'{"type": "start", "ts": 1}\n', b'{"type": "error", "ts": 2}\n']

httpx_mock.add_response(stream=IteratorStream(response))

with httpx.Client() as mockClient:
http_client = HTTPXClient(mockClient)
c = Client(http_client=http_client)
with c.stream(StreamToken("token")) as stream:
assert next(stream) == 10
assert next(stream) == {"type": "start", "ts": 1}
stream.close()

with pytest.raises(StopIteration):
Expand Down Expand Up @@ -493,10 +493,13 @@ def stream_iter():
httpx_mock.add_response(stream=IteratorStream(stream_iter()))

ret = []
with httpx.Client() as mockClient:
http_client = HTTPXClient(mockClient)
c = Client(http_client=http_client)
with c.stream(StreamToken("token")) as stream:
ret = [obj for obj in stream]

assert ret == [{"type": "start", "ts": 1}, {"type": "error", "ts": 2}]
with pytest.raises(StreamError):
with httpx.Client() as mockClient:
http_client = HTTPXClient(mockClient)
c = Client(http_client=http_client)
with c.stream(StreamToken("token")) as stream:
for obj in stream:
ret.append(obj)

assert ret == [{"type": "start", "ts": 1}]

0 comments on commit 8ab264e

Please sign in to comment.