diff --git a/fauna/client/client.py b/fauna/client/client.py index e04bc9ce..0275668a 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -669,6 +669,10 @@ def _next_element(self): if event["type"] == "error": # todo: parse error raise StreamError + + if event["type"] == "start": + return self._next_element() + return event raise StopIteration diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 087f14e5..12bf16b6 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -27,7 +27,7 @@ def thread_fn(): stream = client.stream(fql("${coll}.all().toStream()", coll=a_collection)) with stream as iter: - events[0] = [evt["type"] for evt in take(iter, 4)] + events[0] = [evt["type"] for evt in take(iter, 3)] stream_thread = threading.Thread(target=thread_fn) stream_thread.start() @@ -37,7 +37,7 @@ def thread_fn(): client.query(fql("${coll}.create({}).id", coll=a_collection)) stream_thread.join() - assert events[0] == ["start", "add", "remove", "add"] + assert events[0] == ["add", "remove", "add"] def test_close_method(client, a_collection): @@ -62,7 +62,7 @@ def thread_fn(): client.query(fql("${coll}.create({}).id", coll=a_collection)).data stream_thread.join() - assert events == ["start", "add"] + assert events == ["add", "add"] def test_max_retries(a_collection): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 5a82e6a7..1b94251e 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -419,8 +419,8 @@ def test_call_query_with_string(): def test_client_stream(subtests, httpx_mock: HTTPXMock): response = [ - b'{"type": "start", "ts": 1}\n', b'{"type": "start", "ts": 2}\n', - b'{"type": "start", "ts": 3}\n' + b'{"type": "start", "ts": 1}\n', b'{"type": "add", "ts": 2}\n', + b'{"type": "remove", "ts": 3}\n' ] httpx_mock.add_response(stream=IteratorStream(response)) @@ -432,20 +432,11 @@ def test_client_stream(subtests, httpx_mock: HTTPXMock): with c.stream(StreamToken("token")) as stream: ret = [obj for obj in stream] - assert ret == [{ - "type": "start", - "ts": 1 - }, { - "type": "start", - "ts": 2 - }, { - "type": "start", - "ts": 3 - }] + assert ret == [{"type": "add", "ts": 2}, {"type": "remove", "ts": 3}] def test_client_close_stream(subtests, httpx_mock: HTTPXMock): - response = [b'{"type": "start", "ts": 1}\n', b'{"type": "error", "ts": 2}\n'] + response = [b'{"type": "start", "ts": 1}\n', b'{"type": "add", "ts": 2}\n'] httpx_mock.add_response(stream=IteratorStream(response)) @@ -453,7 +444,7 @@ def test_client_close_stream(subtests, httpx_mock: HTTPXMock): http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) with c.stream(StreamToken("token")) as stream: - assert next(stream) == {"type": "start", "ts": 1} + assert next(stream) == {"type": "add", "ts": 2} stream.close() with pytest.raises(StopIteration): @@ -464,11 +455,12 @@ def test_client_retry_stream(subtests, httpx_mock: HTTPXMock): def stream_iter0(): yield b'{"type": "start", "ts": 1}\n' + yield b'{"type": "add", "ts": 2}\n' raise NetworkError("Some network error") - yield b'{"type": "start", "ts": 2}\n' + yield b'{"type": "start", "ts": 3}\n' def stream_iter1(): - yield b'{"type": "start", "ts": 3}\n' + yield b'{"type": "start", "ts": 4}\n' httpx_mock.add_response(stream=IteratorStream(stream_iter0())) httpx_mock.add_response(stream=IteratorStream(stream_iter1())) @@ -480,15 +472,16 @@ def stream_iter1(): with c.stream(StreamToken("token")) as stream: ret = [obj for obj in stream] - assert ret == [{"type": "start", "ts": 1}, {"type": "start", "ts": 3}] + assert ret == [{"type": "add", "ts": 2}] def test_client_close_stream_on_error(subtests, httpx_mock: HTTPXMock): def stream_iter(): yield b'{"type": "start", "ts": 1}\n' - yield b'{"type": "error", "ts": 2}\n' - yield b'{"type": "start", "ts": 3}\n' + yield b'{"type": "add", "ts": 2}\n' + yield b'{"type": "error", "ts": 3}\n' + yield b'{"type": "start", "ts": 4}\n' httpx_mock.add_response(stream=IteratorStream(stream_iter())) @@ -502,4 +495,4 @@ def stream_iter(): for obj in stream: ret.append(obj) - assert ret == [{"type": "start", "ts": 1}] + assert ret == [{"type": "add", "ts": 2}]