Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move streaming to GA #262

Merged
merged 16 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 135 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Query options](#query-options)
- [Query statistics](#query-statistics)
- [Pagination](#pagination)
- [Event Streaming (beta)](#event-streaming-beta)
- [Client configuration](#client-configuration)
- [Environment variables](#environment-variables)
- [Retry](#retry)
Expand All @@ -29,6 +28,11 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Query timeout](#query-timeout)
- [Client timeout](#client-timeout)
- [HTTP/2 session idle timeout](#http2-session-idle-timeout)
- [Event Streaming](#event-streaming)
- [Start a stream](#start-a-stream)
- [Iterate on a stream](#iterate-on-a-stream)
- [Close a stream](#close-a-stream)
- [Stream options](#stream-options)
- [Contributing](#contributing)
- [Set up the repo](#set-up-the-repo)
- [Run tests](#run-tests)
Expand Down Expand Up @@ -312,16 +316,6 @@ for await (const products of pages) {
client.close();
```


## Event Streaming (beta)

[Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming) is
currently available in the beta version of the driver:

- [Beta JavaScript driver](https://www.npmjs.com/package/fauna/v/1.4.0-beta.0)
- [Beta JavaScript driver docs](https://github.com/fauna/fauna-js/tree/beta)


## Client configuration

The driver's `Client` instance comes with reasonable defaults that should be
Expand Down Expand Up @@ -443,6 +437,136 @@ const client = new Client({ http2_session_idle_ms: 6000 });
> **Warning**
> Setting `http2_session_idle_ms` to small values can lead to a race condition where requests cannot be transmitted before the session is closed, yielding `ERR_HTTP2_GOAWAY_SESSION` errors.

## Event Streaming

The driver supports [Event Streaming](https://docs.fauna.com/fauna/current/learn/streaming).

### Start a stream

To get a stream token, append
[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream)
or
[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson)
to a set from a [supported
source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).

To start and subscribe to the stream, pass the stream token to `Client.stream()`:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it a Client class or its instance client?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's an instance.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll work on getting this fixed. Thanks for raising @w01fgang!


```javascript
const response = await client.query(fql`
let set = Product.all()

{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
`);
const { initialPage, streamToken } = response.data;

client.stream(streamToken)
```

You can also pass a query that produces a stream token directly to `Client.stream()`:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, should it be client.stream()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes — It's an instance here as well.


```javascript
const query = fql`Product.all().changesOn(.price, .quantity)`

client.stream(query)
```

### Iterate on a stream

You can iterate on the stream using an async loop:

```javascript
try {
for await (const event of stream) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
}
} catch (error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
```

Or you can use a callback function:

```javascript
stream.start(
function onEvent(event) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
},
function onFatalError(error) {
// An error will be handled here if Fauna returns a terminal, "error" event, or
// if Fauna returns a non-200 response when trying to connect, or
// if the max number of retries on network errors is reached.

// ... handle fatal error
}
);
```

### Close a stream

Use `<stream>.close()` to close a stream:

```javascript
const stream = await client.stream(fql`Product.all().toStream()`)

let count = 0;
for await (const event of stream) {
console.log("Stream event:", event);
// ...
count++;

// Close the stream after 2 events
if (count === 2) {
stream.close()
break;
}
}
```

### Stream options

The [client configuration](#client-configuration) sets default options for the
`Client.stream()` method.

You can pass an `options` object to override these defaults:

```javascript
const options = {
long_type: "number",
max_attempts: 5,
max_backoff: 1000,
secret: "YOUR_FAUNA_SECRET",
status_events: true,
};

client.stream(fql`Product.all().toStream()`, options)
```

For supported properties, see [Stream
options](https://docs.fauna.com/fauna/current/drivers/js-client#stream-options)
in the Fauna docs.


## Contributing

Expand Down
71 changes: 71 additions & 0 deletions __tests__/functional/stream-client-configuration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {
StreamClient,
StreamToken,
getDefaultHTTPClient,
StreamClientConfiguration,
} from "../../src";
import { getDefaultHTTPClientOptions } from "../client";

const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions());
const defaultConfig: StreamClientConfiguration = {
secret: "secret",
long_type: "number",
max_attempts: 3,
max_backoff: 20,
httpStreamClient: defaultHttpClient,
};
const dummyStreamToken = new StreamToken("dummy");

describe("StreamClientConfiguration", () => {
it("can be instantiated directly with a token", () => {
new StreamClient(dummyStreamToken, defaultConfig);
});

it("can be instantiated directly with a lambda", async () => {
new StreamClient(() => Promise.resolve(dummyStreamToken), defaultConfig);
});

it.each`
fieldName
${"long_type"}
${"httpStreamClient"}
${"max_backoff"}
${"max_attempts"}
${"secret"}
`(
"throws a TypeError if $fieldName provided is undefined",
async ({ fieldName }: { fieldName: keyof StreamClientConfiguration }) => {
expect.assertions(1);

const config = { ...defaultConfig };
delete config[fieldName];
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(TypeError);
}
},
);

it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_backoff: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});

it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_attempts: 0 };
try {
new StreamClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});
});
16 changes: 14 additions & 2 deletions __tests__/integration/doc.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,23 @@ describe("querying for doc types", () => {
expect(result.data.module).toBeInstanceOf(Module);
expect(result.data.document).toBeInstanceOf(Document);
expect(result.data.document.documentReference).toBeInstanceOf(
DocumentReference
DocumentReference,
);
expect(result.data.document.namedDocumentReference).toBeInstanceOf(
NamedDocumentReference
NamedDocumentReference,
);
expect(result.data.namedDocument).toBeInstanceOf(NamedDocument);
});

it("can set and read ttl", async () => {
const queryBuilder = fql`${testDoc}`;
const result = await client.query<Document>(queryBuilder);

expect(result.data.ttl).toBeUndefined();

const queryBuilderUpdate = fql`${testDoc}.update({ ttl: Time.now().add(1, "day") })`;
const resultUpdate = await client.query<Document>(queryBuilderUpdate);

expect(resultUpdate.data.ttl).toBeInstanceOf(TimeStub);
});
});
Loading
Loading