Skip to content

Commit

Permalink
Apply changes from #176
Browse files Browse the repository at this point in the history
  • Loading branch information
pnwpedro committed May 17, 2024
1 parent 92c6322 commit b7ab35d
Showing 1 changed file with 81 additions and 74 deletions.
155 changes: 81 additions & 74 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -244,105 +244,95 @@ except ServiceError as e:
emit_stats(e.stats)
# more error handling...
```
## Streaming
Below are examples on how to get started with streaming in the python driver. For more information on streaming capabilities visit our [Streaming Documentation](https://docs.fauna.com/fauna/current/reference/streaming_reference/)
## Event Streaming

There are two ways a stream can be initiated with the python driver:
1. Obtaining a stream token by first issuing a fql query that returns a stream token and providing that to the client's stream method.
2. Providing the stream method with a fql query that returns a stream token
1. In this case the stream method will first issue a request to obtain the stream token and then start the stream.
The driver supports `Event Streaming <https://docs.fauna.com/fauna/current/learn/streaming>`_.

_Using a Stream Token_
```python
import fauna
_Start a stream_

from fauna import fql
from fauna.client import Client, StreamOptions
To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
`supported source
<https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources>`_.

client = Client()
response = client.query(fql('''

To start and subscribe to the stream, pass the stream token to
``Client.stream()``:

```python
from fauna import fql
from fauna.client import Client

client = Client()

response = client.query(fql('''
let set = Product.all()
{
{
initialPage: set.pageSize(10),
streamToken: set.toStream() { name, price, category }
streamToken: set.toStream()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']

initialPage = response.data["initialPage"]
streamToken = response.data["streamToken"]

with client.stream(streamToken) as stream:
for event in stream:
print(event["type"])
print(event["data"])
eventType = event["type"]
if (eventType == "add"):
print("add event")
## handle add event
elif (eventType == "update"):
print("update event")
## handle update event
elif (eventType == "remove"):
print("remove event")
## handle remove event
client.stream(streamToken)
```

_Using the client stream method directly_
You can also pass a query that produces a stream token directly to
``Client.stream()``:

```python
import fauna
query = fql('Product.all().changesOn(.price, .quantity)')

from fauna import fql
from fauna.client import Client
client.stream(query)
```

client = Client()
_Iterate on a stream_

with client.stream(fql(
'Product.all().where(.price > 50).toStream() { name, price, category }'
)) as stream:
for event in stream:
print(event["type"])
print(event["data"])
```
``Client.stream()`` returns an iterator that emits events as they occur. You can
use a generator expression to iterate through the events:

_Stream events iterator_
```python
query = fql('Product.all().changesOn(.price, .quantity)')

The stream method returns an iterator that can be used to provide the events as they happen. See above examples for using the iterator to process events.
with client.stream(query) as stream:
for event in stream:
eventType = event['type']
if (eventType == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
print('Remove event: ', event)
## ...
```

_Close a stream_

Use the `<stream>.close()` method to close a stream.
Use ``<stream>.close()`` to close a stream:

```python
import fauna

from fauna import fql
from fauna.client import Client

client = Client()

events = []
query = fql('Product.all().changesOn(.price, .quantity)')

with client.stream(fql(
'Product.all().where(.price > 50).toStream() { name, price, category }'
)) as stream:
count = 0
with client.stream(query) as stream:
for event in stream:
print(event["type"])
print(event["data"])

events.append(event)
# Close the stream after 2 events
if len(events) == 2:
print("Closing the stream ...")
stream.close()
print('Stream event', event)
# ...
count+=1

if (count == 2):
stream.close()
```

_Error Handling_
_Error handling_

If a non retryable error occurs as part of stream processing, or when attempting to open a stream, a FaunaException will be raised. This can be handled as follows:
```python
import fauna
If a non-retryable error occurs when opening or processing a stream, Fauna
raises a ``FaunaException``:

```python
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException
Expand All @@ -351,13 +341,30 @@ client = Client()

try:
with client.stream(fql(
'Product.all().where(.price > 50).toStream() { name, price, category }'
'Product.all().changesOn(.price, .quantity)'
)) as stream:
for event in stream:
print(event["type"])
print(event["data"])
print(event)
# ...
except FaunaException as e:
print("error ocurred with stream: ", e)
print('error ocurred with stream: ', e)
```
_Stream options_

The client configuration sets default options for the ``Client.stream()``
method.

You can pass a ``StreamOptions`` object to override these defaults:

```python
options = StreamOptions(
max_attempts=5,
max_backoff=1,
start_ts=1710968002310000,
status_events=True,
)

client.stream(fql('Product.all().toStream()'), options)
```

## Setup
Expand Down

0 comments on commit b7ab35d

Please sign in to comment.