From 6558da8dde2ace6aec613ae13de46c8dfe2ff15b Mon Sep 17 00:00:00 2001 From: Erick Pintor Date: Thu, 17 Oct 2024 14:57:54 -0300 Subject: [PATCH] rename StreamToken to EventSource --- fauna/client/client.py | 25 ++++++++++++------------- fauna/encoding/decoder.py | 8 ++++---- fauna/encoding/encoder.py | 10 +++++----- fauna/query/__init__.py | 2 +- fauna/query/models.py | 23 ++++++++++++++++++++--- tests/integration/test_change_feeds.py | 24 ++++++++++++------------ tests/integration/test_stream.py | 17 +++++++++-------- tests/unit/test_client.py | 14 +++++++------- tests/unit/test_encoding.py | 10 +++++----- tests/unit/test_models.py | 7 +++++++ 10 files changed, 82 insertions(+), 58 deletions(-) diff --git a/fauna/client/client.py b/fauna/client/client.py index 7212acec..fbf81d13 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -11,8 +11,7 @@ from fauna.errors import FaunaError, ClientError, ProtocolError, \ RetryableFaunaException, NetworkError from fauna.http.http_client import HTTPClient -from fauna.query import Query, Page, fql -from fauna.query.models import StreamToken +from fauna.query import EventSource, Query, Page, fql DefaultHttpConnectTimeout = timedelta(seconds=5) DefaultHttpReadTimeout: Optional[timedelta] = None @@ -420,13 +419,13 @@ def _query( def stream( self, - fql: Union[StreamToken, Query], + fql: Union[EventSource, Query], opts: StreamOptions = StreamOptions() ) -> "StreamIterator": """ Opens a Stream in Fauna and returns an iterator that consume Fauna events. - :param fql: A Query that returns a StreamToken or a StreamToken. + :param fql: A Query that returns a EventSource or a EventSource. :param opts: (Optional) Stream Options. :return: a :class:`StreamIterator` @@ -442,14 +441,14 @@ def stream( if isinstance(fql, Query): if opts.cursor is not None: raise ClientError( - "The 'cursor' configuration can only be used with a stream token.") + "The 'cursor' configuration can only be used with an event source.") token = self.query(fql).data else: token = fql - if not isinstance(token, StreamToken): - err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}." + if not isinstance(token, EventSource): + err_msg = f"'fql' must be a EventSource, or a Query that returns a EventSource but was a {type(token)}." raise TypeError(err_msg) headers = self._headers.copy() @@ -461,13 +460,13 @@ def stream( def change_feed( self, - fql: Union[StreamToken, Query], + fql: Union[EventSource, Query], opts: ChangeFeedOptions = ChangeFeedOptions() ) -> "ChangeFeedIterator": """ Opens an Event Feed in Fauna and returns an iterator that consume Fauna events. - :param fql: A Query that returns a StreamToken or a StreamToken. + :param fql: A Query that returns a EventSource or a EventSource. :param opts: (Optional) Event Feed options. :return: a :class:`ChangeFeedIterator` @@ -485,8 +484,8 @@ def change_feed( else: token = fql - if not isinstance(token, StreamToken): - err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}." + if not isinstance(token, EventSource): + err_msg = f"'fql' must be a EventSource, or a Query that returns a EventSource but was a {type(token)}." raise TypeError(err_msg) headers = self._headers.copy() @@ -542,7 +541,7 @@ class StreamIterator: def __init__(self, http_client: HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, - opts: StreamOptions, token: StreamToken): + opts: StreamOptions, token: EventSource): self._http_client = http_client self._headers = headers self._endpoint = endpoint @@ -666,7 +665,7 @@ class ChangeFeedIterator: def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: ChangeFeedOptions, - token: StreamToken): + token: EventSource): self._http = http self._headers = headers self._endpoint = endpoint diff --git a/fauna/encoding/decoder.py b/fauna/encoding/decoder.py index 0536be39..6c485b68 100644 --- a/fauna/encoding/decoder.py +++ b/fauna/encoding/decoder.py @@ -4,7 +4,7 @@ from iso8601 import parse_date from fauna.query.models import Module, DocumentReference, Document, NamedDocument, NamedDocumentReference, Page, \ - NullDocument, StreamToken + NullDocument, EventSource class FaunaDecoder: @@ -45,7 +45,7 @@ class FaunaDecoder: +--------------------+---------------+ | Page | @set | +--------------------+---------------+ - | StreamToken | @stream | + | EventSource | @stream | +--------------------+---------------+ """ @@ -64,7 +64,7 @@ def decode(obj: Any): - { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference - { "@mod": ... } decodes to a Module - { "@set": ... } decodes to a Page - - { "@stream": ... } decodes to a StreamToken + - { "@stream": ... } decodes to a EventSource - { "@bytes": ... } decodes to a bytearray :param obj: the object to decode @@ -176,6 +176,6 @@ def _decode_dict(dct: dict, escaped: bool): return Page(data=data, after=after) if "@stream" in dct: - return StreamToken(dct["@stream"]) + return EventSource(dct["@stream"]) return {k: FaunaDecoder._decode(v) for k, v in dct.items()} diff --git a/fauna/encoding/encoder.py b/fauna/encoding/encoder.py index 986b8a1d..becfeb84 100644 --- a/fauna/encoding/encoder.py +++ b/fauna/encoding/encoder.py @@ -3,7 +3,7 @@ from typing import Any, Optional, List, Union from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument, \ - StreamToken + EventSource from fauna.query.query_builder import Query, Fragment, LiteralFragment, ValueFragment _RESERVED_TAGS = [ @@ -62,7 +62,7 @@ class FaunaEncoder: +-------------------------------+---------------+ | TemplateFragment | string | +-------------------------------+---------------+ - | StreamToken | string | + | EventSource | string | +-------------------------------+---------------+ """ @@ -82,7 +82,7 @@ def encode(obj: Any) -> Any: - Query encodes to { "fql": [...] } - ValueFragment encodes to { "value": } - LiteralFragment encodes to a string - - StreamToken encodes to a string + - EventSource encodes to a string :raises ValueError: If value cannot be encoded, cannot be encoded safely, or there's a circular reference. :param obj: the object to decode @@ -163,7 +163,7 @@ def from_query_interpolation_builder(obj: Query): return {"fql": [FaunaEncoder.from_fragment(f) for f in obj.fragments]} @staticmethod - def from_streamtoken(obj: StreamToken): + def from_streamtoken(obj: EventSource): return {"@stream": obj.token} @staticmethod @@ -208,7 +208,7 @@ def _encode(o: Any, _markers: Optional[List] = None): return FaunaEncoder._encode_dict(o, _markers) elif isinstance(o, Query): return FaunaEncoder.from_query_interpolation_builder(o) - elif isinstance(o, StreamToken): + elif isinstance(o, EventSource): return FaunaEncoder.from_streamtoken(o) else: raise ValueError(f"Object {o} of type {type(o)} cannot be encoded") diff --git a/fauna/query/__init__.py b/fauna/query/__init__.py index e93730a7..7b77319d 100644 --- a/fauna/query/__init__.py +++ b/fauna/query/__init__.py @@ -1,2 +1,2 @@ -from .models import Document, DocumentReference, NamedDocument, NamedDocumentReference, NullDocument, Module, Page +from .models import Document, DocumentReference, EventSource, NamedDocument, NamedDocumentReference, NullDocument, Module, Page from .query_builder import fql, Query diff --git a/fauna/query/models.py b/fauna/query/models.py index c496c058..57ad7443 100644 --- a/fauna/query/models.py +++ b/fauna/query/models.py @@ -1,8 +1,25 @@ +import warnings from collections.abc import Mapping from datetime import datetime from typing import Union, Iterator, Any, Optional, List +# NB. Override __getattr__ and __dir__ to deprecate StreamToken usages. Based +# on: https://peps.python.org/pep-0562/ +def __getattr__(name): + if name == "StreamToken": + warnings.warn( + "StreamToken is deprecated. Prefer fauna.query.EventSource instead.", + DeprecationWarning, + stacklevel=2) + return EventSource + return super.__getattr__(name) + + +def __dir__(): + return super.__dir__() + "StreamToken" + + class Page: """A class representing a Set in Fauna.""" @@ -36,14 +53,14 @@ def __ne__(self, other): return not self.__eq__(other) -class StreamToken: - """A class represeting a Stream in Fauna.""" +class EventSource: + """A class represeting an EventSource in Fauna.""" def __init__(self, token: str): self.token = token def __eq__(self, other): - return isinstance(other, StreamToken) and self.token == other.token + return isinstance(other, EventSource) and self.token == other.token def __hash__(self): return hash(self.token) diff --git a/tests/integration/test_change_feeds.py b/tests/integration/test_change_feeds.py index 51b8e306..66532c03 100644 --- a/tests/integration/test_change_feeds.py +++ b/tests/integration/test_change_feeds.py @@ -9,7 +9,7 @@ def test_change_feed_requires_stream(client, a_collection): with pytest.raises( TypeError, - match="'fql' must be a StreamToken, or a Query that returns a StreamToken but was a ." + match="'fql' must be a EventSource, or a Query that returns a EventSource but was a ." ): client.change_feed(fql("42")) @@ -19,9 +19,9 @@ def test_change_feed_query(client, a_collection): _pull_one_event(client, a_collection, feed) -def test_change_feed_token(client, a_collection): - token = client.query(fql("${col}.all().toStream()", col=a_collection)).data - feed = client.change_feed(token) +def test_change_feed_event_source(client, a_collection): + source = client.query(fql("${col}.all().toStream()", col=a_collection)).data + feed = client.change_feed(source) _pull_one_event(client, a_collection, feed) @@ -82,7 +82,7 @@ def test_change_feeds_continue_after_an_error(client, a_collection): def test_change_feed_start_ts(client, a_collection): - token = client.query( + source = client.query( fql("${col}.all().map(.n).toStream()", col=a_collection)).data # NB. Issue separate queries to ensure they get different txn times. @@ -91,25 +91,25 @@ def test_change_feed_start_ts(client, a_collection): # NB. Use a short page size to ensure that more than one roundtrip is made, # thus testing the interator's internal cursoring is correct. - first = next(client.change_feed(token).flatten()) + first = next(client.change_feed(source).flatten()) opts = ChangeFeedOptions(start_ts=first['txn_ts'], page_size=5) - feed = client.change_feed(token, opts) + feed = client.change_feed(source, opts) nums = [event['data'] for event in feed.flatten()] assert nums == list(range(1, 64)) def test_change_feed_cursor(client, a_collection): - token = client.query( + source = client.query( fql("${col}.all().map(.n).toStream()", col=a_collection)).data _create_docs(client, a_collection, 0, 64) # NB. Use a short page size to ensure that more than one roundtrip is made, # thus testing the interator's internal cursoring is correct. - first = next(client.change_feed(token).flatten()) + first = next(client.change_feed(source).flatten()) opts = ChangeFeedOptions(cursor=first['cursor'], page_size=5) - feed = client.change_feed(token, opts) + feed = client.change_feed(source, opts) nums = [event['data'] for event in feed.flatten()] assert nums == list(range(1, 64)) @@ -119,11 +119,11 @@ def test_rejects_when_both_start_ts_and_cursor_provided(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) response = scoped_client.query(fql("Product.all().toStream()")) - stream_token = response.data + source = response.data with pytest.raises(TypeError): opts = ChangeFeedOptions(cursor="abc1234==", start_ts=response.txn_ts) - scoped_client.change_feed(stream_token, opts) + scoped_client.change_feed(source, opts) def test_change_feed_reusable_iterator(client, a_collection): diff --git a/tests/integration/test_stream.py b/tests/integration/test_stream.py index 2eb53a59..44e355b2 100644 --- a/tests/integration/test_stream.py +++ b/tests/integration/test_stream.py @@ -167,7 +167,7 @@ def thread_fn(): def test_providing_start_ts(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) - stream_token = scoped_client.query(fql("Product.all().toStream()")).data + source = scoped_client.query(fql("Product.all().toStream()")).data createOne = scoped_client.query(fql("Product.create({})")) createTwo = scoped_client.query(fql("Product.create({})")) @@ -178,7 +178,7 @@ def test_providing_start_ts(scoped_client): def thread_fn(): # replay excludes the ts that was passed in, it provides events for all ts after the one provided - stream = scoped_client.stream(stream_token, + stream = scoped_client.stream(source, StreamOptions(start_ts=createOne.txn_ts)) barrier.wait() with stream as iter: @@ -201,12 +201,12 @@ def thread_fn(): def test_providing_cursor(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) - stream_token = scoped_client.query(fql("Product.all().toStream()")).data + source = scoped_client.query(fql("Product.all().toStream()")).data create1 = scoped_client.query(fql("Product.create({ value: 1 })")) create2 = scoped_client.query(fql("Product.create({ value: 2 })")) cursor = None - with scoped_client.stream(stream_token) as iter: + with scoped_client.stream(source) as iter: for event in iter: assert event["type"] == "add" assert event["data"]["value"] == 1 @@ -214,7 +214,7 @@ def test_providing_cursor(scoped_client): break opts = StreamOptions(cursor=cursor) - with scoped_client.stream(stream_token, opts) as iter: + with scoped_client.stream(source, opts) as iter: for event in iter: assert event["type"] == "add" assert event["data"]["value"] == 2 @@ -224,7 +224,8 @@ def test_providing_cursor(scoped_client): def test_rejects_cursor_with_fql_query(scoped_client): with pytest.raises( ClientError, - match="The 'cursor' configuration can only be used with a stream token."): + match="The 'cursor' configuration can only be used with an event source." + ): opts = StreamOptions(cursor="abc1234==") scoped_client.stream(fql("Collection.create({name: 'Product'})"), opts) @@ -233,11 +234,11 @@ def test_rejects_when_both_start_ts_and_cursor_provided(scoped_client): scoped_client.query(fql("Collection.create({name: 'Product'})")) response = scoped_client.query(fql("Product.all().toStream()")) - stream_token = response.data + source = response.data with pytest.raises(TypeError): opts = StreamOptions(cursor="abc1234==", start_ts=response.txn_ts) - scoped_client.stream(stream_token, opts) + scoped_client.stream(source, opts) def test_handle_status_events(scoped_client): diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index ad8e6a55..8cfd636a 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -11,7 +11,7 @@ from fauna.client import Client, Header, QueryOptions, Endpoints, StreamOptions from fauna.errors import QueryCheckError, ProtocolError, QueryRuntimeError, NetworkError, AbortError from fauna.http import HTTPXClient -from fauna.query.models import StreamToken +from fauna.query import EventSource def test_client_defaults(monkeypatch): @@ -429,7 +429,7 @@ def test_client_stream(subtests, httpx_mock: HTTPXMock): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: ret = [obj for obj in stream] assert stream.last_ts == 3 @@ -456,7 +456,7 @@ def test_client_close_stream(subtests, httpx_mock: HTTPXMock): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: assert next(stream) == {"type": "add", "txn_ts": 2, "cursor": "b"} stream.close() @@ -484,7 +484,7 @@ def stream_iter1(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: ret = [obj for obj in stream] assert stream.last_ts == 4 @@ -508,7 +508,7 @@ def stream_iter(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: for obj in stream: ret.append(obj) @@ -533,7 +533,7 @@ def stream_iter(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token")) as stream: + with c.stream(EventSource("token")) as stream: for obj in stream: ret.append(obj) @@ -565,7 +565,7 @@ def stream_iter(): with httpx.Client() as mockClient: http_client = HTTPXClient(mockClient) c = Client(http_client=http_client) - with c.stream(StreamToken("token"), + with c.stream(EventSource("token"), StreamOptions(status_events=True)) as stream: for obj in stream: ret.append(obj) diff --git a/tests/unit/test_encoding.py b/tests/unit/test_encoding.py index ba85b89f..effa267e 100644 --- a/tests/unit/test_encoding.py +++ b/tests/unit/test_encoding.py @@ -8,7 +8,7 @@ from fauna import fql from fauna.encoding import FaunaEncoder, FaunaDecoder from fauna.query.models import DocumentReference, NamedDocumentReference, Document, NamedDocument, Module, Page, \ - NullDocument, StreamToken + NullDocument, EventSource fixed_datetime = datetime.fromisoformat("2023-03-17T00:00:00+00:00") @@ -799,14 +799,14 @@ def test_encode_query_builder_sub_queries(subtests): def test_decode_stream(subtests): - with subtests.test(msg="decode @stream into StreamToken"): + with subtests.test(msg="decode @stream into EventSource"): test = {"@stream": "asdflkj"} decoded = FaunaDecoder.decode(test) - assert decoded == StreamToken("asdflkj") + assert decoded == EventSource("asdflkj") def test_encode_stream(subtests): - with subtests.test(msg="encode StreamToken into @stream"): + with subtests.test(msg="encode EventSource into @stream"): test = {"@stream": "asdflkj"} - encoded = FaunaEncoder.encode(StreamToken("asdflkj")) + encoded = FaunaEncoder.encode(EventSource("asdflkj")) assert encoded == test diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py index 437555ab..fcc3ba05 100644 --- a/tests/unit/test_models.py +++ b/tests/unit/test_models.py @@ -1,4 +1,5 @@ import datetime +import pytest from fauna.query.models import Document, Module, NamedDocument, BaseReference, DocumentReference, \ NamedDocumentReference, Page @@ -131,3 +132,9 @@ def test_document_equality(subtests): other = 123 _ = d1 == other _ = d1 != other + + +def test_stream_token_deprecation(): + with pytest.deprecated_call(): + from fauna.query.models import StreamToken, EventSource + assert StreamToken == EventSource # same runtime.