Skip to content

Commit

Permalink
Initial change feed implementation from #290
Browse files Browse the repository at this point in the history
  • Loading branch information
ecooper committed Oct 24, 2024
1 parent dbf5646 commit c948b55
Show file tree
Hide file tree
Showing 18 changed files with 1,152 additions and 24 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/pr_validate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@ jobs:
image: fauna/faunadb:latest
ports:
- 8443:8443
env:
FLAG_ACCOUNT_CHANGE_FEEDS: "true"
alt_core:
image: fauna/faunadb:latest
ports:
- 7443:8443
env:
FLAG_ACCOUNT_CHANGE_FEEDS: "true"
strategy:
matrix:
node: ["18", "20"]
Expand Down
133 changes: 129 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# The Official JavaScript Driver for [Fauna](https://fauna.com).
# Official JavaScript Driver for [Fauna v10](https://fauna.com) (current)

[![npm Version](https://img.shields.io/npm/v/fauna.svg?maxAge=21600)](https://www.npmjs.com/package/fauna)
[![License](https://img.shields.io/badge/license-MPL_2.0-blue.svg?maxAge=2592000)](https://raw.githubusercontent.com/fauna/fauna-js/main/LICENSE)
Expand Down Expand Up @@ -34,6 +34,11 @@ See the [Fauna Documentation](https://docs.fauna.com/fauna/current/) for additio
- [Iterate on a stream](#iterate-on-a-stream)
- [Close a stream](#close-a-stream)
- [Stream options](#stream-options)
- [Change Feeds](#change-feeds-beta)
- [Request a Change Feed](#request-a-change-feed)
- [Iterate on a Change Feed](#iterate-on-a-change-feed)
- [Error handling](#error-handling)
- [Change Feed options](#change-feed-options)
- [Contributing](#contributing)
- [Set up the repo](#set-up-the-repo)
- [Run tests](#run-tests)
Expand Down Expand Up @@ -68,14 +73,12 @@ Stable versions of:
- Safari 12.1+
- Edge 79+


## API reference

API reference documentation for the driver is available at
https://fauna.github.io/fauna-js/. The docs are generated using
[TypeDoc](https://typedoc.org/).


## Install

The driver is available on [npm](https://www.npmjs.com/package/fauna). You
Expand Down Expand Up @@ -595,9 +598,131 @@ For supported properties, see
[StreamClientConfiguration](https://fauna.github.io/fauna-js/latest/types/StreamClientConfiguration.html)
in the API reference.

## Change Feeds (beta)

The driver supports [Change Feeds](https://docs.fauna.com/fauna/current/learn/track-changes/streaming/#change-feeds).

### Request a Change Feed

A Change Feed asynchronously polls an [event stream](https://docs.fauna.com/fauna/current/learn/streaming),
represented by a stream token, for events.

To get a stream token, append `toStream()` or `changesOn()` to a set from a
[supported source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).

To get paginated events for the stream, pass the stream token to
`changeFeed()`:

```javascript
const response = await client.query(fql`
let set = Product.all()
{
initialPage: set.pageSize(10),
streamToken: set.toStream()
}
`);
const { initialPage, streamToken } = response.data;

const changeFeed = client.changeFeed(streamToken);
```

You can also pass a query that produces a stream token directly to `changeFeed()`:

```javascript
const query = fql`Product.all().changesOn(.price, .stock)`;

const changeFeed = client.changeFeed(query);
```

### Iterate on a Change Feed

`changeFeed()` returns a `ChangeFeedClient` instance that can act as an `AsyncIterator`. You can use `for await...of` to iterate through all the pages:

```ts
const query = fql`Product.all().changesOn(.price, .stock)`;
const changeFeed = client.changeFeed(query);

for await (const page of changeFeed) {
console.log("Page stats", page.stats);

for (event in page.events) {
switch (event.type) {
case "update":
case "add":
case "remove":
console.log("Stream event:", event);
// ...
break;
}
}
}
```

Alternatively, use `flatten()` to get paginated results as a single, flat array:

```ts
const query = fql`Product.all().changesOn(.price, .stock)`;
const changeFeed = client.changeFeed(query);

for await (const event of changeFeed.flatten()) {
console.log("Stream event:", event);
}
```

### Error handling

Exceptions can be raised at two different places:

1. While fetching a page
1. While iterating a page's events

This distinction allows for you to ignore errors originating from event processing.
For example:

```ts
const changeFeed = client.changeFeed(fql`
Product.all().map(.details.toUpperCase()).toStream()
`);

try {
for await (const page of changeFeed) {
// Pages will stop at the first error encountered.
// Therefore, its safe to handle an event failures
// and then pull more pages.
try {
for (const event of page.events) {
console.log("Stream event:", event);
}
} catch (error: unknown) {
console.log("Stream event error:", error);
}
}
} catch (error: unknown) {
console.log("Non-retryable error:", error);
}
```

### Change Feed options

The client configuration sets the default options for `changeFeed()`. You can pass a `ChangeFeedClientConfiguration` object to override these defaults:

```ts
const options: ChangeFeedClientConfiguration = {
long_type: "number",
max_attempts: 5,
max_backoff: 1000,
secret: "FAUNA_SECRET",
cursor: undefined,
start_ts: undefined,
};

client.changeFeed(fql`Product.all().toStream()`, options);
```

## Contributing

Any contributions are from the community are greatly appreciated!
Any contributions from the community are greatly appreciated!

If you have a suggestion that would make this better, please fork the repo and create a pull request. You may also simply open an issue. We provide templates, so please complete those to the best of your ability.

Expand Down
87 changes: 87 additions & 0 deletions __tests__/functional/change-feed-client-configuration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import {
StreamToken,
getDefaultHTTPClient,
ChangeFeedClientConfiguration,
ChangeFeedClient,
} from "../../src";
import { getDefaultHTTPClientOptions } from "../client";

const defaultHttpClient = getDefaultHTTPClient(getDefaultHTTPClientOptions());
const defaultConfig: ChangeFeedClientConfiguration = {
secret: "secret",
long_type: "number",
max_attempts: 3,
max_backoff: 20,
query_timeout_ms: 5000,
httpClient: defaultHttpClient,
};
const dummyStreamToken = new StreamToken("dummy");

describe("ChangeFeedClientConfiguration", () => {
it("can be instantiated directly with a token", () => {
new ChangeFeedClient(dummyStreamToken, defaultConfig);
});

it("can be instantiated directly with a lambda", async () => {
new ChangeFeedClient(
() => Promise.resolve(dummyStreamToken),
defaultConfig,
);
});

it("throws a RangeError if 'max_backoff' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_backoff: 0 };
try {
new ChangeFeedClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});

it.each`
fieldName
${"long_type"}
${"httpClient"}
${"max_backoff"}
${"max_attempts"}
${"query_timeout_ms"}
${"secret"}
`(
"throws a TypeError if $fieldName provided is undefined",
async ({
fieldName,
}: {
fieldName: keyof ChangeFeedClientConfiguration;
}) => {
expect.assertions(1);

const config = { ...defaultConfig };
delete config[fieldName];
try {
new ChangeFeedClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(TypeError);
}
},
);

it("throws a RangeError if 'max_attempts' is less than or equal to zero", async () => {
expect.assertions(1);

const config = { ...defaultConfig, max_attempts: 0 };
try {
new ChangeFeedClient(dummyStreamToken, config);
} catch (e: any) {
expect(e).toBeInstanceOf(RangeError);
}
});

it("throws a TypeError is start_ts and cursor are both provided", async () => {
const config = { ...defaultConfig, start_ts: 1, cursor: "cursor" };
expect(() => {
new ChangeFeedClient(dummyStreamToken, config);
}).toThrow(TypeError);
});
});
Loading

0 comments on commit c948b55

Please sign in to comment.