diff --git a/README.md b/README.md index c05bf04f..370b154c 100644 --- a/README.md +++ b/README.md @@ -244,105 +244,95 @@ except ServiceError as e: emit_stats(e.stats) # more error handling... ``` -## Streaming -Below are examples on how to get started with streaming in the python driver. For more information on streaming capabilities visit our [Streaming Documentation](https://docs.fauna.com/fauna/current/reference/streaming_reference/) +## Event Streaming -There are two ways a stream can be initiated with the python driver: -1. Obtaining a stream token by first issuing a fql query that returns a stream token and providing that to the client's stream method. -2. Providing the stream method with a fql query that returns a stream token - 1. In this case the stream method will first issue a request to obtain the stream token and then start the stream. +The driver supports `Event Streaming `_. -_Using a Stream Token_ -```python -import fauna +_Start a stream_ -from fauna import fql -from fauna.client import Client, StreamOptions +To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a +`supported source +`_. -client = Client() -response = client.query(fql(''' + +To start and subscribe to the stream, pass the stream token to +``Client.stream()``: + +```python + from fauna import fql + from fauna.client import Client + + client = Client() + + response = client.query(fql(''' let set = Product.all() - { + { initialPage: set.pageSize(10), - streamToken: set.toStream() { name, price, category } + streamToken: set.toStream() } ''')) + + initialPage = response.data['initialPage'] + streamToken = response.data['streamToken'] -initialPage = response.data["initialPage"] -streamToken = response.data["streamToken"] - -with client.stream(streamToken) as stream: - for event in stream: - print(event["type"]) - print(event["data"]) - eventType = event["type"] - if (eventType == "add"): - print("add event") - ## handle add event - elif (eventType == "update"): - print("update event") - ## handle update event - elif (eventType == "remove"): - print("remove event") - ## handle remove event + client.stream(streamToken) ``` -_Using the client stream method directly_ +You can also pass a query that produces a stream token directly to +``Client.stream()``: ```python -import fauna + query = fql('Product.all().changesOn(.price, .quantity)') -from fauna import fql -from fauna.client import Client + client.stream(query) +``` -client = Client() +_Iterate on a stream_ -with client.stream(fql( - 'Product.all().where(.price > 50).toStream() { name, price, category }' -)) as stream: - for event in stream: - print(event["type"]) - print(event["data"]) -``` +``Client.stream()`` returns an iterator that emits events as they occur. You can +use a generator expression to iterate through the events: -_Stream events iterator_ +```python +query = fql('Product.all().changesOn(.price, .quantity)') -The stream method returns an iterator that can be used to provide the events as they happen. See above examples for using the iterator to process events. +with client.stream(query) as stream: + for event in stream: + eventType = event['type'] + if (eventType == 'add'): + print('Add event: ', event) + ## ... + elif (eventType == 'update'): + print('Update event: ', event) + ## ... + elif (eventType == 'remove'): + print('Remove event: ', event) + ## ... +``` _Close a stream_ -Use the `.close()` method to close a stream. +Use ``.close()`` to close a stream: ```python -import fauna - -from fauna import fql -from fauna.client import Client - -client = Client() - -events = [] +query = fql('Product.all().changesOn(.price, .quantity)') -with client.stream(fql( - 'Product.all().where(.price > 50).toStream() { name, price, category }' -)) as stream: +count = 0 +with client.stream(query) as stream: for event in stream: - print(event["type"]) - print(event["data"]) - - events.append(event) - # Close the stream after 2 events - if len(events) == 2: - print("Closing the stream ...") - stream.close() + print('Stream event', event) + # ... + count+=1 + + if (count == 2): + stream.close() ``` -_Error Handling_ +_Error handling_ -If a non retryable error occurs as part of stream processing, or when attempting to open a stream, a FaunaException will be raised. This can be handled as follows: -```python -import fauna +If a non-retryable error occurs when opening or processing a stream, Fauna +raises a ``FaunaException``: +```python from fauna import fql from fauna.client import Client from fauna.errors import FaunaException @@ -351,13 +341,30 @@ client = Client() try: with client.stream(fql( - 'Product.all().where(.price > 50).toStream() { name, price, category }' + 'Product.all().changesOn(.price, .quantity)' )) as stream: for event in stream: - print(event["type"]) - print(event["data"]) + print(event) + # ... except FaunaException as e: - print("error ocurred with stream: ", e) + print('error ocurred with stream: ', e) +``` +_Stream options_ + +The client configuration sets default options for the ``Client.stream()`` +method. + +You can pass a ``StreamOptions`` object to override these defaults: + +```python +options = StreamOptions( + max_attempts=5, + max_backoff=1, + start_ts=1710968002310000, + status_events=True, + ) + +client.stream(fql('Product.all().toStream()'), options) ``` ## Setup