Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DOCS-2590] Move Streaming to GA #176

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 133 additions & 3 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -307,10 +307,140 @@ Change the default items per page using FQL's ``<set>.pageSize()`` method.
Event Streaming
------------------

`Event Streaming <https://docs.fauna.com/fauna/current/learn/streaming>`_ is currently available in the beta version of the driver:
The driver supports `Event Streaming <https://docs.fauna.com/fauna/current/learn/streaming>`_.

- `Beta Python driver <https://pypi.org/project/fauna/1.2.0b4/>`_
- `Beta Python driver docs <https://github.com/fauna/fauna-python/tree/beta>`_
Start a stream
~~~~~~~~~~~~~~

To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
`supported source
<https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources>`_.


To start and subscribe to the stream, pass the stream token to
``Client.stream()``:

.. code-block:: python

import fauna

from fauna import fql
from fauna.client import Client, StreamOptions

client = Client()

response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']

client.stream(streamToken)

You can also pass a query that produces a stream token directly to
``Client.stream()``:

.. code-block:: python

query = fql('Product.all().changesOn(.price, .quantity)')

client.stream(query)

Iterate on a stream
~~~~~~~~~~~~~~~~~~~

``Client.stream()`` returns an iterator that emits events as they occur. You can
use a generator expression to iterate through the events:

.. code-block:: python

query = fql('Product.all().changesOn(.price, .quantity)')

with client.stream(query) as stream:
for event in stream:
eventType = event['type']
if (eventType == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
print('Remove event: ', event)
## ...

Close a stream
~~~~~~~~~~~~~~

Use ``<stream>.close()`` to close a stream:

.. code-block:: python

query = fql('Product.all().changesOn(.price, .quantity)')

count = 0
with client.stream(query) as stream:
for event in stream:
print('Stream event', event)
# ...
count+=1

if (count == 2):
stream.close()

Error handling
~~~~~~~~~~~~~~

If a non-retryable error occurs when opening or processing a stream, Fauna
raises a ``FaunaException``:

.. code-block:: python

import fauna

from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

try:
with client.stream(fql(
'Product.all().changesOn(.price, .quantity)'
)) as stream:
for event in stream:
print(event)
# ...
except FaunaException as e:
print('error ocurred with stream: ', e)

Stream options
~~~~~~~~~~~~~~

The client configuration sets default options for the ``Client.stream()``
method.

You can pass a ``StreamOptions`` object to override these defaults:

.. code-block:: python

options = StreamOptions(
max_attempts=5,
max_backoff=1,
start_ts=1710968002310000,
status_events=True
)

client.stream(fql('Product.all().toStream()'), options)

For supported properties, see `Stream options
<https://docs.fauna.com/fauna/current/drivers/py-client#stream-options>`_
in the Fauna docs.

Setup
-----
Expand Down
Loading