Skip to content

Commit

Permalink
Revert "Add option for user to handle status events (#159)"
Browse files Browse the repository at this point in the history
This reverts commit 5bb2db4.
  • Loading branch information
pnwpedro committed Apr 1, 2024
1 parent 690f96b commit 6475735
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 158 deletions.
9 changes: 1 addition & 8 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,11 @@ class StreamOptions:
A dataclass representing options available for a stream.
* max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
* max_backoff - The maximum backoff in seconds for an individual retry.
* status_events - Indicates if stream should include status events. Status events are periodic events that
* update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
* about about the cost of maintaining the stream other than the cost of the received events.
* max_backoff - The maximum backoff in seconds for an individual retry
"""

max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
status_events: bool = False


class Client:
Expand Down Expand Up @@ -525,9 +521,6 @@ def _next_element(self):
if event["type"] == "start":
return self._next_element()

if not self._opts.status_events and event["type"] == "status":
return self._next_element()

return event

raise StopIteration
Expand Down
73 changes: 1 addition & 72 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

def test_stream(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))
scoped_client.query(fql("Product.create({})"))

events = []

Expand Down Expand Up @@ -131,75 +132,3 @@ def stream_func(*args, **kwargs):
with client.stream(fql("Product.all().toStream()"), opts) as iter:
events = [evt["type"] for evt in iter]
assert count[0] == 5


def test_last_ts_is_monotonic(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

events = []

def thread_fn():
stream = scoped_client.stream(fql("Product.all().toStream()"))

with stream as iter:
last_ts = 0

for evt in iter:
assert iter.last_ts > last_ts

last_ts = iter.last_ts

events.append(evt["type"])

# close after 3 events
if len(events) == 3:
iter.close()

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)

scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))

stream_thread.join()
assert events == ["add", "add", "add"]


@pytest.mark.xfail(reason="not currently supported by core")
def test_handle_status_events(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

events = []

def thread_fn():
opts = StreamOptions(status_events=True)
stream = scoped_client.stream(fql("Product.all().toStream()"), opts)

with stream as iter:
for evt in iter:
events.append(evt["type"])

# close after 3 events
if len(events) == 3:
iter.close()

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)

scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))

stream_thread.join()
assert events == ["status", "add", "add"]
86 changes: 8 additions & 78 deletions tests/unit/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import fauna
from fauna import fql
from fauna.client import Client, Header, QueryOptions, Endpoints, StreamOptions
from fauna.client import Client, Header, QueryOptions, Endpoints
from fauna.errors import QueryCheckError, ProtocolError, QueryRuntimeError, NetworkError, AbortError
from fauna.query.models import StreamToken
from fauna.http import HTTPXClient
Expand Down Expand Up @@ -419,7 +419,7 @@ def test_call_query_with_string():

def test_client_stream(subtests, httpx_mock: HTTPXMock):
response = [
b'{"type": "status", "txn_ts": 1}\n', b'{"type": "add", "txn_ts": 2}\n',
b'{"type": "start", "txn_ts": 1}\n', b'{"type": "add", "txn_ts": 2}\n',
b'{"type": "remove", "txn_ts": 3}\n'
]

Expand All @@ -432,14 +432,12 @@ def test_client_stream(subtests, httpx_mock: HTTPXMock):
with c.stream(StreamToken("token")) as stream:
ret = [obj for obj in stream]

assert stream.last_ts == 3

assert ret == [{"type": "add", "txn_ts": 2}, {"type": "remove", "txn_ts": 3}]


def test_client_close_stream(subtests, httpx_mock: HTTPXMock):
response = [
b'{"type": "status", "txn_ts": 1}\n', b'{"type": "add", "txn_ts": 2}\n'
b'{"type": "start", "txn_ts": 1}\n', b'{"type": "add", "txn_ts": 2}\n'
]

httpx_mock.add_response(stream=IteratorStream(response))
Expand All @@ -451,22 +449,20 @@ def test_client_close_stream(subtests, httpx_mock: HTTPXMock):
assert next(stream) == {"type": "add", "txn_ts": 2}
stream.close()

assert stream.last_ts == 2

with pytest.raises(StopIteration):
next(stream)


def test_client_retry_stream(subtests, httpx_mock: HTTPXMock):

def stream_iter0():
yield b'{"type": "status", "txn_ts": 1}\n'
yield b'{"type": "start", "txn_ts": 1}\n'
yield b'{"type": "add", "txn_ts": 2}\n'
raise NetworkError("Some network error")
yield b'{"type": "status", "txn_ts": 3}\n'
yield b'{"type": "start", "txn_ts": 3}\n'

def stream_iter1():
yield b'{"type": "status", "txn_ts": 4}\n'
yield b'{"type": "start", "txn_ts": 4}\n'

httpx_mock.add_response(stream=IteratorStream(stream_iter0()))
httpx_mock.add_response(stream=IteratorStream(stream_iter1()))
Expand All @@ -478,18 +474,16 @@ def stream_iter1():
with c.stream(StreamToken("token")) as stream:
ret = [obj for obj in stream]

assert stream.last_ts == 4

assert ret == [{"type": "add", "txn_ts": 2}]


def test_client_close_stream_on_error(subtests, httpx_mock: HTTPXMock):

def stream_iter():
yield b'{"type": "status", "txn_ts": 1}\n'
yield b'{"type": "start", "txn_ts": 1}\n'
yield b'{"type": "add", "txn_ts": 2}\n'
yield b'{"type": "error", "txn_ts": 3, "error": {"message": "message", "code": "abort"}}\n'
yield b'{"type": "status", "txn_ts": 4}\n'
yield b'{"type": "start", "txn_ts": 4}\n'

httpx_mock.add_response(stream=IteratorStream(stream_iter()))

Expand All @@ -503,68 +497,4 @@ def stream_iter():
for obj in stream:
ret.append(obj)

assert stream.last_ts == 5

assert ret == [{"type": "add", "txn_ts": 2}]


def test_client_ignore_start_event(subtests, httpx_mock: HTTPXMock):

def stream_iter():
yield b'{"type": "start", "txn_ts": 1}\n'
yield b'{"type": "status", "txn_ts": 2}\n'
yield b'{"type": "add", "txn_ts": 3}\n'
yield b'{"type": "remove", "txn_ts": 4}\n'
yield b'{"type": "status", "txn_ts": 5}\n'

httpx_mock.add_response(stream=IteratorStream(stream_iter()))

ret = []

with httpx.Client() as mockClient:
http_client = HTTPXClient(mockClient)
c = Client(http_client=http_client)
with c.stream(StreamToken("token")) as stream:
for obj in stream:
ret.append(obj)

assert stream.last_ts == 5

assert ret == [{"type": "add", "txn_ts": 3}, {"type": "remove", "txn_ts": 4}]


def test_client_handle_status_events(subtests, httpx_mock: HTTPXMock):

def stream_iter():
yield b'{"type": "status", "txn_ts": 1}\n'
yield b'{"type": "add", "txn_ts": 2}\n'
yield b'{"type": "remove", "txn_ts": 3}\n'
yield b'{"type": "status", "txn_ts": 4}\n'

httpx_mock.add_response(stream=IteratorStream(stream_iter()))

ret = []

with httpx.Client() as mockClient:
http_client = HTTPXClient(mockClient)
c = Client(http_client=http_client)
with c.stream(StreamToken("token"),
StreamOptions(status_events=True)) as stream:
for obj in stream:
ret.append(obj)

assert stream.last_ts == 4

assert ret == [{
"type": "status",
"txn_ts": 1
}, {
"type": "add",
"txn_ts": 2
}, {
"type": "remove",
"txn_ts": 3
}, {
"type": "status",
"txn_ts": 4
}]

0 comments on commit 6475735

Please sign in to comment.