Skip to content

Commit

Permalink
Remove sleeps from streaming tests (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor authored Sep 24, 2024
1 parent db5c395 commit b70d00a
Showing 1 changed file with 19 additions and 24 deletions.
43 changes: 19 additions & 24 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@
def test_stream(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

barrier = threading.Barrier(2)
events = []

def thread_fn():
stream = scoped_client.stream(fql("Product.all().toStream()"))
barrier.wait()

with stream as iter:
for evt in iter:
Expand All @@ -28,10 +30,7 @@ def thread_fn():

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)
barrier.wait()

id = scoped_client.query(fql("Product.create({}).id")).data
scoped_client.query(fql("Product.byId(${id})!.delete()", id=id))
Expand All @@ -45,10 +44,12 @@ def thread_fn():
def test_close_method(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

barrier = threading.Barrier(2)
events = []

def thread_fn():
stream = scoped_client.stream(fql("Product.all().toStream()"))
barrier.wait()

with stream as iter:
for evt in iter:
Expand All @@ -60,10 +61,7 @@ def thread_fn():

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)
barrier.wait()

scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))
Expand All @@ -75,8 +73,11 @@ def thread_fn():
def test_error_on_stream(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

barrier = threading.Barrier(2)

def thread_fn():
stream = scoped_client.stream(fql("Product.all().map(.foo / 0).toStream()"))
barrier.wait()

with pytest.raises(QueryRuntimeError):
with stream as iter:
Expand All @@ -85,10 +86,7 @@ def thread_fn():

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)
barrier.wait()

scoped_client.query(fql("Product.create({foo: 10})"))
scoped_client.query(fql("Product.create({foo: 10})"))
Expand Down Expand Up @@ -132,10 +130,12 @@ def stream_func(*args, **kwargs):
def test_last_ts_is_monotonic(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

barrier = threading.Barrier(2)
events = []

def thread_fn():
stream = scoped_client.stream(fql("Product.all().toStream()"))
barrier.wait()

with stream as iter:
last_ts = 0
Expand All @@ -153,10 +153,7 @@ def thread_fn():

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)
barrier.wait()

scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))
Expand All @@ -176,12 +173,14 @@ def test_providing_start_ts(scoped_client):
createTwo = scoped_client.query(fql("Product.create({})"))
createThree = scoped_client.query(fql("Product.create({})"))

barrier = threading.Barrier(2)
events = []

def thread_fn():
# replay excludes the ts that was passed in, it provides events for all ts after the one provided
stream = scoped_client.stream(stream_token,
StreamOptions(start_ts=createOne.txn_ts))
barrier.wait()
with stream as iter:
for event in iter:
events.append(event)
Expand All @@ -190,10 +189,7 @@ def thread_fn():

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)
barrier.wait()

createFour = scoped_client.query(fql("Product.create({})"))
stream_thread.join()
Expand Down Expand Up @@ -237,11 +233,13 @@ def test_rejects_cursor_with_fql_query(scoped_client):
def test_handle_status_events(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

barrier = threading.Barrier(2)
events = []

def thread_fn():
opts = StreamOptions(status_events=True)
stream = scoped_client.stream(fql("Product.all().toStream()"), opts)
barrier.wait()

with stream as iter:
for evt in iter:
Expand All @@ -253,10 +251,7 @@ def thread_fn():

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

# adds a delay so the thread can open the stream,
# otherwise we could miss some events
time.sleep(0.5)
barrier.wait()

scoped_client.query(fql("Product.create({})"))
scoped_client.query(fql("Product.create({})"))
Expand Down

0 comments on commit b70d00a

Please sign in to comment.