diff --git a/README.md b/README.md index 6238e2bd..1b446c40 100644 --- a/README.md +++ b/README.md @@ -291,14 +291,13 @@ The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/tra ### Request an Event Feed -An Event Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming), -represented by a stream token, for events. +An Event Feed asynchronously polls an [event source](https://docs.fauna.com/fauna/current/learn/streaming), +for paginated events. -To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a -[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). +To get an event source, append ``eventSource()`` or ``eventsOn()`` to a +[supported Set](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets). -To get paginated events for the stream, pass the stream token to -``change_feed()``: +To get paginated events for the source, pass the event source to ``feed()``: ```python from fauna import fql @@ -310,46 +309,45 @@ To get paginated events for the stream, pass the stream token to let set = Product.all() { initialPage: set.pageSize(10), - streamToken: set.toStream() + eventSource: set.eventSource() } ''')) - initialPage = response.data['initialPage'] - streamToken = response.data['streamToken'] + initial_page = response.data['initialPage'] + event_source = response.data['eventSource'] - client.change_feed(streamToken) + client.feed(event_source) ``` -You can also pass a query that produces a stream token directly to -``change_feed()``: +You can also pass a query that produces an event source directly to ``feed()``: ```python - query = fql('Product.all().changesOn(.price, .stock)') + query = fql('Product.all().eventsOn(.price, .stock)') - client.change_feed(query) + client.feed(query) ``` ### Iterate on an Event Feed -``change_feed()`` returns an iterator that emits pages of events. You can use a +``feed()`` returns an iterator that emits pages of events. You can use a generator expression to iterate through the pages: ```python - query = fql('Product.all().changesOn(.price, .stock)') - feed = client.change_feed(query) + query = fql('Product.all().eventsOn(.price, .stock)') + feed = client.feed(query) for page in feed: print('Page stats: ', page.stats) for event in page: - eventType = event['type'] - if (eventType == 'add'): + event_type = event['type'] + if (event_type == 'add'): print('Add event: ', event) ## ... - elif (eventType == 'update'): + elif (event_type == 'update'): print('Update event: ', event) ## ... - elif (eventType == 'remove'): + elif (event_type == 'remove'): print('Remove event: ', event) ## ... ``` @@ -358,11 +356,11 @@ Alternatively, you can iterate through events instead of pages with ``flatten()``: ```python - query = fql('Product.all().changesOn(.price, .stock)') - feed = client.change_feed(query) + query = fql('Product.all().eventsOn(.price, .stock)') + feed = client.feed(query) for event in feed.flatten(): - eventType = event['type'] + event_type = event['type'] ## ... ``` @@ -381,8 +379,8 @@ raises a ``FaunaException``: client = Client() try: - feed = client.change_feed(fql( - 'Product.all().changesOn(.price, .stock)' + feed = client.feed(fql( + 'Product.all().eventsOn(.price, .stock)' )) for event in feed.flatten(): print(event) @@ -393,7 +391,7 @@ raises a ``FaunaException``: Errors can be raised at two different places: -1. At the ``change_feed`` method call; +1. At the ``feed`` method call; 2. At the page iteration. This distinction allows for users to ignore errors originating from event @@ -408,8 +406,8 @@ processing. For example: # Imagine if there are some products with details = null. # The ones without details will fail due to the toUpperCase call. - feed = client.change_feed(fql( - 'Product.all().map(.details.toUpperCase()).toStream()' + feed = client.feed(fql( + 'Product.all().map(.details.toUpperCase()).eventSource()' )) for page in feed: @@ -426,13 +424,12 @@ processing. For example: ### Event Feed options -The client configuration sets default options for the ``change_feed()`` -method. +The client configuration sets default options for the ``feed()`` method. -You can pass a ``ChangeFeedOptions`` object to override these defaults: +You can pass a ``FeedOptions`` object to override these defaults: ```python -options = ChangeFeedOptions( +options = FeedOptions( max_attempts=3, max_backoff=20, query_timeout=timedelta(seconds=5), @@ -441,7 +438,7 @@ options = ChangeFeedOptions( start_ts=None, ) -client.change_feed(fql('Product.all().toStream()'), options) +client.feed(fql('Product.all().eventSource()'), options) ``` ## Event Streaming @@ -450,12 +447,11 @@ The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn ### Start a stream -To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a -[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). +To get an event source, append ``eventSource()`` or ``eventsOn()`` to a +[supported Set](https://docs.fauna.com/fauna/current/reference/streaming_reference/#sets). -To start and subscribe to the stream, pass the stream token to -``stream()``: +To start and subscribe to the stream, pass the event source to ``stream()``: ```python from fauna import fql @@ -467,21 +463,21 @@ To start and subscribe to the stream, pass the stream token to let set = Product.all() { initialPage: set.pageSize(10), - streamToken: set.toStream() + eventSource: set.eventSource() } ''')) - initialPage = response.data['initialPage'] - streamToken = response.data['streamToken'] + initial_page = response.data['initialPage'] + event_source = response.data['eventSource'] - client.stream(streamToken) + client.stream(event_source) ``` -You can also pass a query that produces a stream token directly to +You can also pass a query that produces an event source directly to ``stream()``: ```python - query = fql('Product.all().changesOn(.price, .stock)') + query = fql('Product.all().eventsOn(.price, .stock)') client.stream(query) ``` @@ -492,18 +488,18 @@ You can also pass a query that produces a stream token directly to use a generator expression to iterate through the events: ```python -query = fql('Product.all().changesOn(.price, .stock)') +query = fql('Product.all().eventsOn(.price, .stock)') with client.stream(query) as stream: for event in stream: - eventType = event['type'] - if (eventType == 'add'): + event_type = event['type'] + if (event_type == 'add'): print('Add event: ', event) ## ... - elif (eventType == 'update'): + elif (event_type == 'update'): print('Update event: ', event) ## ... - elif (eventType == 'remove'): + elif (event_type == 'remove'): print('Remove event: ', event) ## ... ``` @@ -513,7 +509,7 @@ with client.stream(query) as stream: Use ``close()`` to close a stream: ```python -query = fql('Product.all().changesOn(.price, .stock)') +query = fql('Product.all().eventsOn(.price, .stock)') count = 0 with client.stream(query) as stream: @@ -540,7 +536,7 @@ client = Client() try: with client.stream(fql( - 'Product.all().changesOn(.price, .stock)' + 'Product.all().eventsOn(.price, .stock)' )) as stream: for event in stream: print(event) @@ -565,7 +561,7 @@ options = StreamOptions( status_events=False, ) -client.stream(fql('Product.all().toStream()'), options) +client.stream(fql('Product.all().eventSource()'), options) ``` ## Setup diff --git a/fauna/client/__init__.py b/fauna/client/__init__.py index 9770dd26..4f6e24a6 100644 --- a/fauna/client/__init__.py +++ b/fauna/client/__init__.py @@ -1,3 +1,3 @@ -from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions +from .client import Client, QueryOptions, StreamOptions, FeedOptions, FeedPage, FeedIterator from .endpoints import Endpoints from .headers import Header diff --git a/fauna/client/client.py b/fauna/client/client.py index fbf81d13..8fc5024e 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -70,7 +70,7 @@ class StreamOptions: @dataclass -class ChangeFeedOptions: +class FeedOptions: """ A dataclass representing options available for an Event Feed. @@ -425,7 +425,7 @@ def stream( """ Opens a Stream in Fauna and returns an iterator that consume Fauna events. - :param fql: A Query that returns a EventSource or a EventSource. + :param fql: An EventSource or a Query that returns an EventSource. :param opts: (Optional) Stream Options. :return: a :class:`StreamIterator` @@ -443,12 +443,12 @@ def stream( raise ClientError( "The 'cursor' configuration can only be used with an event source.") - token = self.query(fql).data + source = self.query(fql).data else: - token = fql + source = fql - if not isinstance(token, EventSource): - err_msg = f"'fql' must be a EventSource, or a Query that returns a EventSource but was a {type(token)}." + if not isinstance(source, EventSource): + err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." raise TypeError(err_msg) headers = self._headers.copy() @@ -456,20 +456,20 @@ def stream( headers[_Header.Authorization] = self._auth.bearer() return StreamIterator(self._session, headers, self._endpoint + "/stream/1", - self._max_attempts, self._max_backoff, opts, token) + self._max_attempts, self._max_backoff, opts, source) - def change_feed( + def feed( self, - fql: Union[EventSource, Query], - opts: ChangeFeedOptions = ChangeFeedOptions() - ) -> "ChangeFeedIterator": + source: Union[EventSource, Query], + opts: FeedOptions = FeedOptions(), + ) -> "FeedIterator": """ Opens an Event Feed in Fauna and returns an iterator that consume Fauna events. - :param fql: A Query that returns a EventSource or a EventSource. + :param source: An EventSource or a Query that returns an EventSource. :param opts: (Optional) Event Feed options. - :return: a :class:`ChangeFeedIterator` + :return: a :class:`FeedIterator` :raises ClientError: Invalid options provided :raises NetworkError: HTTP Request failed in transit @@ -479,13 +479,11 @@ def change_feed( :raises TypeError: Invalid param types """ - if isinstance(fql, Query): - token = self.query(fql).data - else: - token = fql + if isinstance(source, Query): + source = self.query(source).data - if not isinstance(token, EventSource): - err_msg = f"'fql' must be a EventSource, or a Query that returns a EventSource but was a {type(token)}." + if not isinstance(source, EventSource): + err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." raise TypeError(err_msg) headers = self._headers.copy() @@ -498,10 +496,8 @@ def change_feed( elif self._query_timeout_ms is not None: headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) - return ChangeFeedIterator(self._session, headers, - self._endpoint + "/changefeed/1", - self._max_attempts, self._max_backoff, opts, - token) + return FeedIterator(self._session, headers, self._endpoint + "/feed/1", + self._max_attempts, self._max_backoff, opts, source) def _check_protocol(self, response_json: Any, status_code): # TODO: Logic to validate wire protocol belongs elsewhere. @@ -541,14 +537,14 @@ class StreamIterator: def __init__(self, http_client: HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, - opts: StreamOptions, token: EventSource): + opts: StreamOptions, source: EventSource): self._http_client = http_client self._headers = headers self._endpoint = endpoint self._max_attempts = max_attempts self._max_backoff = max_backoff self._opts = opts - self._token = token + self._source = source self._stream = None self.last_ts = None self.last_cursor = None @@ -627,7 +623,7 @@ def _retry_stream(self): raise RetryableFaunaException def _create_stream(self): - data: Dict[str, Any] = {"token": self._token.token} + data: Dict[str, Any] = {"token": self._source.token} if self.last_cursor is not None: data["cursor"] = self.last_cursor elif self._opts.cursor is not None: @@ -643,7 +639,7 @@ def close(self): self._stream.close() -class ChangeFeedPage: +class FeedPage: def __init__(self, events: List[Any], cursor: str, stats: QueryStats): self._events = events @@ -660,22 +656,22 @@ def __iter__(self) -> Iterator[Any]: yield event -class ChangeFeedIterator: +class FeedIterator: """A class to provide an iterator on top of Event Feed pages.""" def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, - max_attempts: int, max_backoff: int, opts: ChangeFeedOptions, - token: EventSource): + max_attempts: int, max_backoff: int, opts: FeedOptions, + source: EventSource): self._http = http self._headers = headers self._endpoint = endpoint self._max_attempts = opts.max_attempts or max_attempts self._max_backoff = opts.max_backoff or max_backoff - self._request: Dict[str, Any] = {"token": token.token} + self._request: Dict[str, Any] = {"token": source.token} self._is_done = False if opts.start_ts is not None and opts.cursor is not None: - err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the ChangeFeedOptions." + err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions." raise TypeError(err_msg) if opts.page_size is not None: @@ -686,11 +682,11 @@ def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, elif opts.start_ts is not None: self._request["start_ts"] = opts.start_ts - def __iter__(self) -> Iterator[ChangeFeedPage]: + def __iter__(self) -> Iterator[FeedPage]: self._is_done = False return self - def __next__(self) -> ChangeFeedPage: + def __next__(self) -> FeedPage: if self._is_done: raise StopIteration @@ -698,7 +694,7 @@ def __next__(self) -> ChangeFeedPage: self._next_page) return retryable.run().response - def _next_page(self) -> ChangeFeedPage: + def _next_page(self) -> FeedPage: with self._http.request( method="POST", url=self._endpoint, @@ -717,8 +713,8 @@ def _next_page(self) -> ChangeFeedPage: if "start_ts" in self._request: del self._request["start_ts"] - return ChangeFeedPage(decoded["events"], decoded["cursor"], - QueryStats(decoded["stats"])) + return FeedPage(decoded["events"], decoded["cursor"], + QueryStats(decoded["stats"])) def flatten(self) -> Iterator: """A generator that yields events instead of pages of events.""" diff --git a/fauna/encoding/decoder.py b/fauna/encoding/decoder.py index 6c485b68..c02bc6e8 100644 --- a/fauna/encoding/decoder.py +++ b/fauna/encoding/decoder.py @@ -64,7 +64,7 @@ def decode(obj: Any): - { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference - { "@mod": ... } decodes to a Module - { "@set": ... } decodes to a Page - - { "@stream": ... } decodes to a EventSource + - { "@stream": ... } decodes to an EventSource - { "@bytes": ... } decodes to a bytearray :param obj: the object to decode diff --git a/fauna/query/models.py b/fauna/query/models.py index 57ad7443..e47148c9 100644 --- a/fauna/query/models.py +++ b/fauna/query/models.py @@ -13,11 +13,11 @@ def __getattr__(name): DeprecationWarning, stacklevel=2) return EventSource - return super.__getattr__(name) + return super.__getattr__(name) # pyright: ignore def __dir__(): - return super.__dir__() + "StreamToken" + return super.__dir__() + "StreamToken" # pyright: ignore class Page: diff --git a/tests/integration/test_change_feeds.py b/tests/integration/test_feeds.py similarity index 69% rename from tests/integration/test_change_feeds.py rename to tests/integration/test_feeds.py index 66532c03..f8cb4f2c 100644 --- a/tests/integration/test_change_feeds.py +++ b/tests/integration/test_feeds.py @@ -2,26 +2,26 @@ import pytest from fauna import fql -from fauna.client import ChangeFeedOptions +from fauna.client import FeedOptions from fauna.errors import AbortError -def test_change_feed_requires_stream(client, a_collection): +def test_feed_requires_stream(client, a_collection): with pytest.raises( TypeError, - match="'fql' must be a EventSource, or a Query that returns a EventSource but was a ." + match="'source' must be an EventSource, or a Query that returns an EventSource but was a ." ): - client.change_feed(fql("42")) + client.feed(fql("42")) -def test_change_feed_query(client, a_collection): - feed = client.change_feed(fql("${col}.all().toStream()", col=a_collection)) +def test_feed_query(client, a_collection): + feed = client.feed(fql("${col}.all().toStream()", col=a_collection)) _pull_one_event(client, a_collection, feed) -def test_change_feed_event_source(client, a_collection): +def test_feed_event_source(client, a_collection): source = client.query(fql("${col}.all().toStream()", col=a_collection)).data - feed = client.change_feed(source) + feed = client.feed(source) _pull_one_event(client, a_collection, feed) @@ -37,8 +37,8 @@ def _pull_one_event(client, col, feed): assert events[0]['data']['foo'] == 'bar' -def test_change_feeds_error_event(client, a_collection): - feed = client.change_feed( +def test_feeds_error_event(client, a_collection): + feed = client.feed( fql("${col}.all().map(_ => abort('oops')).toStream()", col=a_collection)) client.query(fql("${col}.create({ foo: 'bar' })", col=a_collection)) @@ -48,8 +48,8 @@ def test_change_feeds_error_event(client, a_collection): @pytest.mark.xfail(reason="pending core support") -def test_change_feeds_continue_after_an_error(client, a_collection): - feed = client.change_feed( +def test_feeds_continue_after_an_error(client, a_collection): + feed = client.feed( fql(''' ${col} .all() @@ -81,7 +81,7 @@ def test_change_feeds_continue_after_an_error(client, a_collection): assert events == [0, 2] -def test_change_feed_start_ts(client, a_collection): +def test_feed_start_ts(client, a_collection): source = client.query( fql("${col}.all().map(.n).toStream()", col=a_collection)).data @@ -91,15 +91,15 @@ def test_change_feed_start_ts(client, a_collection): # NB. Use a short page size to ensure that more than one roundtrip is made, # thus testing the interator's internal cursoring is correct. - first = next(client.change_feed(source).flatten()) - opts = ChangeFeedOptions(start_ts=first['txn_ts'], page_size=5) - feed = client.change_feed(source, opts) + first = next(client.feed(source).flatten()) + opts = FeedOptions(start_ts=first['txn_ts'], page_size=5) + feed = client.feed(source, opts) nums = [event['data'] for event in feed.flatten()] assert nums == list(range(1, 64)) -def test_change_feed_cursor(client, a_collection): +def test_feed_cursor(client, a_collection): source = client.query( fql("${col}.all().map(.n).toStream()", col=a_collection)).data @@ -107,9 +107,9 @@ def test_change_feed_cursor(client, a_collection): # NB. Use a short page size to ensure that more than one roundtrip is made, # thus testing the interator's internal cursoring is correct. - first = next(client.change_feed(source).flatten()) - opts = ChangeFeedOptions(cursor=first['cursor'], page_size=5) - feed = client.change_feed(source, opts) + first = next(client.feed(source).flatten()) + opts = FeedOptions(cursor=first['cursor'], page_size=5) + feed = client.feed(source, opts) nums = [event['data'] for event in feed.flatten()] assert nums == list(range(1, 64)) @@ -122,13 +122,12 @@ def test_rejects_when_both_start_ts_and_cursor_provided(scoped_client): source = response.data with pytest.raises(TypeError): - opts = ChangeFeedOptions(cursor="abc1234==", start_ts=response.txn_ts) - scoped_client.change_feed(source, opts) + opts = FeedOptions(cursor="abc1234==", start_ts=response.txn_ts) + scoped_client.feed(source, opts) -def test_change_feed_reusable_iterator(client, a_collection): - feed = client.change_feed( - fql("${col}.all().map(.n).toStream()", col=a_collection)) +def test_feed_reusable_iterator(client, a_collection): + feed = client.feed(fql("${col}.all().map(.n).toStream()", col=a_collection)) _create_docs(client, a_collection, 0, 5) nums = [event['data'] for event in feed.flatten()]