diff --git a/README.rst b/README.rst index 89ee1bfa..0617aa6c 100644 --- a/README.rst +++ b/README.rst @@ -307,10 +307,138 @@ Change the default items per page using FQL's ``.pageSize()`` method. Event Streaming ------------------ -`Event Streaming `_ is currently available in the beta version of the driver: +The driver supports `Event Streaming `_. -- `Beta Python driver `_ -- `Beta Python driver docs `_ +Start a stream +~~~~~~~~~~~~~~ + +To get a stream token, append `\`toStream()\` +`_ +or `\`changesOn()\` +`_ +to a set from a `supported source +`_. + + +To start and subscribe to the stream, pass the stream token to +`Client.stream()`: + +.. code-block:: python + + import fauna + + from fauna import fql + from fauna.client import Client, StreamOptions + + client = Client(secret='YOUR_FAUNA_SECRET') + + response = client.query(fql(''' + let set = Product.all() + { + initialPage: set.pageSize(10), + streamToken: set.toStream() + } + ''')) + + initialPage = response.data['initialPage'] + streamToken = response.data['streamToken'] + + client.stream(streamToken) + +You can also pass a query that produces a stream token directly to +`Client.stream()`: + +.. code-block:: python + query = fql('Product.all().changesOn(.price, .quantity)') + + client.stream(query) + +Iterate on a stream +~~~~~~~~~~~~~~~~~~~ + +`Client.stream() `returns an iterator that emits events as they occur. You can use +a generator expression to iterate through the events: + +.. code-block:: python + + query = fql('Product.all().changesOn(.price, .quantity)') + + with client.stream(query) as stream: + for event in stream: + eventType = event['type'] + if (eventType == 'add'): + print('Add event: ', event) + ## ... + elif (eventType == 'update'): + print('Update event: ', event) + ## ... + elif (eventType == 'remove'): + print('Remove event: ', event) + ## ... + +Close a stream +~~~~~~~~~~~~~~ + +Use `.close()` to close a stream: + +.. code-block:: python + + query = fql('Product.all().changesOn(.price, .quantity)') + + count = 0 + with client.stream(query) as stream: + for event in stream: + print('Stream event', event) + # ... + count+=1 + + if (count == 2): + stream.close() + +Error handling +~~~~~~~~~~~~~~ + +If a non-retryable error occurs when opening or processing a stream, Fauna +raises a `FaunaException`: + +.. code-block:: python + + import fauna + + from fauna import fql + from fauna.client import Client + from fauna.errors import FaunaException + + client = Client(secret='YOUR_FAUNA_SECRET') + + try: + with client.stream(fql( + 'Product.all().changesOn(.price, .quantity)' + )) as stream: + for event in stream: + print(event) + # ... + except FaunaException as e: + print('error ocurred with stream: ', e) + +Stream options +~~~~~~~~~~~~~~ + +The :ref:`Client configuration ` sets default options for +the `Client.stream()` method. + +You can pass an `StreamOptions` object to override these defaults: + +.. code-block:: python + + options = StreamOptions( + max_attempts=5, + max_backoff=1, + start_ts=1710968002310000, + status_events=True + ) + + client.stream(fql('Product.all().toStream()'), options) Setup -----