Skip to content

Commit

Permalink
don't emit start event
Browse files Browse the repository at this point in the history
  • Loading branch information
marrony committed Mar 9, 2024
1 parent 8ab264e commit 6ddc55d
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 23 deletions.
4 changes: 4 additions & 0 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,10 @@ def _next_element(self):
if event["type"] == "error":
# todo: parse error
raise StreamError

if event["type"] == "start":
return self._next_element()

return event

raise StopIteration
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def thread_fn():
stream = client.stream(fql("${coll}.all().toStream()", coll=a_collection))

with stream as iter:
events[0] = [evt["type"] for evt in take(iter, 4)]
events[0] = [evt["type"] for evt in take(iter, 3)]

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()
Expand All @@ -37,7 +37,7 @@ def thread_fn():
client.query(fql("${coll}.create({}).id", coll=a_collection))

stream_thread.join()
assert events[0] == ["start", "add", "remove", "add"]
assert events[0] == ["add", "remove", "add"]


def test_close_method(client, a_collection):
Expand All @@ -62,7 +62,7 @@ def thread_fn():
client.query(fql("${coll}.create({}).id", coll=a_collection)).data

stream_thread.join()
assert events == ["start", "add"]
assert events == ["add", "add"]


def test_max_retries(a_collection):
Expand Down
33 changes: 13 additions & 20 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,8 +419,8 @@ def test_call_query_with_string():

def test_client_stream(subtests, httpx_mock: HTTPXMock):
response = [
b'{"type": "start", "ts": 1}\n', b'{"type": "start", "ts": 2}\n',
b'{"type": "start", "ts": 3}\n'
b'{"type": "start", "ts": 1}\n', b'{"type": "add", "ts": 2}\n',
b'{"type": "remove", "ts": 3}\n'
]

httpx_mock.add_response(stream=IteratorStream(response))
Expand All @@ -432,28 +432,19 @@ def test_client_stream(subtests, httpx_mock: HTTPXMock):
with c.stream(StreamToken("token")) as stream:
ret = [obj for obj in stream]

assert ret == [{
"type": "start",
"ts": 1
}, {
"type": "start",
"ts": 2
}, {
"type": "start",
"ts": 3
}]
assert ret == [{"type": "add", "ts": 2}, {"type": "remove", "ts": 3}]


def test_client_close_stream(subtests, httpx_mock: HTTPXMock):
response = [b'{"type": "start", "ts": 1}\n', b'{"type": "error", "ts": 2}\n']
response = [b'{"type": "start", "ts": 1}\n', b'{"type": "add", "ts": 2}\n']

httpx_mock.add_response(stream=IteratorStream(response))

with httpx.Client() as mockClient:
http_client = HTTPXClient(mockClient)
c = Client(http_client=http_client)
with c.stream(StreamToken("token")) as stream:
assert next(stream) == {"type": "start", "ts": 1}
assert next(stream) == {"type": "add", "ts": 2}
stream.close()

with pytest.raises(StopIteration):
Expand All @@ -464,11 +455,12 @@ def test_client_retry_stream(subtests, httpx_mock: HTTPXMock):

def stream_iter0():
yield b'{"type": "start", "ts": 1}\n'
yield b'{"type": "add", "ts": 2}\n'
raise NetworkError("Some network error")
yield b'{"type": "start", "ts": 2}\n'
yield b'{"type": "start", "ts": 3}\n'

def stream_iter1():
yield b'{"type": "start", "ts": 3}\n'
yield b'{"type": "start", "ts": 4}\n'

httpx_mock.add_response(stream=IteratorStream(stream_iter0()))
httpx_mock.add_response(stream=IteratorStream(stream_iter1()))
Expand All @@ -480,15 +472,16 @@ def stream_iter1():
with c.stream(StreamToken("token")) as stream:
ret = [obj for obj in stream]

assert ret == [{"type": "start", "ts": 1}, {"type": "start", "ts": 3}]
assert ret == [{"type": "add", "ts": 2}]


def test_client_close_stream_on_error(subtests, httpx_mock: HTTPXMock):

def stream_iter():
yield b'{"type": "start", "ts": 1}\n'
yield b'{"type": "error", "ts": 2}\n'
yield b'{"type": "start", "ts": 3}\n'
yield b'{"type": "add", "ts": 2}\n'
yield b'{"type": "error", "ts": 3}\n'
yield b'{"type": "start", "ts": 4}\n'

httpx_mock.add_response(stream=IteratorStream(stream_iter()))

Expand All @@ -502,4 +495,4 @@ def stream_iter():
for obj in stream:
ret.append(obj)

assert ret == [{"type": "start", "ts": 1}]
assert ret == [{"type": "add", "ts": 2}]

0 comments on commit 6ddc55d

Please sign in to comment.