From 987b21e916d494b8a255d8fa81d34e15d2b10fa9 Mon Sep 17 00:00:00 2001 From: Erick Pintor Date: Mon, 2 Sep 2024 12:48:07 -0300 Subject: [PATCH] ENG-6679: Introduce support for change feeds --- README.md | 126 ++++++++++++++++++++++ docker/feature-flags.json | 6 +- fauna/client/__init__.py | 2 +- fauna/client/client.py | 142 ++++++++++++++++++++++++- tests/integration/test_change_feeds.py | 112 +++++++++++++++++++ 5 files changed, 384 insertions(+), 4 deletions(-) create mode 100644 tests/integration/test_change_feeds.py diff --git a/README.md b/README.md index 2bfe4889..a726967d 100644 --- a/README.md +++ b/README.md @@ -389,6 +389,7 @@ try: except FaunaException as e: print('error ocurred with stream: ', e) ``` + ### Stream options The client configuration sets default options for the ``stream()`` @@ -408,6 +409,131 @@ options = StreamOptions( client.stream(fql('Product.all().toStream()'), options) ``` +## Change Feeds + + + +The driver supports Changefeeds. + +### Start a changefeed + +Changefeeds work by interactively polling from a [Fauna Stream](https://docs.fauna.com/fauna/current/learn/streaming). +To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a +[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). 
+ +To create a changefeed, pass the stream token to the ``change_feed()`` method: + +```python + from fauna import fql + from fauna.client import Client + + client = Client() + + response = client.query(fql(''' + let set = Product.all() + { + initialPage: set.pageSize(10), + streamToken: set.toStream() + } + ''')) + + initialPage = response.data['initialPage'] + streamToken = response.data['streamToken'] + + client.change_feed(streamToken) +``` + +You can also pass a query that produces a stream token directly to +``change_feed()``: + +```python + query = fql('Product.all().changesOn(.price, .quantity)') + + client.change_feed(query) +``` + +### Iterate on a changefeed + +``change_feed()`` returns an iterator that emits pages of events on every +iteration. You can use a ``for`` loop to iterate through the pages: + +```python +query = fql('Product.all().changesOn(.price, .quantity)') + +feed = client.change_feed(query) +for page in feed: + print('Page stats: ', page.stats) + + for event in page.events: + eventType = event['type'] + if (eventType == 'add'): + print('Add event: ', event) + ## ... + elif (eventType == 'update'): + print('Update event: ', event) + ## ... + elif (eventType == 'remove'): + print('Remove event: ', event) + ## ... +``` + +Alternatively, you can iterate through events instead of pages with +``flatten()``: + +```python +query = fql('Product.all().changesOn(.price, .quantity)') + +feed = client.change_feed(query) +for event in feed.flatten(): + eventType = event['type'] + ## ... +``` + +The changefeed iterator will stop once there are no more events to poll. 
+ +### Error handling + +If a non-retryable error occurs when opening or processing a changefeed, Fauna +raises a ``FaunaException``: + +```python +from fauna import fql +from fauna.client import Client +from fauna.errors import FaunaException + +client = Client() + +try: + feed = client.change_feed(fql( + 'Product.all().changesOn(.price, .quantity)' + )) + for event in feed.flatten(): + print(event) + # ... +except FaunaException as e: + print('error occurred with change feed: ', e) +``` + +### Changefeed options + +The client configuration sets default options for the ``change_feed()`` +method. + +You can pass a ``ChangeFeedOptions`` object to override these defaults: + +```python +options = ChangeFeedOptions( + max_attempts=3, + max_backoff=20, + query_timeout=timedelta(seconds=5), + page_size=None, + cursor=None, + start_ts=None, + ) + +client.change_feed(fql('Product.all().toStream()'), options) +``` + ## Setup ```bash diff --git a/docker/feature-flags.json b/docker/feature-flags.json index 2fc48a0a..739da189 100644 --- a/docker/feature-flags.json +++ b/docker/feature-flags.json @@ -9,7 +9,8 @@ "fqlx_typecheck_default": true, "persisted_fields": true, "changes_by_collection_index": true, - "fql2_streams": true + "fql2_streams": true, + "change_feeds": true } }, { @@ -20,7 +21,8 @@ "fqlx_typecheck_default": true, "persisted_fields": true, "changes_by_collection_index": true, - "fql2_streams": true + "fql2_streams": true, + "change_feeds": true } } ] diff --git a/fauna/client/__init__.py b/fauna/client/__init__.py index 73ad357a..9770dd26 100644 --- a/fauna/client/__init__.py +++ b/fauna/client/__init__.py @@ -1,3 +1,3 @@ -from .client import Client, QueryOptions, StreamOptions +from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions from .endpoints import Endpoints from .headers import Header diff --git a/fauna/client/client.py b/fauna/client/client.py index b7044bd0..6be2448e 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py 
@@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import timedelta -from typing import Any, Dict, Iterator, Mapping, Optional, Union +from typing import Any, Dict, Iterator, Mapping, Optional, Union, List import fauna from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header @@ -70,6 +70,27 @@ class StreamOptions: status_events: bool = False +@dataclass +class ChangeFeedOptions: + """ + A dataclass representing options available for a change feed. + + * max_attempts - The maximum number of times to attempt a change feed query when a retryable exception is thrown. + * max_backoff - The maximum backoff in seconds for an individual retry. + * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events. + * start_ts - The starting timestamp of the change feed, exclusive. If set, Fauna will return events starting after + the timestamp. + * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. + * page_size - The desired number of events per page. + """ + max_attempts: Optional[int] = None + max_backoff: Optional[int] = None + query_timeout: Optional[timedelta] = None + page_size: Optional[int] = None + start_ts: Optional[int] = None + cursor: Optional[str] = None + + class Client: def __init__( @@ -438,6 +459,49 @@ def stream( return StreamIterator(self._session, headers, self._endpoint + "/stream/1", self._max_attempts, self._max_backoff, opts, token) + def change_feed( + self, + fql: Union[StreamToken, Query], + opts: ChangeFeedOptions = ChangeFeedOptions() + ) -> "ChangeFeedIterator": + """ + Opens a change feed in Fauna and returns an iterator that consume Fauna events. + + :param fql: A Query that returns a StreamToken or a StreamToken. + :param opts: (Optional) Change feed options. 
+ + :return: a :class:`ChangeFeedIterator` + + :raises ClientError: Invalid options provided + :raises NetworkError: HTTP Request failed in transit + :raises ProtocolError: HTTP error not from Fauna + :raises ServiceError: Fauna returned an error + :raises ValueError: Encoding and decoding errors + :raises TypeError: Invalid param types + """ + + if isinstance(fql, Query): + token = self.query(fql).data + else: + token = fql + + if not isinstance(token, StreamToken): + err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}." + raise TypeError(err_msg) + + headers = self._headers.copy() + headers[_Header.Format] = "tagged" + headers[_Header.Authorization] = self._auth.bearer() + + if opts.query_timeout is not None: + query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000) + headers[Header.QueryTimeoutMs] = str(query_timeout_ms) + elif self._query_timeout_ms is not None: + headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) + + return ChangeFeedIterator(self._session, headers, self._endpoint + "/changefeed/1", + self._max_attempts, self._max_backoff, opts, token) + def _check_protocol(self, response_json: Any, status_code): # TODO: Logic to validate wire protocol belongs elsewhere. 
should_raise = False @@ -574,6 +638,82 @@ def close(self): self._stream.close() +@dataclass +class ChangeFeedPage: + events: List[Any] + cursor: str + stats: QueryStats + + def __iter__(self) -> Iterator[Any]: + return iter(self.events) + + +class ChangeFeedIterator: + """A class to provide an iterator on top of change feed pages.""" + + def __init__(self, http: HTTPClient, headers: Dict[str, str], + endpoint: str, max_attempts: int, max_backoff: int, + opts: ChangeFeedOptions, token: StreamToken): + self._http = http + self._headers = headers + self._endpoint = endpoint + self._max_attempts = opts.max_attempts or max_attempts + self._max_backoff = opts.max_backoff or max_backoff + self._request = {"token": token.token} + self._is_done = False + + if opts.page_size is not None: + self._request["page_size"] = opts.page_size + + if opts.cursor is not None: + self._request["cursor"] = opts.cursor + elif opts.start_ts is not None: + self._request["start_ts"] = opts.start_ts + + def __iter__(self) -> Iterator[ChangeFeedPage]: + self._is_done = False + return self + + def __next__(self) -> ChangeFeedPage: + if self._is_done: + raise StopIteration + + retryable = Retryable[Any](self._max_attempts, self._max_backoff, self._next_page) + return retryable.run().response + + def _next_page(self) -> ChangeFeedPage: + with self._http.request( + method="POST", + url=self._endpoint, + headers=self._headers, + data=self._request, + ) as response: + status_code = response.status_code() + decoded = FaunaDecoder.decode(response.json()) + + if status_code > 399: + FaunaError.parse_error_and_throw(decoded, status_code) + + self._is_done = not decoded["has_next"] + self._request["cursor"] = decoded["cursor"] + + if "start_ts" in self._request: + del self._request["start_ts"] + + for event in decoded["events"]: + if event["type"] == "error": + FaunaError.parse_error_and_throw(event, 400) + + return ChangeFeedPage(events = decoded["events"], + cursor = decoded["cursor"], + stats = 
QueryStats(decoded["stats"])) + + def flatten(self) -> Iterator: + """A generator that yields events instead of pages of events.""" + for page in self: + for event in page: + yield event + class QueryIterator: """A class to provider an iterator on top of Fauna queries.""" diff --git a/tests/integration/test_change_feeds.py b/tests/integration/test_change_feeds.py new file mode 100644 index 00000000..e39a2a49 --- /dev/null +++ b/tests/integration/test_change_feeds.py @@ -0,0 +1,112 @@ +import time +import pytest + +from fauna import fql +from fauna.client import ChangeFeedOptions +from fauna.errors import AbortError + + +def test_change_feed_requires_stream(client, a_collection): + with pytest.raises(TypeError, match="'fql' must be a StreamToken, or a Query that returns a StreamToken but was a ."): + client.change_feed(fql("42")) + + +def test_change_feed_query(client, a_collection): + feed = client.change_feed(fql("${col}.all().toStream()", col=a_collection)) + _pull_one_event(client, a_collection, feed) + + +def test_change_feed_token(client, a_collection): + token = client.query(fql("${col}.all().toStream()", col=a_collection)).data + feed = client.change_feed(token) + _pull_one_event(client, a_collection, feed) + + +def _pull_one_event(client, col, feed): + client.query(fql("${col}.create({ foo: 'bar' })", col=col)) + + pages = list(feed) + assert len(pages) == 1 + assert len(pages[0].events) == 1 + assert pages[0].events[0]['type'] == 'add' + assert pages[0].events[0]['data']['foo'] == 'bar' + + +def test_change_feeds_error_event(client, a_collection): + feed = client.change_feed(fql( + "${col}.all().map(_ => abort('oops')).toStream()", + col=a_collection + )) + + client.query(fql( + "${col}.create({ foo: 'bar' })", + col=a_collection + )) + + with pytest.raises(AbortError): + list(feed.flatten()) + + +def test_change_feed_start_ts(client, a_collection): + token = client.query(fql( + "${col}.all().map(.n).toStream()", + col=a_collection + )).data + + # NB. 
Issue separate queries to ensure they get different txn times. + _create_docs(client, a_collection, 0, 1) + _create_docs(client, a_collection, 1, 64) + + # NB. Use a short page size to ensure that more than one roundtrip is made, + # thus testing the interator's internal cursoring is correct. + first = next(client.change_feed(token).flatten()) + opts = ChangeFeedOptions(start_ts=first['txn_ts'], page_size = 5) + feed = client.change_feed(token, opts) + + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(1, 64)) + + +def test_change_feed_cursor(client, a_collection): + token = client.query(fql( + "${col}.all().map(.n).toStream()", + col=a_collection + )).data + + _create_docs(client, a_collection, 0, 64) + + # NB. Use a short page size to ensure that more than one roundtrip is made, + # thus testing the interator's internal cursoring is correct. + first = next(client.change_feed(token).flatten()) + opts = ChangeFeedOptions(cursor=first['cursor'], page_size = 5) + feed = client.change_feed(token, opts) + + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(1, 64)) + + +def test_change_feed_reusable_iterator(client, a_collection): + feed = client.change_feed(fql( + "${col}.all().map(.n).toStream()", + col=a_collection + )) + + _create_docs(client, a_collection, 0, 5) + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(0, 5)) + + _create_docs(client, a_collection, 5, 10) + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(5, 10)) + + +def _create_docs(client, col, start=0, end=1): + client.query(fql(''' + Set + .sequence(${start}, ${end}) + .forEach(n => ${col}.create({ n: n })) + ''', + start=start, + end=end, + col=col, + ))