Skip to content

Commit

Permalink
fix: improve cancel logic in openAi model
Browse files Browse the repository at this point in the history
The cancel of a request was not working correctly.
With the changes the cancelToken is better taken into account.
  • Loading branch information
eneufeld committed Jan 10, 2025
1 parent d8022a1 commit 2eed0cc
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions packages/ai-openai/src/node/openai-language-model.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ export class OpenAiModel implements LanguageModel {
if (request.response_format?.type === 'json_schema' && this.supportsStructuredOutput()) {
return this.handleStructuredOutputRequest(openai, request);
}
if (cancellationToken?.isCancellationRequested) {
return { text: '' };
}

let runner: ChatCompletionStream;
const tools = this.createTools(request);
Expand All @@ -95,42 +98,57 @@ export class OpenAiModel implements LanguageModel {

let runnerEnd = false;

let resolve: (part: LanguageModelStreamResponsePart) => void;
let resolve: ((part: LanguageModelStreamResponsePart) => void) | undefined;
runner.on('error', error => {
console.error('Error in OpenAI chat completion stream:', error);
runnerEnd = true;
resolve({ content: error.message });
resolve?.({ content: error.message });
});
// we need to also listen for the emitted errors, as otherwise any error actually thrown by the API will not be caught
runner.emitted('error').then(error => {
console.error('Error in OpenAI chat completion stream:', error);
runnerEnd = true;
resolve({ content: error.message });
resolve?.({ content: error.message });
});
runner.emitted('abort').then(() => {
// do nothing, as the abort event is only emitted when the runner is aborted by us
// cancel async iterator
runnerEnd = true;
});
runner.on('message', message => {
if (message.role === 'tool') {
resolve({ tool_calls: [{ id: message.tool_call_id, finished: true, result: this.getCompletionContent(message) }] });
resolve?.({ tool_calls: [{ id: message.tool_call_id, finished: true, result: this.getCompletionContent(message) }] });
}
console.debug('Received Open AI message', JSON.stringify(message));
});
runner.once('end', () => {
runnerEnd = true;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
resolve(runner.finalChatCompletion as any);
resolve?.(runner.finalChatCompletion as any);
});
if (cancellationToken?.isCancellationRequested) {
return { text: '' };
}
const asyncIterator = {
async *[Symbol.asyncIterator](): AsyncIterator<LanguageModelStreamResponsePart> {
runner.on('chunk', chunk => {
if (chunk.choices[0]?.delta) {
if (cancellationToken?.isCancellationRequested) {
resolve = undefined;
return;
}
if (resolve && chunk.choices[0]?.delta) {
resolve({ ...chunk.choices[0]?.delta });
}
});
while (!runnerEnd) {
if (cancellationToken?.isCancellationRequested) {
throw new Error('Iterator canceled');
}
const promise = new Promise<LanguageModelStreamResponsePart>((res, rej) => {
resolve = res;
cancellationToken?.onCancellationRequested(() => {
rej(new Error('Canceled'));
runnerEnd = true; // Stop the iterator
});
});
yield promise;
}
Expand Down

0 comments on commit 2eed0cc

Please sign in to comment.