Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Flow client #63

Open
wants to merge 31 commits into
base: next
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7d32067
Use 5.0.1-alpha.0
mnemitz Oct 17, 2024
db0c3ac
Add Flow client package
mnemitz Oct 17, 2024
5111d6c
Fix output dir
mnemitz Oct 17, 2024
2178a91
Readme
mnemitz Oct 17, 2024
44adca7
Fix end session
mnemitz Oct 17, 2024
9b721dd
Bump version
mnemitz Oct 17, 2024
1fb65b5
Add events for socket pending states
mnemitz Oct 17, 2024
d26fa0d
Fix disconnect
mnemitz Oct 17, 2024
b2fe272
Expand socket state, add custom errors
mnemitz Oct 18, 2024
d542aa2
Bump version
mnemitz Oct 18, 2024
8f55daa
Consolidate module entrypoint, add keywords
mnemitz Oct 18, 2024
1a76674
bump version
mnemitz Oct 18, 2024
c0fdeb8
Make `connect` method private, expose all options through `startConve…
mnemitz Oct 21, 2024
0189aa7
Bump version
mnemitz Oct 21, 2024
a8ad3db
Bump version
mnemitz Oct 21, 2024
83dbd9e
Fix missing events
mnemitz Oct 21, 2024
691543c
Add custom event for Flow messages (not based on MessageEvent)
mnemitz Oct 21, 2024
bfdda42
Fix bad event creation
mnemitz Oct 21, 2024
539658d
Add binary type
mnemitz Oct 21, 2024
ae9b264
Support array buffer for ws audio
mnemitz Oct 21, 2024
a7071dc
Fix binary type
mnemitz Oct 21, 2024
cd743de
Fix handle check
mnemitz Oct 21, 2024
ee54c44
sendAudio accepts ArrayBufferLike
mnemitz Oct 21, 2024
fba4553
Add audio format argument to `startConversation`
mnemitz Oct 22, 2024
0092125
Add format script to workspace
mnemitz Oct 22, 2024
8dda677
Bump version
mnemitz Oct 22, 2024
3720d6a
Use Int16Array for AgentAudioEvent
mnemitz Oct 22, 2024
373486b
Flush agent audio after disconnect
mnemitz Oct 22, 2024
35aeaba
Bump version
mnemitz Oct 22, 2024
0495c20
Default buffer 10ms
mnemitz Oct 23, 2024
afe71db
Bump version
mnemitz Oct 23, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/batch-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@speechmatics/batch-client",
"version": "0.0.1-alpha.0",
"version": "5.0.1-alpha.0",
"description": "",
"main": "dist/index.js",
"module": "dist/index.mjs",
Expand Down
5 changes: 5 additions & 0 deletions packages/flow-client/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@

# @speechmatics/flow-client 🤖

> [!WARNING]
> This package is not ready for production use. Use as an example only until the official release.
24 changes: 24 additions & 0 deletions packages/flow-client/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "@speechmatics/flow-client",
"version": "0.0.1-alpha.18",
"description": "",
"main": "dist/index.js",
"files": ["dist/", "README.md"],
"browser": "dist/index.browser.js",
"module": "dist/index.browser.js",
"typings": "dist/index.d.ts",
"scripts": {
"build": "pnpm rollup -c",
"format": "biome format --write .",
"lint": "biome lint --write ."
},
"keywords": ["voice", "speech", "intelligence", "assistance", "chat", "API"],
"author": "",
"license": "MIT",
"dependencies": {
"typescript-event-target": "^1.1.1"
},
"devDependencies": {
"@rollup/plugin-inject": "^5.0.5"
}
}
62 changes: 62 additions & 0 deletions packages/flow-client/rollup.config.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import esbuild from 'rollup-plugin-esbuild';
import dts from 'rollup-plugin-dts';
import inject from '@rollup/plugin-inject';

import packageJSON from './package.json' assert { type: 'json' };
const name = packageJSON.main.replace(/\.js$/, '');

export default function rollup() {
const browserESM = {
plugins: [
esbuild({
define: {
SDK_VERSION: `'${packageJSON.version}'`,
},
}),
],
input: 'src/index.ts',
output: [
{
file: `${name}.browser.js`,
format: 'es',
sourcemap: true,
},
],
};

const nodeCJS = {
plugins: [
esbuild({
define: {
SDK_VERSION: `'${packageJSON.version}'`,
},
}),
inject({
WebSocket: ['ws', 'WebSocket'],
}),
],
input: 'src/index.ts',
output: [
{
file: `${name}.js`,
format: 'cjs',
},
],
};

const typeDefinitions = {
plugins: [
dts({
compilerOptions: {
removeComments: true,
},
}),
],
input: 'src/index.ts',
output: {
file: `${name}.d.ts`,
},
};

return [browserESM, nodeCJS, typeDefinitions];
}
287 changes: 287 additions & 0 deletions packages/flow-client/src/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,287 @@
import { TypedEventTarget } from 'typescript-event-target';
import {
AgentAudioEvent,
FlowIncomingMessageEvent,
type FlowClientEventMap,
type FlowClientIncomingMessage,
type FlowClientOutgoingMessage,
type StartConversationMessage,
} from './events';

export interface FlowClientOptions {
appId: string;
audioBufferingMs?: number;
websocketBinaryType?: 'blob' | 'arraybuffer';
}

export class FlowClient extends TypedEventTarget<FlowClientEventMap> {
public readonly appId: string;
private readonly audioBufferingMs: number;

// Buffer for audio received from server
private agentAudioQueue:
| { type: 'blob'; queue: Blob[] }
| { type: 'arraybuffer'; queue: ArrayBuffer[] };

constructor(
public readonly server: string,
{
appId,
audioBufferingMs = 10,
websocketBinaryType = 'blob',
}: FlowClientOptions,
) {
super();
this.appId = appId;
this.audioBufferingMs = audioBufferingMs;

this.agentAudioQueue = {
type: websocketBinaryType,
queue: [],
};
}

// active websocket
private ws: WebSocket | null = null;

private serverSeqNo = 0;
private clientSeqNo = 0;

get socketState() {
if (!this.ws) return undefined;
return {
[WebSocket.CONNECTING]: 'connecting' as const,
[WebSocket.OPEN]: 'open' as const,
[WebSocket.CLOSING]: 'closing' as const,
[WebSocket.CLOSED]: 'closed' as const,
}[this.ws.readyState];
}

private async connect(jwt: string, timeoutMs = 10_000) {
const socketState = this.socketState;
if (socketState && socketState !== 'closed') {
throw new SpeechmaticsFlowError(
`Cannot start connection while socket is ${socketState}`,
);
}

const waitForConnect = new Promise((resolve, reject) => {
const wsUrl = new URL('/v1/flow', this.server);
wsUrl.searchParams.append('jwt', jwt);
wsUrl.searchParams.append('sm-app', this.appId);

this.ws = new WebSocket(wsUrl.toString());
this.ws.binaryType = this.agentAudioQueue.type;

this.dispatchTypedEvent(
'socketInitialized',
new Event('socketInitialized'),
);

// Setup socket event listeners right away
this.setupSocketEventListeners();

this.addEventListener('socketOpen', resolve, { once: true });

this.addEventListener(
'socketError',
(e) => {
reject(
new SpeechmaticsFlowError('Error opening websocket', { cause: e }),
);
},
{ once: true },
);
});

await Promise.race([
waitForConnect,
rejectAfter(timeoutMs, 'websocket connect'),
]);
}

private setupSocketEventListeners() {
if (!this.ws) throw new SpeechmaticsFlowError('socket not initialized!');

this.ws.addEventListener('open', () => {
this.dispatchTypedEvent('socketOpen', new Event('socketOpen'));
});
this.ws.addEventListener('close', () =>
this.dispatchTypedEvent('socketClose', new Event('socketClose')),
);
this.ws.addEventListener('error', (e) =>
this.dispatchTypedEvent('socketError', new Event('socketError', e)),
);

this.ws.addEventListener('message', ({ data }) => {
// handle binary audio
if (data instanceof Blob || data instanceof ArrayBuffer) {
this.handleWebsocketAudio(data);
} else if (typeof data === 'string') {
this.handleWebsocketMessage(data);
} else {
throw new SpeechmaticsFlowError(`Unexpected message type: ${data}`);
}
});
}

private handleWebsocketAudio(data: Blob | ArrayBuffer) {
// send ack as soon as we receive audio
this.sendWebsocketMessage({
message: 'AudioReceived',
seq_no: ++this.serverSeqNo,
buffering: this.audioBufferingMs / 1000,
});

if (data instanceof Blob && this.agentAudioQueue.type === 'blob') {
this.agentAudioQueue.queue.push(data);
} else if (
data instanceof ArrayBuffer &&
this.agentAudioQueue.type === 'arraybuffer'
) {
this.agentAudioQueue.queue.push(data);
} else {
throw new SpeechmaticsFlowError(
`Could not process audio: expecting audio to be ${this.agentAudioQueue.type} but got ${data.constructor.name}`,
);
}

// Flush audio queue and dispatch play events after buffer delay
setTimeout(() => {
this.flushAgentAudioQueue();
}, this.audioBufferingMs);
}

private async flushAgentAudioQueue() {
while (this.agentAudioQueue.queue.length) {
const data = this.agentAudioQueue.queue.shift();
if (!data) continue;

const arrayBuffer =
data instanceof Blob ? await data.arrayBuffer() : data;

this.dispatchTypedEvent(
'agentAudio',
new AgentAudioEvent(new Int16Array(arrayBuffer)),
);
}
}

private handleWebsocketMessage(message: string) {
// We're intentionally not validating the message shape. It is design by contract
let data: FlowClientIncomingMessage;
try {
data = JSON.parse(message);
} catch (e) {
throw new SpeechmaticsFlowError('Failed to parse message as JSON');
}

if (data.message === 'AudioAdded') {
this.clientSeqNo = data.seq_no;
}

this.dispatchTypedEvent('message', new FlowIncomingMessageEvent(data));
}

private sendWebsocketMessage(message: FlowClientOutgoingMessage) {
if (this.socketState === 'open') {
this.ws?.send(JSON.stringify(message));
}
}

public sendAudio(pcm16Data: ArrayBufferLike) {
if (this.socketState === 'open') {
this.ws?.send(pcm16Data);
}
}

async startConversation(
jwt: string,
{
config,
audioFormat,
}: {
config: StartConversationMessage['conversation_config'];
audioFormat?: StartConversationMessage['audio_format'];
},
) {
await this.connect(jwt);

const waitForConversationStarted = new Promise<void>((resolve, reject) => {
const client = this;
this.addEventListener('message', function onStart({ data }) {
if (data.message === 'ConversationStarted') {
resolve();
client.removeEventListener('message', onStart);
} else if (data.message === 'Error') {
reject(
new SpeechmaticsFlowError('Error waiting for conversation start', {
cause: data,
}),
);
}
});

const conversation_config = {
...config,
template_variables: {
timezone: Intl.DateTimeFormat().resolvedOptions().timeZone,
...config.template_variables,
},
};

const startMessage: StartConversationMessage = {
message: 'StartConversation',
conversation_config,
audio_format: audioFormat ?? DEFAULT_AUDIO_FORMAT,
};
this.sendWebsocketMessage(startMessage);
});

await Promise.race([
waitForConversationStarted,
rejectAfter(10_000, 'conversation start'),
]);
}

endConversation() {
this.sendWebsocketMessage({
message: 'AudioEnded',
last_seq_no: this.clientSeqNo,
});
this.disconnectSocket();
this.flushAgentAudioQueue();
}

private disconnectSocket() {
this.dispatchTypedEvent('socketClosing', new Event('socketClosing'));
this.ws?.close();
}
}

function rejectAfter(timeoutMs: number, key: string) {
return new Promise((_, reject) => {
setTimeout(
() =>
reject(
new SpeechmaticsFlowError(
`Timed out after ${timeoutMs}s waiting for ${key}`,
),
),
timeoutMs,
);
});
}

export class SpeechmaticsFlowError extends Error {
constructor(message: string, options?: ErrorOptions) {
super(message, options);
this.name = 'SpeechmaticsFlowError';
}
}

const DEFAULT_AUDIO_FORMAT = {
type: 'raw',
encoding: 'pcm_s16le',
sample_rate: 16000,
} as const;
Loading