diff --git a/README.md b/README.md index d7993b1c..6238e2bd 100644 --- a/README.md +++ b/README.md @@ -285,138 +285,14 @@ for products in pages: print(products) ``` -## Event Streaming - -The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming). - -### Start a stream - -To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a -[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). - - -To start and subscribe to the stream, pass the stream token to -``stream()``: - -```python - from fauna import fql - from fauna.client import Client - - client = Client() - - response = client.query(fql(''' - let set = Product.all() - { - initialPage: set.pageSize(10), - streamToken: set.toStream() - } - ''')) - - initialPage = response.data['initialPage'] - streamToken = response.data['streamToken'] - - client.stream(streamToken) -``` +## Event Feeds (beta) -You can also pass a query that produces a stream token directly to -``stream()``: - -```python - query = fql('Product.all().changesOn(.price, .stock)') - - client.stream(query) -``` - -### Iterate on a stream - -``stream()`` returns an iterator that emits events as they occur. You can -use a generator expression to iterate through the events: - -```python -query = fql('Product.all().changesOn(.price, .stock)') +The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/track-changes/streaming/#event-feeds). -with client.stream(query) as stream: - for event in stream: - eventType = event['type'] - if (eventType == 'add'): - print('Add event: ', event) - ## ... - elif (eventType == 'update'): - print('Update event: ', event) - ## ... - elif (eventType == 'remove'): - print('Remove event: ', event) - ## ... -``` - -### Close a stream +### Request an Event Feed -Use ``close()`` to close a stream: - -```python -query = fql('Product.all().changesOn(.price, .stock)') - -count = 0 -with client.stream(query) as stream: - for event in stream: - print('Stream event', event) - # ... - count+=1 - - if (count == 2): - stream.close() -``` - -### Error handling - -If a non-retryable error occurs when opening or processing a stream, Fauna -raises a ``FaunaException``: - -```python -from fauna import fql -from fauna.client import Client -from fauna.errors import FaunaException - -client = Client() - -try: - with client.stream(fql( - 'Product.all().changesOn(.price, .stock)' - )) as stream: - for event in stream: - print(event) - # ... -except FaunaException as e: - print('error ocurred with stream: ', e) -``` - -### Stream options - -The client configuration sets default options for the ``stream()`` -method. - -You can pass a ``StreamOptions`` object to override these defaults: - -```python -options = StreamOptions( - max_attempts=3, - max_backoff=20, - start_ts=None, - cursor=None, - status_events=False, - ) - -client.stream(fql('Product.all().toStream()'), options) -``` - -## Change Feeds (beta) - -The driver supports [Change Feeds](https://docs.fauna.com/fauna/current/learn/track-changes/streaming/#change-feeds). - -### Request a Change Feed - -A Change Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming), -represented by a stream token, for events. +An Event Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming), +represented by a stream token, for events. To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a [supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). @@ -453,7 +329,7 @@ You can also pass a query that produces a stream token directly to client.change_feed(query) ``` -### Iterate on a Change Feed +### Iterate on an Event Feed ``change_feed()`` returns an iterator that emits pages of events. You can use a generator expression to iterate through the pages: @@ -464,7 +340,7 @@ generator expression to iterate through the pages: for page in feed: print('Page stats: ', page.stats) - + for event in page: eventType = event['type'] if (eventType == 'add'): @@ -490,20 +366,20 @@ Alternatively, you can iterate through events instead of pages with ## ... ``` -The change feed iterator stops when there are no more events to poll. +The Event Feed iterator stops when there are no more events to poll. ### Error handling -If a non-retryable error occurs when opening or processing a change feed, Fauna +If a non-retryable error occurs when opening or processing an Event Feed, Fauna raises a ``FaunaException``: ```python from fauna import fql from fauna.client import Client from fauna.errors import FaunaException - + client = Client() - + try: feed = client.change_feed(fql( 'Product.all().changesOn(.price, .stock)' @@ -512,7 +388,7 @@ raises a ``FaunaException``: print(event) # ... except FaunaException as e: - print('error ocurred with change feed: ', e) + print('error ocurred with event feed: ', e) ``` Errors can be raised at two different places: @@ -527,15 +403,15 @@ processing. For example: from fauna import fql from fauna.client import Client from fauna.errors import FaunaException - + client = Client() - + # Imagine if there are some products with details = null. # The ones without details will fail due to the toUpperCase call. feed = client.change_feed(fql( 'Product.all().map(.details.toUpperCase()).toStream()' )) - + for page in feed: try: for event in page: @@ -548,7 +424,7 @@ processing. For example: print('error ocurred with event processing: ', e) ``` -### Change Feed options +### Event Feed options The client configuration sets default options for the ``change_feed()`` method. @@ -568,6 +444,130 @@ options = ChangeFeedOptions( client.change_feed(fql('Product.all().toStream()'), options) ``` +## Event Streaming + +The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming). + +### Start a stream + +To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a +[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). + + +To start and subscribe to the stream, pass the stream token to +``stream()``: + +```python + from fauna import fql + from fauna.client import Client + + client = Client() + + response = client.query(fql(''' + let set = Product.all() + { + initialPage: set.pageSize(10), + streamToken: set.toStream() + } + ''')) + + initialPage = response.data['initialPage'] + streamToken = response.data['streamToken'] + + client.stream(streamToken) +``` + +You can also pass a query that produces a stream token directly to +``stream()``: + +```python + query = fql('Product.all().changesOn(.price, .stock)') + + client.stream(query) +``` + +### Iterate on a stream + +``stream()`` returns an iterator that emits events as they occur. You can +use a generator expression to iterate through the events: + +```python +query = fql('Product.all().changesOn(.price, .stock)') + +with client.stream(query) as stream: + for event in stream: + eventType = event['type'] + if (eventType == 'add'): + print('Add event: ', event) + ## ... + elif (eventType == 'update'): + print('Update event: ', event) + ## ... + elif (eventType == 'remove'): + print('Remove event: ', event) + ## ... +``` + +### Close a stream + +Use ``close()`` to close a stream: + +```python +query = fql('Product.all().changesOn(.price, .stock)') + +count = 0 +with client.stream(query) as stream: + for event in stream: + print('Stream event', event) + # ... + count+=1 + + if (count == 2): + stream.close() +``` + +### Error handling + +If a non-retryable error occurs when opening or processing a stream, Fauna +raises a ``FaunaException``: + +```python +from fauna import fql +from fauna.client import Client +from fauna.errors import FaunaException + +client = Client() + +try: + with client.stream(fql( + 'Product.all().changesOn(.price, .stock)' + )) as stream: + for event in stream: + print(event) + # ... +except FaunaException as e: + print('error ocurred with stream: ', e) +``` + +### Stream options + +The client configuration sets default options for the ``stream()`` +method. + +You can pass a ``StreamOptions`` object to override these defaults: + +```python +options = StreamOptions( + max_attempts=3, + max_backoff=20, + start_ts=None, + cursor=None, + status_events=False, + ) + +client.stream(fql('Product.all().toStream()'), options) +``` + ## Setup ```bash diff --git a/fauna/client/client.py b/fauna/client/client.py index d1cac5c8..bbdbf41a 100644 --- a/fauna/client/client.py +++ b/fauna/client/client.py @@ -73,12 +73,12 @@ class StreamOptions: @dataclass class ChangeFeedOptions: """ - A dataclass representing options available for a change feed. + A dataclass representing options available for an Event Feed. - * max_attempts - The maximum number of times to attempt a change feed query when a retryable exception is thrown. + * max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown. * max_backoff - The maximum backoff in seconds for an individual retry. * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events. - * start_ts - The starting timestamp of the change feed, exclusive. If set, Fauna will return events starting after + * start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after the timestamp. * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. * page_size - The desired number of events per page. @@ -465,10 +465,10 @@ def change_feed( opts: ChangeFeedOptions = ChangeFeedOptions() ) -> "ChangeFeedIterator": """ - Opens a change feed in Fauna and returns an iterator that consume Fauna events. + Opens an Event Feed in Fauna and returns an iterator that consume Fauna events. :param fql: A Query that returns a StreamToken or a StreamToken. - :param opts: (Optional) Change feed options. + :param opts: (Optional) Event Feed options. :return: a :class:`ChangeFeedIterator` @@ -658,7 +658,7 @@ def __iter__(self) -> Iterator[Any]: class ChangeFeedIterator: - """A class to provide an iterator on top of change feed pages.""" + """A class to provide an iterator on top of Event Feed pages.""" def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: ChangeFeedOptions,