rename StreamToken to EventSource
erickpintor committed Oct 17, 2024
1 parent 2f83d5f commit 6558da8
Showing 10 changed files with 82 additions and 58 deletions.
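For context, a minimal sketch of what this rename means for driver users. The `Product` collection and environment-based credentials (`FAUNA_SECRET`) are illustrative assumptions, not part of this commit:

```python
from fauna import fql
from fauna.client import Client
from fauna.query import EventSource  # previously fauna.query.models.StreamToken

client = Client()  # assumes FAUNA_SECRET is set in the environment

# Queries such as `.toStream()` return an EventSource in `data`,
# which can be handed to client.stream(...) or client.change_feed(...).
source = client.query(fql("Product.all().toStream()")).data
assert isinstance(source, EventSource)

with client.stream(source) as events:
    for event in events:
        print(event["type"], event.get("data"))
        break
```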
25 changes: 12 additions & 13 deletions fauna/client/client.py
@@ -11,8 +11,7 @@
from fauna.errors import FaunaError, ClientError, ProtocolError, \
RetryableFaunaException, NetworkError
from fauna.http.http_client import HTTPClient
from fauna.query import Query, Page, fql
from fauna.query.models import StreamToken
from fauna.query import EventSource, Query, Page, fql

DefaultHttpConnectTimeout = timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[timedelta] = None
@@ -420,13 +419,13 @@ def _query(

def stream(
self,
fql: Union[StreamToken, Query],
fql: Union[EventSource, Query],
opts: StreamOptions = StreamOptions()
) -> "StreamIterator":
"""
Opens a Stream in Fauna and returns an iterator that consumes Fauna events.
:param fql: A Query that returns a StreamToken or a StreamToken.
:param fql: An EventSource, or a Query that returns an EventSource.
:param opts: (Optional) Stream Options.
:return: a :class:`StreamIterator`
@@ -442,14 +441,14 @@ def stream(
if isinstance(fql, Query):
if opts.cursor is not None:
raise ClientError(
"The 'cursor' configuration can only be used with a stream token.")
"The 'cursor' configuration can only be used with an event source.")

token = self.query(fql).data
else:
token = fql

if not isinstance(token, StreamToken):
err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
if not isinstance(token, EventSource):
err_msg = f"'fql' must be a EventSource, or a Query that returns a EventSource but was a {type(token)}."
raise TypeError(err_msg)

headers = self._headers.copy()
@@ -461,13 +460,13 @@

def change_feed(
self,
fql: Union[StreamToken, Query],
fql: Union[EventSource, Query],
opts: ChangeFeedOptions = ChangeFeedOptions()
) -> "ChangeFeedIterator":
"""
Opens an Event Feed in Fauna and returns an iterator that consumes Fauna events.
:param fql: A Query that returns a StreamToken or a StreamToken.
:param fql: An EventSource, or a Query that returns an EventSource.
:param opts: (Optional) Event Feed options.
:return: a :class:`ChangeFeedIterator`
@@ -485,8 +484,8 @@ def change_feed(
else:
token = fql

if not isinstance(token, StreamToken):
err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}."
if not isinstance(token, EventSource):
err_msg = f"'fql' must be a EventSource, or a Query that returns a EventSource but was a {type(token)}."
raise TypeError(err_msg)

headers = self._headers.copy()
@@ -542,7 +541,7 @@ class StreamIterator:

def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
endpoint: str, max_attempts: int, max_backoff: int,
opts: StreamOptions, token: StreamToken):
opts: StreamOptions, token: EventSource):
self._http_client = http_client
self._headers = headers
self._endpoint = endpoint
@@ -666,7 +665,7 @@ class ChangeFeedIterator:

def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
max_attempts: int, max_backoff: int, opts: ChangeFeedOptions,
token: StreamToken):
token: EventSource):
self._http = http
self._headers = headers
self._endpoint = endpoint
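A sketch of the cursor restriction enforced above; the cursor value and `Product` collection are illustrative:

```python
from fauna import fql
from fauna.client import Client, StreamOptions
from fauna.errors import ClientError

client = Client()  # assumes FAUNA_SECRET is set

# A cursor cannot be combined with a Query: the driver would re-run the
# query and obtain a fresh event source the cursor does not belong to.
try:
    client.stream(fql("Product.all().toStream()"),
                  StreamOptions(cursor="abc1234=="))
except ClientError:
    pass

# Resuming is allowed when the event source itself is supplied.
source = client.query(fql("Product.all().toStream()")).data
stream = client.stream(source, StreamOptions(cursor="abc1234=="))
```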
8 changes: 4 additions & 4 deletions fauna/encoding/decoder.py
@@ -4,7 +4,7 @@
from iso8601 import parse_date

from fauna.query.models import Module, DocumentReference, Document, NamedDocument, NamedDocumentReference, Page, \
NullDocument, StreamToken
NullDocument, EventSource


class FaunaDecoder:
@@ -45,7 +45,7 @@ class FaunaDecoder:
+--------------------+---------------+
| Page | @set |
+--------------------+---------------+
| StreamToken | @stream |
| EventSource | @stream |
+--------------------+---------------+
"""
@@ -64,7 +64,7 @@ def decode(obj: Any):
- { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference
- { "@mod": ... } decodes to a Module
- { "@set": ... } decodes to a Page
- { "@stream": ... } decodes to a StreamToken
- { "@stream": ... } decodes to a EventSource
- { "@bytes": ... } decodes to a bytearray
:param obj: the object to decode
@@ -176,6 +176,6 @@ def _decode_dict(dct: dict, escaped: bool):
return Page(data=data, after=after)

if "@stream" in dct:
return StreamToken(dct["@stream"])
return EventSource(dct["@stream"])

return {k: FaunaDecoder._decode(v) for k, v in dct.items()}
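A quick check of the `@stream` mapping documented above (the token string is illustrative):

```python
from fauna.encoding import FaunaDecoder
from fauna.query.models import EventSource

decoded = FaunaDecoder.decode({"@stream": "g9WD4d..."})
assert isinstance(decoded, EventSource)
assert decoded.token == "g9WD4d..."
```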
10 changes: 5 additions & 5 deletions fauna/encoding/encoder.py
@@ -3,7 +3,7 @@
from typing import Any, Optional, List, Union

from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument, \
StreamToken
EventSource
from fauna.query.query_builder import Query, Fragment, LiteralFragment, ValueFragment

_RESERVED_TAGS = [
@@ -62,7 +62,7 @@ class FaunaEncoder:
+-------------------------------+---------------+
| TemplateFragment | string |
+-------------------------------+---------------+
| StreamToken | string |
| EventSource | string |
+-------------------------------+---------------+
"""
@@ -82,7 +82,7 @@ def encode(obj: Any) -> Any:
- Query encodes to { "fql": [...] }
- ValueFragment encodes to { "value": <encoded_val> }
- LiteralFragment encodes to a string
- StreamToken encodes to a string
- EventSource encodes to a string
:raises ValueError: If value cannot be encoded, cannot be encoded safely, or there's a circular reference.
:param obj: the object to decode
@@ -163,7 +163,7 @@ def from_query_interpolation_builder(obj: Query):
return {"fql": [FaunaEncoder.from_fragment(f) for f in obj.fragments]}

@staticmethod
def from_streamtoken(obj: StreamToken):
def from_streamtoken(obj: EventSource):
return {"@stream": obj.token}

@staticmethod
@@ -208,7 +208,7 @@ def _encode(o: Any, _markers: Optional[List] = None):
return FaunaEncoder._encode_dict(o, _markers)
elif isinstance(o, Query):
return FaunaEncoder.from_query_interpolation_builder(o)
elif isinstance(o, StreamToken):
elif isinstance(o, EventSource):
return FaunaEncoder.from_streamtoken(o)
else:
raise ValueError(f"Object {o} of type {type(o)} cannot be encoded")
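Together with the decoder change, the encoding round-trips; a sketch (token value illustrative):

```python
from fauna.encoding import FaunaDecoder, FaunaEncoder
from fauna.query.models import EventSource

source = EventSource("g9WD4d...")
assert FaunaEncoder.encode(source) == {"@stream": "g9WD4d..."}
assert FaunaDecoder.decode(FaunaEncoder.encode(source)) == source
```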
2 changes: 1 addition & 1 deletion fauna/query/__init__.py
@@ -1,2 +1,2 @@
from .models import Document, DocumentReference, NamedDocument, NamedDocumentReference, NullDocument, Module, Page
from .models import Document, DocumentReference, EventSource, NamedDocument, NamedDocumentReference, NullDocument, Module, Page
from .query_builder import fql, Query
23 changes: 20 additions & 3 deletions fauna/query/models.py
@@ -1,8 +1,25 @@
import warnings
from collections.abc import Mapping
from datetime import datetime
from typing import Union, Iterator, Any, Optional, List


# NB. Override __getattr__ and __dir__ to deprecate StreamToken usages. Based
# on: https://peps.python.org/pep-0562/
def __getattr__(name):
if name == "StreamToken":
warnings.warn(
"StreamToken is deprecated. Prefer fauna.query.EventSource instead.",
DeprecationWarning,
stacklevel=2)
return EventSource
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


def __dir__():
return super.__dir__() + "StreamToken"


class Page:
"""A class representing a Set in Fauna."""

@@ -36,14 +53,14 @@ def __ne__(self, other):
return not self.__eq__(other)


class StreamToken:
"""A class represeting a Stream in Fauna."""
class EventSource:
"""A class represeting an EventSource in Fauna."""

def __init__(self, token: str):
self.token = token

def __eq__(self, other):
return isinstance(other, StreamToken) and self.token == other.token
return isinstance(other, EventSource) and self.token == other.token

def __hash__(self):
return hash(self.token)
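Given the PEP 562 hook above, old imports still resolve but emit a DeprecationWarning; a sketch of the expected behavior:

```python
import warnings

from fauna.query.models import EventSource

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    # Resolved through the module-level __getattr__ added above.
    from fauna.query.models import StreamToken

assert StreamToken is EventSource
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
```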
24 changes: 12 additions & 12 deletions tests/integration/test_change_feeds.py
@@ -9,7 +9,7 @@
def test_change_feed_requires_stream(client, a_collection):
with pytest.raises(
TypeError,
match="'fql' must be a StreamToken, or a Query that returns a StreamToken but was a <class 'int'>."
match="'fql' must be a EventSource, or a Query that returns a EventSource but was a <class 'int'>."
):
client.change_feed(fql("42"))

@@ -19,9 +19,9 @@ def test_change_feed_query(client, a_collection):
_pull_one_event(client, a_collection, feed)


def test_change_feed_token(client, a_collection):
token = client.query(fql("${col}.all().toStream()", col=a_collection)).data
feed = client.change_feed(token)
def test_change_feed_event_source(client, a_collection):
source = client.query(fql("${col}.all().toStream()", col=a_collection)).data
feed = client.change_feed(source)
_pull_one_event(client, a_collection, feed)


@@ -82,7 +82,7 @@ def test_change_feeds_continue_after_an_error(client, a_collection):


def test_change_feed_start_ts(client, a_collection):
token = client.query(
source = client.query(
fql("${col}.all().map(.n).toStream()", col=a_collection)).data

# NB. Issue separate queries to ensure they get different txn times.
@@ -91,25 +91,25 @@ def test_change_feed_start_ts(client, a_collection):

# NB. Use a short page size to ensure that more than one roundtrip is made,
# thus testing the iterator's internal cursoring is correct.
first = next(client.change_feed(token).flatten())
first = next(client.change_feed(source).flatten())
opts = ChangeFeedOptions(start_ts=first['txn_ts'], page_size=5)
feed = client.change_feed(token, opts)
feed = client.change_feed(source, opts)

nums = [event['data'] for event in feed.flatten()]
assert nums == list(range(1, 64))


def test_change_feed_cursor(client, a_collection):
token = client.query(
source = client.query(
fql("${col}.all().map(.n).toStream()", col=a_collection)).data

_create_docs(client, a_collection, 0, 64)

# NB. Use a short page size to ensure that more than one roundtrip is made,
# thus testing the iterator's internal cursoring is correct.
first = next(client.change_feed(token).flatten())
first = next(client.change_feed(source).flatten())
opts = ChangeFeedOptions(cursor=first['cursor'], page_size=5)
feed = client.change_feed(token, opts)
feed = client.change_feed(source, opts)

nums = [event['data'] for event in feed.flatten()]
assert nums == list(range(1, 64))
@@ -119,11 +119,11 @@ def test_rejects_when_both_start_ts_and_cursor_provided(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

response = scoped_client.query(fql("Product.all().toStream()"))
stream_token = response.data
source = response.data

with pytest.raises(TypeError):
opts = ChangeFeedOptions(cursor="abc1234==", start_ts=response.txn_ts)
scoped_client.change_feed(stream_token, opts)
scoped_client.change_feed(source, opts)


def test_change_feed_reusable_iterator(client, a_collection):
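The tests above lean on `flatten()` to walk events across feed pages; a minimal consumption sketch (collection name and page size illustrative):

```python
from fauna import fql
from fauna.client import ChangeFeedOptions, Client

client = Client()  # assumes FAUNA_SECRET is set
source = client.query(fql("Product.all().toStream()")).data

# flatten() yields events across pages; page_size controls roundtrip size.
for event in client.change_feed(source, ChangeFeedOptions(page_size=5)).flatten():
    print(event["type"], event.get("data"))
```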
17 changes: 9 additions & 8 deletions tests/integration/test_stream.py
@@ -167,7 +167,7 @@ def thread_fn():
def test_providing_start_ts(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

stream_token = scoped_client.query(fql("Product.all().toStream()")).data
source = scoped_client.query(fql("Product.all().toStream()")).data

createOne = scoped_client.query(fql("Product.create({})"))
createTwo = scoped_client.query(fql("Product.create({})"))
@@ -178,7 +178,7 @@ def test_providing_start_ts(scoped_client):

def thread_fn():
# replay excludes the ts that was passed in; it provides events for all ts after the one provided
stream = scoped_client.stream(stream_token,
stream = scoped_client.stream(source,
StreamOptions(start_ts=createOne.txn_ts))
barrier.wait()
with stream as iter:
@@ -201,20 +201,20 @@ def thread_fn():
def test_providing_cursor(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

stream_token = scoped_client.query(fql("Product.all().toStream()")).data
source = scoped_client.query(fql("Product.all().toStream()")).data
create1 = scoped_client.query(fql("Product.create({ value: 1 })"))
create2 = scoped_client.query(fql("Product.create({ value: 2 })"))

cursor = None
with scoped_client.stream(stream_token) as iter:
with scoped_client.stream(source) as iter:
for event in iter:
assert event["type"] == "add"
assert event["data"]["value"] == 1
cursor = event["cursor"]
break

opts = StreamOptions(cursor=cursor)
with scoped_client.stream(stream_token, opts) as iter:
with scoped_client.stream(source, opts) as iter:
for event in iter:
assert event["type"] == "add"
assert event["data"]["value"] == 2
@@ -224,7 +224,8 @@ def test_providing_cursor(scoped_client):
def test_rejects_cursor_with_fql_query(scoped_client):
with pytest.raises(
ClientError,
match="The 'cursor' configuration can only be used with a stream token."):
match="The 'cursor' configuration can only be used with an event source."
):
opts = StreamOptions(cursor="abc1234==")
scoped_client.stream(fql("Collection.create({name: 'Product'})"), opts)

@@ -233,11 +234,11 @@ def test_rejects_when_both_start_ts_and_cursor_provided(scoped_client):
scoped_client.query(fql("Collection.create({name: 'Product'})"))

response = scoped_client.query(fql("Product.all().toStream()"))
stream_token = response.data
source = response.data

with pytest.raises(TypeError):
opts = StreamOptions(cursor="abc1234==", start_ts=response.txn_ts)
scoped_client.stream(stream_token, opts)
scoped_client.stream(source, opts)


def test_handle_status_events(scoped_client):
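As exercised in `test_providing_start_ts`, replay from `start_ts` is exclusive of the given transaction time; a sketch:

```python
from fauna import fql
from fauna.client import Client, StreamOptions

client = Client()  # assumes FAUNA_SECRET is set
source = client.query(fql("Product.all().toStream()")).data
marker = client.query(fql("Product.create({})"))

# Only events with txn_ts strictly greater than marker.txn_ts are replayed.
with client.stream(source, StreamOptions(start_ts=marker.txn_ts)) as events:
    for event in events:
        print(event["type"])
        break
```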