diff --git a/README.md b/README.md index 2bfe4889..6c3c7a1f 100644 --- a/README.md +++ b/README.md @@ -389,6 +389,7 @@ try: except FaunaException as e: print('error ocurred with stream: ', e) ``` + ### Stream options The client configuration sets default options for the ``stream()`` @@ -408,6 +409,167 @@ options = StreamOptions( client.stream(fql('Product.all().toStream()'), options) ``` +## Change Feeds + + + +The driver supports Change Feeds. + +### Request a Change Feed + +A Change Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming), +represented by a stream token, for events. + +To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a +[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). + +To get paginated events for the stream, pass the stream token to +``change_feed()``: + +```python + from fauna import fql + from fauna.client import Client + + client = Client() + + response = client.query(fql(''' + let set = Product.all() + { + initialPage: set.pageSize(10), + streamToken: set.toStream() + } + ''')) + + initialPage = response.data['initialPage'] + streamToken = response.data['streamToken'] + + client.change_feed(streamToken) +``` + +You can also pass a query that produces a stream token directly to +``change_feed()``: + +```python + query = fql('Product.all().changesOn(.price, .quantity)') + + client.change_feed(query) +``` + +### Iterate on a Change Feed + +``change_feed()`` returns an iterator that emits pages of events. You can use a +generator expression to iterate through the pages: + +```python +query = fql('Product.all().changesOn(.price, .quantity)') + +with client.change_feed(query) as feed: + for page in feed: + print('Page stats: ', page.stats) + + for event in page: + eventType = event['type'] + if (eventType == 'add'): + print('Add event: ', event) + ## ... 
+ elif (eventType == 'update'): + print('Update event: ', event) + ## ... + elif (eventType == 'remove'): + print('Remove event: ', event) + ## ... +``` + +Alternatively, you can iterate through events instead of pages with +``flatten()``: + +```python +query = fql('Product.all().changesOn(.price, .quantity)') + +with client.change_feed(query) as feed: + for event in feed.flatten(): + eventType = event['type'] + ## ... +``` + +The change feed iterator stops when there are no more events to poll. + +### Error handling + +If a non-retryable error occurs when opening or processing a change feed, Fauna +raises a ``FaunaException``: + +```python +from fauna import fql +from fauna.client import Client +from fauna.errors import FaunaException + +client = Client() + +try: + feed = client.change_feed(fql( + 'Product.all().changesOn(.price, .quantity)' + )) + for event in feed.flatten(): + print(event) + # ... +except FaunaException as e: + print('error occurred with change feed: ', e) +``` + +Errors can be raised at two different places: + +1. At the ``change_feed`` method call; +2. At the page iteration. + +This distinction allows users to ignore errors originating from event +processing. For example: + +```python +from fauna import fql +from fauna.client import Client +from fauna.errors import FaunaException + +client = Client() + +# Imagine if there are some products with details = null. +# The ones without details will fail due to the toUpperCase call. +feed = client.change_feed(fql( + 'Product.all().map(.details.toUpperCase()).toStream()' +)) + +for page in feed: + try: + for event in page: + print(event) + # ... + except FaunaException as e: + # Pages will stop at the first error encountered. + # Therefore, it's safe to handle event failures + # and then pull more pages. + print('error occurred with event processing: ', e) +``` + +### Change Feed options + +The client configuration sets default options for the ``change_feed()`` +method. 
+ +You can pass a ``ChangeFeedOptions`` object to override these defaults: + +```python +options = ChangeFeedOptions( + max_attempts=3, + max_backoff=20, + query_timeout=timedelta(seconds=5), + page_size=None, + cursor=None, + start_ts=None, + ) + +client.change_feed(fql('Product.all().toStream()'), options) +``` + ## Setup ```bash diff --git a/docker/feature-flags.json b/docker/feature-flags.json index 2fc48a0a..739da189 100644 --- a/docker/feature-flags.json +++ b/docker/feature-flags.json @@ -9,7 +9,8 @@ "fqlx_typecheck_default": true, "persisted_fields": true, "changes_by_collection_index": true, - "fql2_streams": true + "fql2_streams": true, + "change_feeds": true } }, { @@ -20,7 +21,8 @@ "fqlx_typecheck_default": true, "persisted_fields": true, "changes_by_collection_index": true, - "fql2_streams": true + "fql2_streams": true, + "change_feeds": true } } ] diff --git a/fauna/client/__init__.py b/fauna/client/__init__.py index 73ad357a..9770dd26 100644 --- a/fauna/client/__init__.py +++ b/fauna/client/__init__.py @@ -1,3 +1,3 @@ -from .client import Client, QueryOptions, StreamOptions +from .client import Client, QueryOptions, StreamOptions, ChangeFeedOptions from .endpoints import Endpoints from .headers import Header diff --git a/fauna/client/client.py b/fauna/client/client.py index b7044bd0..5b31eedb 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from datetime import timedelta -from typing import Any, Dict, Iterator, Mapping, Optional, Union +from typing import Any, Dict, Iterator, Mapping, Optional, Union, List import fauna from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header @@ -70,6 +70,27 @@ class StreamOptions: status_events: bool = False +@dataclass +class ChangeFeedOptions: + """ + A dataclass representing options available for a change feed. + + * max_attempts - The maximum number of times to attempt a change feed query when a retryable exception is thrown. 
+ * max_backoff - The maximum backoff in seconds for an individual retry. + * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events. + * start_ts - The starting timestamp of the change feed, exclusive. If set, Fauna will return events starting after + the timestamp. + * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. + * page_size - The desired number of events per page. + """ + max_attempts: Optional[int] = None + max_backoff: Optional[int] = None + query_timeout: Optional[timedelta] = None + page_size: Optional[int] = None + start_ts: Optional[int] = None + cursor: Optional[str] = None + + class Client: def __init__( @@ -438,6 +459,51 @@ def stream( return StreamIterator(self._session, headers, self._endpoint + "/stream/1", self._max_attempts, self._max_backoff, opts, token) + def change_feed( + self, + fql: Union[StreamToken, Query], + opts: ChangeFeedOptions = ChangeFeedOptions() + ) -> "ChangeFeedIterator": + """ + Opens a change feed in Fauna and returns an iterator that consumes Fauna events. + + :param fql: A Query that returns a StreamToken or a StreamToken. + :param opts: (Optional) Change feed options. + + :return: a :class:`ChangeFeedIterator` + + :raises ClientError: Invalid options provided + :raises NetworkError: HTTP Request failed in transit + :raises ProtocolError: HTTP error not from Fauna + :raises ServiceError: Fauna returned an error + :raises ValueError: Encoding and decoding errors + :raises TypeError: Invalid param types + """ + + if isinstance(fql, Query): + token = self.query(fql).data + else: + token = fql + + if not isinstance(token, StreamToken): + err_msg = f"'fql' must be a StreamToken, or a Query that returns a StreamToken but was a {type(token)}." 
+ raise TypeError(err_msg) + + headers = self._headers.copy() + headers[_Header.Format] = "tagged" + headers[_Header.Authorization] = self._auth.bearer() + + if opts.query_timeout is not None: + query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000) + headers[Header.QueryTimeoutMs] = str(query_timeout_ms) + elif self._query_timeout_ms is not None: + headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) + + return ChangeFeedIterator(self._session, headers, + self._endpoint + "/changefeed/1", + self._max_attempts, self._max_backoff, opts, + token) + def _check_protocol(self, response_json: Any, status_code): # TODO: Logic to validate wire protocol belongs elsewhere. should_raise = False @@ -574,6 +640,86 @@ def close(self): self._stream.close() +class ChangeFeedPage: + + def __init__(self, events: List[Any], cursor: str, stats: QueryStats): + self._events = events + self.cursor = cursor + self.stats = stats + + def __len__(self): + return len(self._events) + + def __iter__(self) -> Iterator[Any]: + for event in self._events: + if event["type"] == "error": + FaunaError.parse_error_and_throw(event, 400) + yield event + + +class ChangeFeedIterator: + """A class to provide an iterator on top of change feed pages.""" + + def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, + max_attempts: int, max_backoff: int, opts: ChangeFeedOptions, + token: StreamToken): + self._http = http + self._headers = headers + self._endpoint = endpoint + self._max_attempts = opts.max_attempts or max_attempts + self._max_backoff = opts.max_backoff or max_backoff + self._request = {"token": token.token} + self._is_done = False + + if opts.page_size is not None: + self._request["page_size"] = str(opts.page_size) + + if opts.cursor is not None: + self._request["cursor"] = opts.cursor + elif opts.start_ts is not None: + self._request["start_ts"] = str(opts.start_ts) + + def __iter__(self) -> Iterator[ChangeFeedPage]: + self._is_done = False + return self + 
+ def __next__(self) -> ChangeFeedPage: + if self._is_done: + raise StopIteration + + retryable = Retryable[Any](self._max_attempts, self._max_backoff, + self._next_page) + return retryable.run().response + + def _next_page(self) -> ChangeFeedPage: + with self._http.request( + method="POST", + url=self._endpoint, + headers=self._headers, + data=self._request, + ) as response: + status_code = response.status_code() + decoded: Any = FaunaDecoder.decode(response.json()) + + if status_code > 399: + FaunaError.parse_error_and_throw(decoded, status_code) + + self._is_done = not decoded["has_next"] + self._request["cursor"] = decoded["cursor"] + + if "start_ts" in self._request: + del self._request["start_ts"] + + return ChangeFeedPage(decoded["events"], decoded["cursor"], + QueryStats(decoded["stats"])) + + def flatten(self) -> Iterator: + """A generator that yields events instead of pages of events.""" + for page in self: + for event in page: + yield event + + class QueryIterator: """A class to provider an iterator on top of Fauna queries.""" diff --git a/tests/integration/test_change_feeds.py b/tests/integration/test_change_feeds.py new file mode 100644 index 00000000..1056bd2e --- /dev/null +++ b/tests/integration/test_change_feeds.py @@ -0,0 +1,142 @@ +import time +import pytest + +from fauna import fql +from fauna.client import ChangeFeedOptions +from fauna.errors import AbortError + + +def test_change_feed_requires_stream(client, a_collection): + with pytest.raises( + TypeError, + match="'fql' must be a StreamToken, or a Query that returns a StreamToken but was a ." 
+ ): + client.change_feed(fql("42")) + + +def test_change_feed_query(client, a_collection): + feed = client.change_feed(fql("${col}.all().toStream()", col=a_collection)) + _pull_one_event(client, a_collection, feed) + + +def test_change_feed_token(client, a_collection): + token = client.query(fql("${col}.all().toStream()", col=a_collection)).data + feed = client.change_feed(token) + _pull_one_event(client, a_collection, feed) + + +def _pull_one_event(client, col, feed): + client.query(fql("${col}.create({ foo: 'bar' })", col=col)) + + pages = list(feed) + assert len(pages) == 1 + assert len(pages[0]) == 1 + + events = list(pages[0]) + assert events[0]['type'] == 'add' + assert events[0]['data']['foo'] == 'bar' + + +def test_change_feeds_error_event(client, a_collection): + feed = client.change_feed( + fql("${col}.all().map(_ => abort('oops')).toStream()", col=a_collection)) + + client.query(fql("${col}.create({ foo: 'bar' })", col=a_collection)) + + with pytest.raises(AbortError): + list(feed.flatten()) + + +@pytest.mark.xfail(reason="pending core support") +def test_change_feeds_continue_after_an_error(client, a_collection): + feed = client.change_feed( + fql(''' + ${col} + .all() + .map(doc => + if (doc.n == 1) abort('oops') + else doc + ) + .toStream() + ''', + col=a_collection)) + + client.query( + fql(''' + Set + .sequence(0, 3) + .forEach(n => ${col}.create({ n: n })) + ''', + col=a_collection)) + + events = [] + + for page in feed: + try: + for event in page: + events.append(event['data']['n']) + except AbortError: + pass + + assert events == [0, 2] + + +def test_change_feed_start_ts(client, a_collection): + token = client.query( + fql("${col}.all().map(.n).toStream()", col=a_collection)).data + + # NB. Issue separate queries to ensure they get different txn times. + _create_docs(client, a_collection, 0, 1) + _create_docs(client, a_collection, 1, 64) + + # NB. 
Use a short page size to ensure that more than one roundtrip is made, + # thus testing the iterator's internal cursoring is correct. + first = next(client.change_feed(token).flatten()) + opts = ChangeFeedOptions(start_ts=first['txn_ts'], page_size=5) + feed = client.change_feed(token, opts) + + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(1, 64)) + + +def test_change_feed_cursor(client, a_collection): + token = client.query( + fql("${col}.all().map(.n).toStream()", col=a_collection)).data + + _create_docs(client, a_collection, 0, 64) + + # NB. Use a short page size to ensure that more than one roundtrip is made, + # thus testing the iterator's internal cursoring is correct. + first = next(client.change_feed(token).flatten()) + opts = ChangeFeedOptions(cursor=first['cursor'], page_size=5) + feed = client.change_feed(token, opts) + + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(1, 64)) + + +def test_change_feed_reusable_iterator(client, a_collection): + feed = client.change_feed( + fql("${col}.all().map(.n).toStream()", col=a_collection)) + + _create_docs(client, a_collection, 0, 5) + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(0, 5)) + + _create_docs(client, a_collection, 5, 10) + nums = [event['data'] for event in feed.flatten()] + assert nums == list(range(5, 10)) + + +def _create_docs(client, col, start=0, end=1): + client.query( + fql( + ''' + Set + .sequence(${start}, ${end}) + .forEach(n => ${col}.create({ n: n })) + ''', + start=start, + end=end, + col=col, + ))