Skip to content

Commit

Permalink
close stream on event[type=error]
Browse files Browse the repository at this point in the history
  • Loading branch information
marrony committed Mar 5, 2024
1 parent 1bb9e5d commit 1189c61
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
4 changes: 3 additions & 1 deletion fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ def __init__(self, client, fql):
self.ctx = self.client._stream(self.fql, None)
self.stream = None
self.last_ts = 0
self.error = False

def __enter__(self):
self.stream = self.ctx.__enter__()
Expand All @@ -632,9 +633,10 @@ def __iter__(self):

def __next__(self):
try:
if self.stream is not None:
if not self.error and self.stream is not None:
event: Any = FaunaDecoder.decode(next(self.stream))
self.last_ts = event["ts"]
self.error = event["type"] == "error"
return event

raise StopIteration
Expand Down
19 changes: 19 additions & 0 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,22 @@ def stream_iter1():
ret = [obj for obj in stream]

assert ret == [{"type": "start", "ts": 1}, {"type": "start", "ts": 3}]


def test_client_close_stream_on_error(subtests, httpx_mock: HTTPXMock):

def stream_iter():
yield b'{"type": "start", "ts": 1}\n'
yield b'{"type": "error", "ts": 2}\n'
yield b'{"type": "start", "ts": 3}\n'

httpx_mock.add_response(stream=IteratorStream(stream_iter()))

ret = []
with httpx.Client() as mockClient:
http_client = HTTPXClient(mockClient)
c = Client(http_client=http_client)
with c.stream(StreamToken("token")) as stream:
ret = [obj for obj in stream]

assert ret == [{"type": "start", "ts": 1}, {"type": "error", "ts": 2}]

0 comments on commit 1189c61

Please sign in to comment.