Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs: [DOCS-3549] Rename Change Feeds to Event Feeds #200

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 140 additions & 140 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,138 +285,14 @@ for products in pages:
print(products)
```

## Event Streaming

The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming).

### Start a stream

To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).


To start and subscribe to the stream, pass the stream token to
``stream()``:

```python
from fauna import fql
from fauna.client import Client

client = Client()

response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']

client.stream(streamToken)
```
## Event Feeds (beta)

You can also pass a query that produces a stream token directly to
``stream()``:

```python
query = fql('Product.all().changesOn(.price, .stock)')

client.stream(query)
```

### Iterate on a stream

``stream()`` returns an iterator that emits events as they occur. You can
use a generator expression to iterate through the events:

```python
query = fql('Product.all().changesOn(.price, .stock)')
The driver supports [Event Feeds](https://docs.fauna.com/fauna/current/learn/track-changes/streaming/#event-feeds).

with client.stream(query) as stream:
for event in stream:
eventType = event['type']
if (eventType == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
print('Remove event: ', event)
## ...
```

### Close a stream
### Request an Event Feed

Use ``close()`` to close a stream:

```python
query = fql('Product.all().changesOn(.price, .stock)')

count = 0
with client.stream(query) as stream:
for event in stream:
print('Stream event', event)
# ...
count+=1

if (count == 2):
stream.close()
```

### Error handling

If a non-retryable error occurs when opening or processing a stream, Fauna
raises a ``FaunaException``:

```python
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

try:
with client.stream(fql(
'Product.all().changesOn(.price, .stock)'
)) as stream:
for event in stream:
print(event)
# ...
except FaunaException as e:
print('error ocurred with stream: ', e)
```

### Stream options

The client configuration sets default options for the ``stream()``
method.

You can pass a ``StreamOptions`` object to override these defaults:

```python
options = StreamOptions(
max_attempts=3,
max_backoff=20,
start_ts=None,
cursor=None,
status_events=False,
)

client.stream(fql('Product.all().toStream()'), options)
```

## Change Feeds (beta)

The driver supports [Change Feeds](https://docs.fauna.com/fauna/current/learn/track-changes/streaming/#change-feeds).

### Request a Change Feed

A Change Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming),
represented by a stream token, for events.
An Event Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming),
represented by a stream token, for events.

To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).
Expand Down Expand Up @@ -453,7 +329,7 @@ You can also pass a query that produces a stream token directly to
client.change_feed(query)
```

### Iterate on a Change Feed
### Iterate on an Event Feed

``change_feed()`` returns an iterator that emits pages of events. You can use a
generator expression to iterate through the pages:
Expand All @@ -464,7 +340,7 @@ generator expression to iterate through the pages:

for page in feed:
print('Page stats: ', page.stats)

for event in page:
eventType = event['type']
if (eventType == 'add'):
Expand All @@ -490,20 +366,20 @@ Alternatively, you can iterate through events instead of pages with
## ...
```

The change feed iterator stops when there are no more events to poll.
The Event Feed iterator stops when there are no more events to poll.

### Error handling

If a non-retryable error occurs when opening or processing a change feed, Fauna
If a non-retryable error occurs when opening or processing an Event Feed, Fauna
raises a ``FaunaException``:

```python
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

try:
feed = client.change_feed(fql(
'Product.all().changesOn(.price, .stock)'
Expand All @@ -512,7 +388,7 @@ raises a ``FaunaException``:
print(event)
# ...
except FaunaException as e:
print('error ocurred with change feed: ', e)
print('error ocurred with event feed: ', e)
```

Errors can be raised at two different places:
Expand All @@ -527,15 +403,15 @@ processing. For example:
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

# Imagine if there are some products with details = null.
# The ones without details will fail due to the toUpperCase call.
feed = client.change_feed(fql(
'Product.all().map(.details.toUpperCase()).toStream()'
))

for page in feed:
try:
for event in page:
Expand All @@ -548,7 +424,7 @@ processing. For example:
print('error ocurred with event processing: ', e)
```

### Change Feed options
### Event Feed options

The client configuration sets default options for the ``change_feed()``
method.
Expand All @@ -568,6 +444,130 @@ options = ChangeFeedOptions(
client.change_feed(fql('Product.all().toStream()'), options)
```

## Event Streaming

The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming).

### Start a stream

To get a stream token, append ``toStream()`` or ``changesOn()`` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).


To start and subscribe to the stream, pass the stream token to
``stream()``:

```python
from fauna import fql
from fauna.client import Client

client = Client()

response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
'''))

initialPage = response.data['initialPage']
streamToken = response.data['streamToken']

client.stream(streamToken)
```

You can also pass a query that produces a stream token directly to
``stream()``:

```python
query = fql('Product.all().changesOn(.price, .stock)')

client.stream(query)
```

### Iterate on a stream

``stream()`` returns an iterator that emits events as they occur. You can
use a generator expression to iterate through the events:

```python
query = fql('Product.all().changesOn(.price, .stock)')

with client.stream(query) as stream:
for event in stream:
eventType = event['type']
if (eventType == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
print('Remove event: ', event)
## ...
```

### Close a stream

Use ``close()`` to close a stream:

```python
query = fql('Product.all().changesOn(.price, .stock)')

count = 0
with client.stream(query) as stream:
for event in stream:
print('Stream event', event)
# ...
count+=1

if (count == 2):
stream.close()
```

### Error handling

If a non-retryable error occurs when opening or processing a stream, Fauna
raises a ``FaunaException``:

```python
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client()

try:
with client.stream(fql(
'Product.all().changesOn(.price, .stock)'
)) as stream:
for event in stream:
print(event)
# ...
except FaunaException as e:
print('error ocurred with stream: ', e)
```

### Stream options

The client configuration sets default options for the ``stream()``
method.

You can pass a ``StreamOptions`` object to override these defaults:

```python
options = StreamOptions(
max_attempts=3,
max_backoff=20,
start_ts=None,
cursor=None,
status_events=False,
)

client.stream(fql('Product.all().toStream()'), options)
```

## Setup

```bash
Expand Down
12 changes: 6 additions & 6 deletions fauna/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,12 @@ class StreamOptions:
@dataclass
class ChangeFeedOptions:
"""
A dataclass representing options available for a change feed.
A dataclass representing options available for an Event Feed.

* max_attempts - The maximum number of times to attempt a change feed query when a retryable exception is thrown.
* max_attempts - The maximum number of times to attempt an Event Feed query when a retryable exception is thrown.
* max_backoff - The maximum backoff in seconds for an individual retry.
* query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
* start_ts - The starting timestamp of the change feed, exclusive. If set, Fauna will return events starting after
* start_ts - The starting timestamp of the Event Feed, exclusive. If set, Fauna will return events starting after
the timestamp.
* cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
* page_size - The desired number of events per page.
Expand Down Expand Up @@ -465,10 +465,10 @@ def change_feed(
opts: ChangeFeedOptions = ChangeFeedOptions()
) -> "ChangeFeedIterator":
"""
Opens a change feed in Fauna and returns an iterator that consume Fauna events.
Opens an Event Feed in Fauna and returns an iterator that consume Fauna events.

:param fql: A Query that returns a StreamToken or a StreamToken.
:param opts: (Optional) Change feed options.
:param opts: (Optional) Event Feed options.

:return: a :class:`ChangeFeedIterator`

Expand Down Expand Up @@ -658,7 +658,7 @@ def __iter__(self) -> Iterator[Any]:


class ChangeFeedIterator:
"""A class to provide an iterator on top of change feed pages."""
"""A class to provide an iterator on top of Event Feed pages."""

def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
max_attempts: int, max_backoff: int, opts: ChangeFeedOptions,
Expand Down
Loading