Skip to content

Commit

Permalink
run stream in a scoped database
Browse files Browse the repository at this point in the history
  • Loading branch information
marrony committed Mar 12, 2024
1 parent ec90253 commit 59a406b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 15 deletions.
17 changes: 17 additions & 0 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@

from fauna import fql, Module
from fauna.client import Client
from fauna.client.utils import _Environment
from fauna.query import Query


def create_collection(name) -> Query:
return fql('Collection.create({ name: ${name} })', name=name)


def create_database(name) -> Query:
return fql('Database.create({ name: ${name} })', name=name)


@pytest.fixture
def suffix() -> str:
letters = string.ascii_lowercase
Expand All @@ -24,6 +29,18 @@ def client() -> Client:
return Client()


@pytest.fixture
def scoped_client(scoped_secret) -> Client:
return Client(secret=scoped_secret)


@pytest.fixture
def scoped_secret(client, suffix) -> str:
db_name = f"Test_{suffix}"
client.query(create_database(db_name))
return f"{_Environment.EnvFaunaSecret()}:{db_name}:admin"


@pytest.fixture
def a_collection(client, suffix) -> Module:
col_name = f"Test_{suffix}"
Expand Down
37 changes: 22 additions & 15 deletions tests/integration/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,36 @@ def take(stream, count):
yield next(i)


def test_stream(client, a_collection):
def test_stream(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))
scoped_client.query(fql("Product.create({})"))

events = [[]]

def thread_fn():
stream = client.stream(fql("${coll}.all().toStream()", coll=a_collection))
stream = scoped_client.stream(fql("Product.all().toStream()"))

with stream as iter:
events[0] = [evt["type"] for evt in take(iter, 3)]

stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

id = client.query(fql("${coll}.create({}).id", coll=a_collection)).data
client.query(fql("${coll}.byId(${id})!.delete()", coll=a_collection, id=id))
client.query(fql("${coll}.create({}).id", coll=a_collection))
id = scoped_client.query(fql("Product.create({}).id")).data
scoped_client.query(fql("Product.byId(${id})!.delete()", id=id))
scoped_client.query(fql("Product.create({}).id"))

stream_thread.join()
assert events[0] == ["add", "remove", "add"]


def test_close_method(client, a_collection):
def test_close_method(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

events = []

def thread_fn():
stream = client.stream(fql("${coll}.all().toStream()", coll=a_collection))
stream = scoped_client.stream(fql("Product.all().toStream()"))

with stream as iter:
for evt in iter:
Expand All @@ -58,17 +61,23 @@ def thread_fn():
stream_thread = threading.Thread(target=thread_fn)
stream_thread.start()

client.query(fql("${coll}.create({}).id", coll=a_collection)).data
client.query(fql("${coll}.create({}).id", coll=a_collection)).data
scoped_client.query(fql("Product.create({}).id")).data
scoped_client.query(fql("Product.create({}).id")).data

stream_thread.join()
assert events == ["add", "add"]


def test_max_retries(a_collection):
def test_max_retries(scoped_secret):
scoped_client = Client(secret=scoped_secret)
scoped_client.query(fql("Collection.create({name: 'Product'})"))

httpx_client = httpx.Client(http1=False, http2=True)
client = Client(
http_client=HTTPXClient(httpx_client), max_attempts=3, max_backoff=1)
secret=scoped_secret,
http_client=HTTPXClient(httpx_client),
max_attempts=3,
max_backoff=0)

count = [0]

Expand All @@ -80,15 +89,13 @@ def stream_func(*args, **kwargs):

count[0] = 0
with pytest.raises(RetryableFaunaException):
with client.stream(fql("${coll}.all().toStream()",
coll=a_collection)) as iter:
with client.stream(fql("Product.all().toStream()")) as iter:
events = [evt["type"] for evt in iter]
assert count[0] == 3

count[0] = 0
with pytest.raises(RetryableFaunaException):
opts = StreamOptions(max_attempts=5)
with client.stream(
fql("${coll}.all().toStream()", coll=a_collection), opts) as iter:
with client.stream(fql("Product.all().toStream()"), opts) as iter:
events = [evt["type"] for evt in iter]
assert count[0] == 5

0 comments on commit 59a406b

Please sign in to comment.