From f710672b7e413dc87e196dafeb2c45a04b704fb4 Mon Sep 17 00:00:00 2001 From: aboutphilippe Date: Mon, 19 Aug 2024 13:15:48 +0200 Subject: [PATCH] consistent typed events --- openai/functions/chat/completionsStream.ts | 35 ++++++++++++--- openai/package.json | 2 +- websocket/functions/listen.ts | 50 ++++++++++++++-------- websocket/functions/send.ts | 25 ++++------- websocket/package.json | 2 +- 5 files changed, 74 insertions(+), 40 deletions(-) diff --git a/openai/functions/chat/completionsStream.ts b/openai/functions/chat/completionsStream.ts index 34020f7..bc49fc3 100644 --- a/openai/functions/chat/completionsStream.ts +++ b/openai/functions/chat/completionsStream.ts @@ -12,6 +12,19 @@ import { SendWorkflowEvent } from "@restackio/restack-sdk-ts/event"; import { openaiClient } from "../../utils/client"; import { openaiCost, Price } from "../../utils/cost"; +export type StreamEvent = { + response: string; + isLast: boolean; +}; + +export type ToolCallEvent = + OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall & { + function: { + name: string; + input: JSON; + }; + }; + export async function openaiChatCompletionsStream({ newMessage, messages = [], @@ -69,10 +82,21 @@ export async function openaiChatCompletionsStream({ await Promise.all( toolCalls.map((toolCall) => { if (toolEvent) { + const functionArguments = JSON.parse( + toolCall.function?.arguments ?? "" + ); + + const input: ToolCallEvent = { + ...toolCall, + function: { + name: toolCall.function?.name, + input: functionArguments, + }, + }; const workflowEvent = { ...workflow, ...toolEvent, - input: JSON.parse(toolCall.function?.arguments ?? ""), + input, }; log.debug("toolEvent sendWorkflowEvent", { workflowEvent }); if (streamEvent) { @@ -88,13 +112,14 @@ export async function openaiChatCompletionsStream({ finishReason === "stop" ) { if (response.length) { + const input: StreamEvent = { + response, + isLast: finishReason === "stop", + }; const workflowEvent = { ...workflow, ...streamEvent, - input: { - response: response, - isLast: finishReason === "stop", - }, + input, }; log.debug("streamEvent sendWorkflowEvent", { workflowEvent }); if (streamEvent) { diff --git a/openai/package.json b/openai/package.json index 553bd07..ace23d8 100644 --- a/openai/package.json +++ b/openai/package.json @@ -1,6 +1,6 @@ { "name": "@restackio/integrations-openai", - "version": "0.0.2", + "version": "0.0.3", "main": "dist/service.js", "types": "dist/service.d.ts", "scripts": { diff --git a/websocket/functions/listen.ts b/websocket/functions/listen.ts index cee05ee..c320b42 100644 --- a/websocket/functions/listen.ts +++ b/websocket/functions/listen.ts @@ -10,14 +10,23 @@ import { WorkflowEvent, } from "@restackio/restack-sdk-ts/event"; -type WebsocketEvent = WorkflowEvent & { - name: string; - input: { +export type WebsocketEvent = { + streamSid: string; + media?: { + track: string; + payload: string; + }; + data?: { track: string; [key: string]: any; }; }; +export type SendWebsocketEvent = WorkflowEvent & { + name: string; + input: WebsocketEvent; +}; + export async function websocketListen({ streamSid, mediaEvents, @@ -51,16 +60,18 @@ export async function websocketListen({ if (!cleanedPayload) { return; } + + const input: WebsocketEvent = { + streamSid: message.streamSid, + media: { + track: mediaEvent.event.input.track, + payload: cleanedPayload, + }, + }; const workflowEvent = { ...workflow, ...mediaEvent, - input: { - streamSid, - payload: { - ...message.media.payload, - track: mediaEvent.event.input.track, - }, - }, + input, }; log.debug("mediaEvent sendWorkflowEvent", { workflowEvent }); @@ -71,16 +82,17 @@ export async function websocketListen({ if (dataEvents) { dataEvents.forEach((dataEvent) => { if (message.event === dataEvent.event.name) { + const input: WebsocketEvent = { + streamSid: message.streamSid, + data: { + ...message.data, + track: dataEvent.event.input.track, + }, + }; const workflowEvent = { ...workflow, ...dataEvent, - input: { - streamSid, - data: { - ...message.data, - track: dataEvent.event.input.track, - }, - }, + input, }; log.debug("dataEvent sendWorkflowEvent", { workflowEvent }); @@ -90,9 +102,13 @@ export async function websocketListen({ } heartbeat(message.streamSid); if (message.event === "stop") { + const input: WebsocketEvent = { + streamSid: message.streamSid, + }; const workflowEvent = { ...workflow, ...stopEvent, + input, }; log.debug("stopEvent sendWorkflowEvent", { workflowEvent }); diff --git a/websocket/functions/send.ts b/websocket/functions/send.ts index 27a0d30..7bccd35 100644 --- a/websocket/functions/send.ts +++ b/websocket/functions/send.ts @@ -1,29 +1,22 @@ import { websocketConnect } from "../utils/client"; +import { WebsocketEvent } from "./listen"; export async function websocketSend({ - streamSid, - eventName, - data, - media, + name, + input, address, }: { - streamSid: string; - eventName: string; - data?: { - track: string; - [key: string]: any; - }; - media?: { - track: string; - payload: string; - }; + name: string; + input: WebsocketEvent; address?: string; }) { const ws = await websocketConnect({ address }); + const { streamSid, data, media } = input; + const event = { - streamSid: streamSid, - event: eventName, + streamSid, + event: name, data, media, }; diff --git a/websocket/package.json b/websocket/package.json index edf387e..475cb6c 100644 --- a/websocket/package.json +++ b/websocket/package.json @@ -1,6 +1,6 @@ { "name": "@restackio/integrations-websocket", - "version": "0.0.1", + "version": "0.0.2", "main": "dist/service.js", "types": "dist/service.d.ts", "scripts": {