diff --git a/README.md b/README.md index 6238e2bd..1b446c40 100644 --- a/README.md +++ b/README.md @@ -291,14 +291,13 @@ The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/tra ### Request an Event Feed -An Event Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming), -represented by a stream token, for events. +An Event Feed asynchronously polls an [event source](https://docs.fauna.com/fauna/current/learn/streaming), +for paginated events. -To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a -[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). +To get an event source, append ``eventSource()`` or ``eventsOn()`` to a +[supported Set](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets). -To get paginated events for the stream, pass the stream token to -``change_feed()``: +To get paginated events for the source, pass the event source to ``feed()``: ```python from fauna import fql @@ -310,46 +309,45 @@ To get paginated events for the stream, pass the stream token to let set = Product.all() { initialPage: set.pageSize(10), - streamToken: set.toStream() + eventSource: set.eventSource() } ''')) - initialPage = response.data['initialPage'] - streamToken = response.data['streamToken'] + initial_page = response.data['initialPage'] + event_source = response.data['eventSource'] - client.change_feed(streamToken) + client.feed(event_source) ``` -You can also pass a query that produces a stream token directly to -``change_feed()``: +You can also pass a query that produces an event source directly to ``feed()``: ```python - query = fql('Product.all().changesOn(.price, .stock)') + query = fql('Product.all().eventsOn(.price, .stock)') - client.change_feed(query) + client.feed(query) ``` ### Iterate on an Event Feed -``change_feed()`` returns an iterator that emits pages of events. You can use a +``feed()`` returns an iterator that emits pages of events. You can use a generator expression to iterate through the pages: ```python - query = fql('Product.all().changesOn(.price, .stock)') - feed = client.change_feed(query) + query = fql('Product.all().eventsOn(.price, .stock)') + feed = client.feed(query) for page in feed: print('Page stats: ', page.stats) for event in page: - eventType = event['type'] - if (eventType == 'add'): + event_type = event['type'] + if (event_type == 'add'): print('Add event: ', event) ## ... - elif (eventType == 'update'): + elif (event_type == 'update'): print('Update event: ', event) ## ... - elif (eventType == 'remove'): + elif (event_type == 'remove'): print('Remove event: ', event) ## ... ``` @@ -358,11 +356,11 @@ Alternatively, you can iterate through events instead of pages with ``flatten()``: ```python - query = fql('Product.all().changesOn(.price, .stock)') - feed = client.change_feed(query) + query = fql('Product.all().eventsOn(.price, .stock)') + feed = client.feed(query) for event in feed.flatten(): - eventType = event['type'] + event_type = event['type'] ## ... ``` @@ -381,8 +379,8 @@ raises a ``FaunaException``: client = Client() try: - feed = client.change_feed(fql( - 'Product.all().changesOn(.price, .stock)' + feed = client.feed(fql( + 'Product.all().eventsOn(.price, .stock)' )) for event in feed.flatten(): print(event) @@ -393,7 +391,7 @@ raises a ``FaunaException``: Errors can be raised at two different places: -1. At the ``change_feed`` method call; +1. At the ``feed`` method call; 2. At the page iteration. This distinction allows for users to ignore errors originating from event @@ -408,8 +406,8 @@ processing. For example: # Imagine if there are some products with details = null. # The ones without details will fail due to the toUpperCase call. - feed = client.change_feed(fql( - 'Product.all().map(.details.toUpperCase()).toStream()' + feed = client.feed(fql( + 'Product.all().map(.details.toUpperCase()).eventSource()' )) for page in feed: @@ -426,13 +424,12 @@ processing. For example: ### Event Feed options -The client configuration sets default options for the ``change_feed()`` -method. +The client configuration sets default options for the ``feed()`` method. -You can pass a ``ChangeFeedOptions`` object to override these defaults: +You can pass a ``FeedOptions`` object to override these defaults: ```python -options = ChangeFeedOptions( +options = FeedOptions( max_attempts=3, max_backoff=20, query_timeout=timedelta(seconds=5), @@ -441,7 +438,7 @@ options = ChangeFeedOptions( start_ts=None, ) -client.change_feed(fql('Product.all().toStream()'), options) +client.feed(fql('Product.all().eventSource()'), options) ``` ## Event Streaming @@ -450,12 +447,11 @@ The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn ### Start a stream -To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a -[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). +To get an event source, append ``eventSource()`` or ``eventsOn()`` to a +[supported Set](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets). -To start and subscribe to the stream, pass the stream token to -``stream()``: +To start and subscribe to the stream, pass the event source to ``stream()``: ```python from fauna import fql @@ -467,21 +463,21 @@ To start and subscribe to the stream, pass the stream token to let set = Product.all() { initialPage: set.pageSize(10), - streamToken: set.toStream() + eventSource: set.eventSource() } ''')) - initialPage = response.data['initialPage'] - streamToken = response.data['streamToken'] + initial_page = response.data['initialPage'] + event_source = response.data['eventSource'] - client.stream(streamToken) + client.stream(event_source) ``` -You can also pass a query that produces a stream token directly to +You can also pass a query that produces an event source directly to ``stream()``: ```python - query = fql('Product.all().changesOn(.price, .stock)') + query = fql('Product.all().eventsOn(.price, .stock)') client.stream(query) ``` @@ -492,18 +488,18 @@ You can also pass a query that produces a stream token directly to use a generator expression to iterate through the events: ```python -query = fql('Product.all().changesOn(.price, .stock)') +query = fql('Product.all().eventsOn(.price, .stock)') with client.stream(query) as stream: for event in stream: - eventType = event['type'] - if (eventType == 'add'): + event_type = event['type'] + if (event_type == 'add'): print('Add event: ', event) ## ... - elif (eventType == 'update'): + elif (event_type == 'update'): print('Update event: ', event) ## ... - elif (eventType == 'remove'): + elif (event_type == 'remove'): print('Remove event: ', event) ## ... ``` @@ -513,7 +509,7 @@ with client.stream(query) as stream: Use ``close()`` to close a stream: ```python -query = fql('Product.all().changesOn(.price, .stock)') +query = fql('Product.all().eventsOn(.price, .stock)') count = 0 with client.stream(query) as stream: @@ -540,7 +536,7 @@ client = Client() try: with client.stream(fql( - 'Product.all().changesOn(.price, .stock)' + 'Product.all().eventsOn(.price, .stock)' )) as stream: for event in stream: print(event) @@ -565,7 +561,7 @@ options = StreamOptions( status_events=False, ) -client.stream(fql('Product.all().toStream()'), options) +client.stream(fql('Product.all().eventSource()'), options) ``` ## Setup diff --git a/fauna/client/__init__.py b/fauna/client/__init__.py index 9770dd26..4f6e24a6 100644 --- a/fauna/client/__init__.py +++ b/fauna/client/__init__.py @@ -1,3 +1,3 @@ -from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions +from .client import Client, QueryOptions, StreamOptions, FeedOptions, FeedPage, FeedIterator from .endpoints import Endpoints from .headers import Header diff --git a/fauna/client/client.py b/fauna/client/client.py index 7212acec..8fc5024e 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -11,8 +11,7 @@ from fauna.errors import FaunaError, ClientError, ProtocolError, \ RetryableFaunaException, NetworkError from fauna.http.http_client import HTTPClient -from fauna.query import Query, Page, fql -from fauna.query.models import StreamToken +from fauna.query import EventSource, Query, Page, fql DefaultHttpConnectTimeout = timedelta(seconds=5) DefaultHttpReadTimeout: Optional[timedelta] = None @@ -71,7 +70,7 @@ class StreamOptions: @dataclass -class ChangeFeedOptions: +class FeedOptions: """ A dataclass representing options available for an Event Feed. @@ -420,13 +419,13 @@ def _query( def stream( self, - fql: Union[StreamToken, Query], + fql: Union[EventSource, Query], opts: StreamOptions = StreamOptions() ) -> "StreamIterator": """ Opens a Stream in Fauna and returns an iterator that consume Fauna events. - :param fql: A Query that returns a StreamToken or a StreamToken. + :param fql: An EventSource or a Query that returns an EventSource. :param opts: (Optional) Stream Options. :return: a :class:`StreamIterator` @@ -442,14 +441,14 @@ def stream( if isinstance(fql, Query): if opts.cursor is not None: raise ClientError( - "The 'cursor' configuration can only be used with a stream token.") + "The 'cursor' configuration can only be used with an event source.") - token = self.query(fql).data + source = self.query(fql).data else: - token = fql + source = fql - if not isinstance(token, StreamToken): - err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}." + if not isinstance(source, EventSource): + err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." raise TypeError(err_msg) headers = self._headers.copy() @@ -457,20 +456,20 @@ def stream( headers[_Header.Authorization] = self._auth.bearer() return StreamIterator(self._session, headers, self._endpoint + "/stream/1", - self._max_attempts, self._max_backoff, opts, token) + self._max_attempts, self._max_backoff, opts, source) - def change_feed( + def feed( self, - fql: Union[StreamToken, Query], - opts: ChangeFeedOptions = ChangeFeedOptions() - ) -> "ChangeFeedIterator": + source: Union[EventSource, Query], + opts: FeedOptions = FeedOptions(), + ) -> "FeedIterator": """ Opens an Event Feed in Fauna and returns an iterator that consume Fauna events. - :param fql: A Query that returns a StreamToken or a StreamToken. + :param source: An EventSource or a Query that returns an EventSource. :param opts: (Optional) Event Feed options. - :return: a :class:`ChangeFeedIterator` + :return: a :class:`FeedIterator` :raises ClientError: Invalid options provided :raises NetworkError: HTTP Request failed in transit @@ -480,13 +479,11 @@ def change_feed( :raises TypeError: Invalid param types """ - if isinstance(fql, Query): - token = self.query(fql).data - else: - token = fql + if isinstance(source, Query): + source = self.query(source).data - if not isinstance(token, StreamToken): - err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}." + if not isinstance(source, EventSource): + err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." raise TypeError(err_msg) headers = self._headers.copy() @@ -499,10 +496,8 @@ def change_feed( elif self._query_timeout_ms is not None: headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) - return ChangeFeedIterator(self._session, headers, - self._endpoint + "/changefeed/1", - self._max_attempts, self._max_backoff, opts, - token) + return FeedIterator(self._session, headers, self._endpoint + "/feed/1", + self._max_attempts, self._max_backoff, opts, source) def _check_protocol(self, response_json: Any, status_code): # TODO: Logic to validate wire protocol belongs elsewhere. @@ -542,14 +537,14 @@ class StreamIterator: def __init__(self, http_client: HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, - opts: StreamOptions, token: StreamToken): + opts: StreamOptions, source: EventSource): self._http_client = http_client self._headers = headers self._endpoint = endpoint self._max_attempts = max_attempts self._max_backoff = max_backoff self._opts = opts - self._token = token + self._source = source self._stream = None self.last_ts = None self.last_cursor = None @@ -628,7 +623,7 @@ def _retry_stream(self): raise RetryableFaunaException def _create_stream(self): - data: Dict[str, Any] = {"token": self._token.token} + data: Dict[str, Any] = {"token": self._source.token} if self.last_cursor is not None: data["cursor"] = self.last_cursor elif self._opts.cursor is not None: @@ -644,7 +639,7 @@ def close(self): self._stream.close() -class ChangeFeedPage: +class FeedPage: def __init__(self, events: List[Any], cursor: str, stats: QueryStats): self._events = events @@ -661,22 +656,22 @@ def __iter__(self) -> Iterator[Any]: yield event -class ChangeFeedIterator: +class FeedIterator: """A class to provide an iterator on top of Event Feed pages.""" def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, - max_attempts: int, max_backoff: int, opts: ChangeFeedOptions, - token: StreamToken): + max_attempts: int, max_backoff: int, opts: FeedOptions, + source: EventSource): self._http = http self._headers = headers self._endpoint = endpoint self._max_attempts = opts.max_attempts or max_attempts self._max_backoff = opts.max_backoff or max_backoff - self._request: Dict[str, Any] = {"token": token.token} + self._request: Dict[str, Any] = {"token": source.token} self._is_done = False if opts.start_ts is not None and opts.cursor is not None: - err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the ChangeFeedOptions." + err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions." raise TypeError(err_msg) if opts.page_size is not None: @@ -687,11 +682,11 @@ def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, elif opts.start_ts is not None: self._request["start_ts"] = opts.start_ts - def __iter__(self) -> Iterator[ChangeFeedPage]: + def __iter__(self) -> Iterator[FeedPage]: self._is_done = False return self - def __next__(self) -> ChangeFeedPage: + def __next__(self) -> FeedPage: if self._is_done: raise StopIteration @@ -699,7 +694,7 @@ def __next__(self) -> ChangeFeedPage: self._next_page) return retryable.run().response - def _next_page(self) -> ChangeFeedPage: + def _next_page(self) -> FeedPage: with self._http.request( method="POST", url=self._endpoint, @@ -718,8 +713,8 @@ def _next_page(self) -> ChangeFeedPage: if "start_ts" in self._request: del self._request["start_ts"] - return ChangeFeedPage(decoded["events"], decoded["cursor"], - QueryStats(decoded["stats"])) + return FeedPage(decoded["events"], decoded["cursor"], + QueryStats(decoded["stats"])) def flatten(self) -> Iterator: """A generator that yields events instead of pages of events.""" diff --git a/fauna/encoding/decoder.py b/fauna/encoding/decoder.py index 0536be39..c02bc6e8 100644 --- a/fauna/encoding/decoder.py +++ b/fauna/encoding/decoder.py @@ -4,7 +4,7 @@ from iso8601 import parse_date from fauna.query.models import Module, DocumentReference, Document, NamedDocument, NamedDocumentReference, Page, \ - NullDocument, StreamToken + NullDocument, EventSource class FaunaDecoder: @@ -45,7 +45,7 @@ class FaunaDecoder: +--------------------+---------------+ | Page | @set | +--------------------+---------------+ - | StreamToken | @stream | + | EventSource | @stream | +--------------------+---------------+ """ @@ -64,7 +64,7 @@ def decode(obj: Any): - { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference - { "@mod": ... } decodes to a Module - { "@set": ... } decodes to a Page - - { "@stream": ... } decodes to a StreamToken + - { "@stream": ... } decodes to an EventSource - { "@bytes": ... } decodes to a bytearray :param obj: the object to decode @@ -176,6 +176,6 @@ def _decode_dict(dct: dict, escaped: bool): return Page(data=data, after=after) if "@stream" in dct: - return StreamToken(dct["@stream"]) + return EventSource(dct["@stream"]) return {k: FaunaDecoder._decode(v) for k, v in dct.items()} diff --git a/fauna/encoding/encoder.py b/fauna/encoding/encoder.py index 986b8a1d..becfeb84 100644 --- a/fauna/encoding/encoder.py +++ b/fauna/encoding/encoder.py @@ -3,7 +3,7 @@ from typing import Any, Optional, List, Union from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument, \ - StreamToken + EventSource from fauna.query.query_builder import Query, Fragment, LiteralFragment, ValueFragment _RESERVED_TAGS = [ @@ -62,7 +62,7 @@ class FaunaEncoder: +-------------------------------+---------------+ | TemplateFragment | string | +-------------------------------+---------------+ - | StreamToken | string | + | EventSource | string | +-------------------------------+---------------+ """ @@ -82,7 +82,7 @@ def encode(obj: Any) -> Any: - Query encodes to { "fql": [...] } - ValueFragment encodes to { "value": } - LiteralFragment encodes to a string - - StreamToken encodes to a string + - EventSource encodes to a string :raises ValueError: If value cannot be encoded, cannot be encoded safely, or there's a circular reference. :param obj: the object to decode @@ -163,7 +163,7 @@ def from_query_interpolation_builder(obj: Query): return {"fql": [FaunaEncoder.from_fragment(f) for f in obj.fragments]} @staticmethod - def from_streamtoken(obj: StreamToken): + def from_streamtoken(obj: EventSource): return {"@stream": obj.token} @staticmethod @@ -208,7 +208,7 @@ def _encode(o: Any, _markers: Optional[List] = None): return FaunaEncoder._encode_dict(o, _markers) elif isinstance(o, Query): return FaunaEncoder.from_query_interpolation_builder(o) - elif isinstance(o, StreamToken): + elif isinstance(o, EventSource): return FaunaEncoder.from_streamtoken(o) else: raise ValueError(f"Object {o} of type {type(o)} cannot be encoded") diff --git a/fauna/query/__init__.py b/fauna/query/__init__.py index e93730a7..7b77319d 100644 --- a/fauna/query/__init__.py +++ b/fauna/query/__init__.py @@ -1,2 +1,2 @@ -from .models import Document, DocumentReference, NamedDocument, NamedDocumentReference, NullDocument, Module, Page +from .models import Document, DocumentReference, EventSource, NamedDocument, NamedDocumentReference, NullDocument, Module, Page from .query_builder import fql, Query diff --git a/fauna/query/models.py b/fauna/query/models.py index c496c058..e47148c9 100644 --- a/fauna/query/models.py +++ b/fauna/query/models.py @@ -1,8 +1,25 @@ +import warnings from collections.abc import Mapping from datetime import datetime from typing import Union, Iterator, Any, Optional, List +# NB. Override __getattr__ and __dir__ to deprecate StreamToken usages. Based +# on: https://peps.python.org/pep-0562/ +def __getattr__(name): + if name == "StreamToken": + warnings.warn( + "StreamToken is deprecated. Prefer fauna.query.EventSource instead.", + DeprecationWarning, + stacklevel=2) + return EventSource + return super.__getattr__(name) # pyright: ignore + + +def __dir__(): + return super.__dir__() + "StreamToken" # pyright: ignore + + class Page: """A class representing a Set in Fauna.""" @@ -36,14 +53,14 @@ def __ne__(self, other): return not self.__eq__(other) -class StreamToken: - """A class represeting a Stream in Fauna.""" +class EventSource: + """A class represeting an EventSource in Fauna.""" def __init__(self, token: str): self.token = token def __eq__(self, other): - return isinstance(other, StreamToken) and self.token == other.token + return isinstance(other, EventSource) and self.token == other.token def __hash__(self): return hash(self.token) diff --git a/tests/integration/test_change_feeds.py b/tests/integration/test_feeds.py similarity index 65% rename from tests/integration/test_change_feeds.py rename to tests/integration/test_feeds.py index 51b8e306..f8cb4f2c 100644 --- a/tests/integration/test_change_feeds.py +++ b/tests/integration/test_feeds.py @@ -2,26 +2,26 @@ import pytest from fauna import fql -from fauna.client import ChangeFeedOptions +from fauna.client import FeedOptions from fauna.errors import AbortError -def test_change_feed_requires_stream(client, a_collection): +def test_feed_requires_stream(client, a_collection): with pytest.raises( TypeError, - match="'fql' must be a StreamToken, or a Query that returns a StreamToken but was a ." + match="'source' must be an EventSource, or a Query that returns an EventSource but was a ." ): - client.change_feed(fql("42")) + client.feed(fql("42")) -def test_change_feed_query(client, a_collection): - feed = client.change_feed(fql("${col}.all().toStream()", col=a_collection)) +def test_feed_query(client, a_collection): + feed = client.feed(fql("${col}.all().toStream()", col=a_collection)) _pull_one_event(client, a_collection, feed) -def test_change_feed_token(client, a_collection): - token = client.query(fql("${col}.all().toStream()", col=a_collection)).data - feed = client.change_feed(token) +def test_feed_event_source(client, a_collection): + source = client.query(fql("${col}.all().toStream()", col=a_collection)).data + feed = client.feed(source) _pull_one_event(client, a_collection, feed) @@ -37,8 +37,8 @@ def _pull_one_event(client, col, feed): assert events[0]['data']['foo'] == 'bar' -def test_change_feeds_error_event(client, a_collection): - feed = client.change_feed( +def test_feeds_error_event(client, a_collection): + feed = client.feed( fql("${col}.all().map(_ => abort('oops')).toStream()", col=a_collection)) client.query(fql("${col}.create({ foo: 'bar' })", col=a_collection)) @@ -48,8 +48,8 @@ def test_change_feeds_error_event(client, a_collection): @pytest.mark.xfail(reason="pending core support") -def test_change_feeds_continue_after_an_error(client, a_collection): - feed = client.change_feed( +def test_feeds_continue_after_an_error(client, a_collection): + feed = client.feed( fql(''' ${col} .all() @@ -81,8 +81,8 @@ def test_change_feeds_continue_after_an_error(client, a_collection): assert events == [0, 2] -def test_change_feed_start_ts(client, a_collection): - token = client.query( +def test_feed_start_ts(client, a_collection): + source = client.query( fql("${col}.all().map(.n).toStream()", col=a_collection)).data # NB. Issue separate queries to ensure they get different txn times. @@ -91,25 +91,25 @@ def test_change_feed_start_ts(client, a_collection): # NB. Use a short page size to ensure that more than one roundtrip is made, # thus testing the interator's internal cursoring is correct. - first = next(client.change_feed(token).flatten()) - opts = ChangeFeedOptions(start_ts=first['txn_ts'], page_size=5) - feed = client.change_feed(token, opts) + first = next(client.feed(source).flatten()) + opts = FeedOptions(start_ts=first['txn_ts'], page_size=5) + feed = client.feed(source, opts) nums = [event['data'] for event in feed.flatten()] assert nums == list(range(1, 64)) -def test_change_feed_cursor(client, a_collection): - token = client.query( +def test_feed_cursor(client, a_collection): + source = client.query( fql("${col}.all().map(.n).toStream()", col=a_collection)).data _create_docs(client, a_collection, 0, 64) # NB. Use a short page size to ensure that more than one roundtrip is made, # thus testing the interator's internal cursoring is correct. - first = next(client.change_feed(token).flatten()) - opts = ChangeFeedOptions(cursor=first['cursor'], page_size=5) - feed = client.change_feed(token, opts) + first = next(client.feed(source).flatten()) + opts = FeedOptions(cursor=first['cursor'], page_size=5) + feed = client.feed(source, opts) nums = [event['data'] for event in feed.flatten()] assert nums == list(range(1, 64)) @@ -119,16 +119,15 @@ def test_rejects_when_both_start_ts_and_cursor_provided(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) response = scoped_client.query(fql("Product.all().toStream()")) - stream_token = response.data + source = response.data with pytest.raises(TypeError): - opts = ChangeFeedOptions(cursor="abc1234==", start_ts=response.txn_ts) - scoped_client.change_feed(stream_token, opts) + opts = FeedOptions(cursor="abc1234==", start_ts=response.txn_ts) + scoped_client.feed(source, opts) -def test_change_feed_reusable_iterator(client, a_collection): - feed = client.change_feed( - fql("${col}.all().map(.n).toStream()", col=a_collection)) +def test_feed_reusable_iterator(client, a_collection): + feed = client.feed(fql("${col}.all().map(.n).toStream()", col=a_collection)) _create_docs(client, a_collection, 0, 5) nums = [event['data'] for event in feed.flatten()] diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 2eb53a59..44e355b2 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -167,7 +167,7 @@ def thread_fn(): def test_providing_start_ts(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) - stream_token = scoped_client.query(fql("Product.all().toStream()")).data + source = scoped_client.query(fql("Product.all().toStream()")).data createOne = scoped_client.query(fql("Product.create({})")) createTwo = scoped_client.query(fql("Product.create({})")) @@ -178,7 +178,7 @@ def test_providing_start_ts(scoped_client): def thread_fn(): # replay excludes the ts that was passed in, it provides events for all ts after the one provided - stream = scoped_client.stream(stream_token, + stream = scoped_client.stream(source, StreamOptions(start_ts=createOne.txn_ts)) barrier.wait() with stream as iter: @@ -201,12 +201,12 @@ def thread_fn(): def test_providing_cursor(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) - stream_token = scoped_client.query(fql("Product.all().toStream()")).data + source = scoped_client.query(fql("Product.all().toStream()")).data create1 = scoped_client.query(fql("Product.create({ value: 1 })")) create2 = scoped_client.query(fql("Product.create({ value: 2 })")) cursor = None - with scoped_client.stream(stream_token) as iter: + with scoped_client.stream(source) as iter: for event in iter: assert event["type"] == "add" assert event["data"]["value"] == 1 @@ -214,7 +214,7 @@ def test_providing_cursor(scoped_client): break opts = StreamOptions(cursor=cursor) - with scoped_client.stream(stream_token, opts) as iter: + with scoped_client.stream(source, opts) as iter: for event in iter: assert event["type"] == "add" assert event["data"]["value"] == 2 @@ -224,7 +224,8 @@ def test_providing_cursor(scoped_client): def test_rejects_cursor_with_fql_query(scoped_client): with pytest.raises( ClientError, - match="The 'cursor' configuration can only be used with a stream token."): + match="The 'cursor' configuration can only be used with an event source." + ): opts = StreamOptions(cursor="abc1234==") scoped_client.stream(fql("Collection.create({name: 'Product'})"), opts) @@ -233,11 +234,11 @@ def test_rejects_when_both_start_ts_and_cursor_provided(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) response = scoped_client.query(fql("Product.all().toStream()")) - stream_token = response.data + source = response.data with pytest.raises(TypeError): opts = StreamOptions(cursor="abc1234==", start_ts=response.txn_ts) - scoped_client.stream(stream_token, opts) + scoped_client.stream(source, opts) def test_handle_status_events(scoped_client): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index ad8e6a55..8cfd636a 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -11,7 +11,7 @@ from fauna.client import Client, Header, QueryOptions, Endpoints, StreamOptions from fauna.errors import QueryCheckError, ProtocolError, QueryRuntimeError, NetworkError, AbortError from fauna.http import HTTPXClient -from fauna.query.models import StreamToken +from fauna.query import EventSource def test_client_defaults(monkeypatch): @@ -429,7 +429,7 @@ def test_client_stream(subtests, httpx_mock: HTTPXMock): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: ret = [obj for obj in stream] assert stream.last_ts == 3 @@ -456,7 +456,7 @@ def test_client_close_stream(subtests, httpx_mock: HTTPXMock): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: assert next(stream) == {"type": "add", "txn_ts": 2, "cursor": "b"} stream.close() @@ -484,7 +484,7 @@ def stream_iter1(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: ret = [obj for obj in stream] assert stream.last_ts == 4 @@ -508,7 +508,7 @@ def stream_iter(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: for obj in stream: ret.append(obj) @@ -533,7 +533,7 @@ def stream_iter(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: for obj in stream: ret.append(obj) @@ -565,7 +565,7 @@ def stream_iter(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token"), + with c.stream(EventSource("token"), StreamOptions(status_events=True)) as stream: for obj in stream: ret.append(obj) diff --git a/tests/unit/test_encoding.py b/tests/unit/test_encoding.py index ba85b89f..effa267e 100644 --- a/tests/unit/test_encoding.py +++ b/tests/unit/test_encoding.py @@ -8,7 +8,7 @@ from fauna import fql from fauna.encoding import FaunaEncoder, FaunaDecoder from fauna.query.models import DocumentReference, NamedDocumentReference, Document, NamedDocument, Module, Page, \ - NullDocument, StreamToken + NullDocument, EventSource fixed_datetime = datetime.fromisoformat("2023-03-17T00:00:00+00:00") @@ -799,14 +799,14 @@ def test_encode_query_builder_sub_queries(subtests): def test_decode_stream(subtests): - with subtests.test(msg="decode @stream into StreamToken"): + with subtests.test(msg="decode @stream into EventSource"): test = {"@stream": "asdflkj"} decoded = FaunaDecoder.decode(test) - assert decoded == StreamToken("asdflkj") + assert decoded == EventSource("asdflkj") def test_encode_stream(subtests): - with subtests.test(msg="encode StreamToken into @stream"): + with subtests.test(msg="encode EventSource into @stream"): test = {"@stream": "asdflkj"} - encoded = FaunaEncoder.encode(StreamToken("asdflkj")) + encoded = FaunaEncoder.encode(EventSource("asdflkj")) assert encoded == test diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py index 437555ab..fcc3ba05 100644 --- a/tests/unit/test_models.py +++ b/tests/unit/test_models.py @@ -1,4 +1,5 @@ import datetime +import pytest from fauna.query.models import Document, Module, NamedDocument, BaseReference, DocumentReference, \ NamedDocumentReference, Page @@ -131,3 +132,9 @@ def test_document_equality(subtests): other = 123 _ = d1 == other _ = d1 != other + + +def test_stream_token_deprecation(): + with pytest.deprecated_call(): + from fauna.query.models import StreamToken, EventSource + assert StreamToken == EventSource # same runtime.