Skip to content

Commit

Permalink
consistent typed events
Browse files Browse the repository at this point in the history
  • Loading branch information
aboutphilippe committed Aug 19, 2024
1 parent c93f15e commit f710672
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 40 deletions.
35 changes: 30 additions & 5 deletions openai/functions/chat/completionsStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,19 @@ import { SendWorkflowEvent } from "@restackio/restack-sdk-ts/event";
import { openaiClient } from "../../utils/client";
import { openaiCost, Price } from "../../utils/cost";

export type StreamEvent = {
response: string;
isLast: boolean;
};

export type ToolCallEvent =
OpenAI.Chat.Completions.ChatCompletionChunk.Choice.Delta.ToolCall & {
function: {
name: string;
input: JSON;
};
};

export async function openaiChatCompletionsStream({
newMessage,
messages = [],
Expand Down Expand Up @@ -69,10 +82,21 @@ export async function openaiChatCompletionsStream({
await Promise.all(
toolCalls.map((toolCall) => {
if (toolEvent) {
const functionArguments = JSON.parse(
toolCall.function?.arguments ?? ""
);

const input: ToolCallEvent = {
...toolCall,
function: {
name: toolCall.function?.name,
input: functionArguments,
},
};
const workflowEvent = {
...workflow,
...toolEvent,
input: JSON.parse(toolCall.function?.arguments ?? ""),
input,
};
log.debug("toolEvent sendWorkflowEvent", { workflowEvent });
if (streamEvent) {
Expand All @@ -88,13 +112,14 @@ export async function openaiChatCompletionsStream({
finishReason === "stop"
) {
if (response.length) {
const input: StreamEvent = {
response,
isLast: finishReason === "stop",
};
const workflowEvent = {
...workflow,
...streamEvent,
input: {
response: response,
isLast: finishReason === "stop",
},
input,
};
log.debug("streamEvent sendWorkflowEvent", { workflowEvent });
if (streamEvent) {
Expand Down
2 changes: 1 addition & 1 deletion openai/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@restackio/integrations-openai",
"version": "0.0.2",
"version": "0.0.3",
"main": "dist/service.js",
"types": "dist/service.d.ts",
"scripts": {
Expand Down
50 changes: 33 additions & 17 deletions websocket/functions/listen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,23 @@ import {
WorkflowEvent,
} from "@restackio/restack-sdk-ts/event";

type WebsocketEvent = WorkflowEvent & {
name: string;
input: {
export type WebsocketEvent = {
streamSid: string;
media?: {
track: string;
payload: string;
};
data?: {
track: string;
[key: string]: any;
};
};

export type SendWebsocketEvent = WorkflowEvent & {
name: string;
input: WebsocketEvent;
};

export async function websocketListen({
streamSid,
mediaEvents,
Expand Down Expand Up @@ -51,16 +60,18 @@ export async function websocketListen({
if (!cleanedPayload) {
return;
}

const input: WebsocketEvent = {
streamSid: message.streamSid,
media: {
track: mediaEvent.event.input.track,
payload: cleanedPayload,
},
};
const workflowEvent = {
...workflow,
...mediaEvent,
input: {
streamSid,
payload: {
...message.media.payload,
track: mediaEvent.event.input.track,
},
},
input,
};
log.debug("mediaEvent sendWorkflowEvent", { workflowEvent });

Expand All @@ -71,16 +82,17 @@ export async function websocketListen({
if (dataEvents) {
dataEvents.forEach((dataEvent) => {
if (message.event === dataEvent.event.name) {
const input: WebsocketEvent = {
streamSid: message.streamSid,
data: {
...message.data,
track: dataEvent.event.input.track,
},
};
const workflowEvent = {
...workflow,
...dataEvent,
input: {
streamSid,
data: {
...message.data,
track: dataEvent.event.input.track,
},
},
input,
};
log.debug("dataEvent sendWorkflowEvent", { workflowEvent });

Expand All @@ -90,9 +102,13 @@ export async function websocketListen({
}
heartbeat(message.streamSid);
if (message.event === "stop") {
const input: WebsocketEvent = {
streamSid: message.streamSid,
};
const workflowEvent = {
...workflow,
...stopEvent,
input,
};
log.debug("stopEvent sendWorkflowEvent", { workflowEvent });

Expand Down
25 changes: 9 additions & 16 deletions websocket/functions/send.ts
Original file line number Diff line number Diff line change
@@ -1,29 +1,22 @@
import { websocketConnect } from "../utils/client";
import { WebsocketEvent } from "./listen";

export async function websocketSend({
streamSid,
eventName,
data,
media,
name,
input,
address,
}: {
streamSid: string;
eventName: string;
data?: {
track: string;
[key: string]: any;
};
media?: {
track: string;
payload: string;
};
name: string;
input: WebsocketEvent;
address?: string;
}) {
const ws = await websocketConnect({ address });

const { streamSid, data, media } = input;

const event = {
streamSid: streamSid,
event: eventName,
streamSid,
event: name,
data,
media,
};
Expand Down
2 changes: 1 addition & 1 deletion websocket/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@restackio/integrations-websocket",
"version": "0.0.1",
"version": "0.0.2",
"main": "dist/service.js",
"types": "dist/service.d.ts",
"scripts": {
Expand Down

0 comments on commit f710672

Please sign in to comment.