From 1950121829a1e3dc88db6c3b81f45594fd219efb Mon Sep 17 00:00:00 2001
From: fegloff
Date: Mon, 18 Mar 2024 11:24:27 -0500
Subject: [PATCH] add stream completion + fix conversation history

---
 src/modules/llms/api/athropic.ts | 191 ++++++++++++++++---------------
 src/modules/llms/api/llmApi.ts   |   9 +-
 src/modules/llms/helpers.ts      |  19 ++-
 src/modules/llms/index.ts        |  76 +++++++++++-
 4 files changed, 192 insertions(+), 103 deletions(-)

diff --git a/src/modules/llms/api/athropic.ts b/src/modules/llms/api/athropic.ts
index ad8fe34a..9eccd2c6 100644
--- a/src/modules/llms/api/athropic.ts
+++ b/src/modules/llms/api/athropic.ts
@@ -1,12 +1,12 @@
-import axios from 'axios'
-// import { type Readable } from 'stream'
+import axios, { type AxiosResponse } from 'axios'
+import { type Readable } from 'stream'
+import { GrammyError } from 'grammy'
+import { pino } from 'pino'
 import config from '../../../config'
-import { type ChatConversation } from '../../types' // , type OnCallBackQueryData, type OnMessageContext,
+import { type OnCallBackQueryData, type OnMessageContext, type ChatConversation } from '../../types'
 import { type LlmCompletion } from './llmApi'
 import { LlmsModelsEnum } from '../types'
-// import { GrammyError } from 'grammy'
-import { pino } from 'pino'
 
 const logger = pino({
   name: 'anthropic - llmsBot',
@@ -55,94 +55,95 @@ export const anthropicCompletion = async (
     price: 0
   }
 }
-// export const anthropicCompletion = async (
-//   conversation: ChatConversation[],
-//   model = LlmsModelsEnum.CLAUDE_OPUS,
-//   ctx: OnMessageContext | OnCallBackQueryData,
-//   msgId: number
-// ): Promise<LlmCompletion> => {
-//   const data = {
-//     model,
-//     stream: true, // Set stream to true to receive the completion as a stream
-//     system: config.openAi.chatGpt.chatCompletionContext,
-//     max_tokens: +config.openAi.chatGpt.maxTokens,
-//     messages: conversation
-//   }
-//   let wordCount = 0
-//   let wordCountMinimum = 2
-//   const url = `${API_ENDPOINT}/anthropic/completions`
-//   if (!ctx.chat?.id) {
-//     throw new Error('Context chat id should not be empty after openAI streaming')
-//   }
-//   const response = await axios.post(url, data, { responseType: 'stream' })
-
-//   // Create a Readable stream from the response
-//   const completionStream: Readable = response.data
-
-//   // Read and process the stream
-//   let completion = ''
-//   let outputTokens = ''
-//   let inputTokens = ''
-//   completionStream.on('data', (chunk: any) => {
-//     const sendMessage = async (completion: string): Promise<void> => {
-//       await ctx.api
-//         .editMessageText(ctx.chat?.id, msgId, completion)
-//         .catch(async (e: any) => {
-//           if (e instanceof GrammyError) {
-//             if (e.error_code !== 400) {
-//               throw e
-//             } else {
-//               logger.error(e)
-//             }
-//           } else {
-//             throw e
-//           }
-//         })
-//     }
-//     const msg = chunk.toString()
-//     if (msg) {
-//       if (msg.startsWith('Input Token')) {
-//         inputTokens = msg.split('Input Token: ')[1]
-//       } else if (msg.startsWith('Text')) {
-//         wordCount++
-//         completion += msg.split('Text: ')[1]
-//         if (wordCount > wordCountMinimum) { // if (chunck === '.' && wordCount > wordCountMinimum) {
-//           if (wordCountMinimum < 64) {
-//             wordCountMinimum *= 2
-//           }
-//           completion = completion.replaceAll('...', '')
-//           completion += '...'
-//           wordCount = 0
-//           if (ctx.chat?.id) {
-//             await sendMessage(completion)
-//           }
-//         }
-//       } else if (msg.startsWith('Output Tokens')) {
-//         outputTokens = msg.split('Output Tokens: ')[1]
-//       }
-//     }
-//   })
-
-//   completionStream.on('end', () => {
-//     const totalOutputTokens = outputTokens // response.headers['x-openai-output-tokens']
-//     const totalInputTokens = inputTokens // response.headers['x-openai-input-tokens']
-//     console.log('FCO stream', completion)
-//     // You can also process the completion content here
-//     return {
-//       completion: {
-//         content: completion,
-//         role: 'assistant',
-//         model
-//       },
-//       usage: parseInt(totalOutputTokens, 10) + parseInt(totalInputTokens, 10),
-//       price: 0
-//     }
-//   })
-
-//   return {
-//     completion: undefined,
-//     usage: 0,
-//     price: 0
-//   }
-// }
+export const anthropicStreamCompletion = async (
+  conversation: ChatConversation[],
+  model = LlmsModelsEnum.CLAUDE_OPUS,
+  ctx: OnMessageContext | OnCallBackQueryData,
+  msgId: number,
+  limitTokens = true
+): Promise<LlmCompletion> => {
+  const data = {
+    model,
+    stream: true, // Set stream to true to receive the completion as a stream
+    system: config.openAi.chatGpt.chatCompletionContext,
+    max_tokens: limitTokens ? +config.openAi.chatGpt.maxTokens : undefined,
+    messages: conversation.map(m => { return { content: m.content, role: m.role } })
+  }
+  let wordCount = 0
+  let wordCountMinimum = 2
+  const url = `${API_ENDPOINT}/anthropic/completions`
+  if (!ctx.chat?.id) {
+    throw new Error('Context chat id should not be empty after openAI streaming')
+  }
+  const response: AxiosResponse = await axios.post(url, data, { responseType: 'stream' })
+  // Create a Readable stream from the response
+  const completionStream: Readable = response.data
+  // Read and process the stream
+  let completion = ''
+  let outputTokens = ''
+  let inputTokens = ''
+  for await (const chunk of completionStream) {
+    const msg = chunk.toString()
+    if (msg) {
+      if (msg.startsWith('Input Token')) {
+        inputTokens = msg.split('Input Token: ')[1]
+      } else if (msg.startsWith('Text')) {
+        wordCount++
+        completion += msg.split('Text: ')[1]
+        if (wordCount > wordCountMinimum) { // if (chunk === '.' && wordCount > wordCountMinimum) {
+          if (wordCountMinimum < 64) {
+            wordCountMinimum *= 2
+          }
+          completion = completion.replaceAll('...', '')
+          completion += '...'
+          wordCount = 0
+          if (ctx.chat?.id) {
+            await ctx.api
+              .editMessageText(ctx.chat?.id, msgId, completion)
+              .catch(async (e: any) => {
+                if (e instanceof GrammyError) {
+                  if (e.error_code !== 400) {
+                    throw e
+                  } else {
+                    logger.error(e)
+                  }
+                } else {
+                  throw e
+                }
+              })
+          }
+        }
+      } else if (msg.startsWith('Output Tokens')) {
+        outputTokens = msg.split('Output Tokens: ')[1]
+      }
+    }
+  }
+  completion = completion.replaceAll('...', '')
+  await ctx.api
+    .editMessageText(ctx.chat?.id, msgId, completion)
+    .catch((e: any) => {
+      if (e instanceof GrammyError) {
+        if (e.error_code !== 400) {
+          throw e
+        } else {
+          logger.error(e)
+        }
+      } else {
+        throw e
+      }
+    })
+  const totalOutputTokens = outputTokens // response.headers['x-openai-output-tokens']
+  const totalInputTokens = inputTokens // response.headers['x-openai-input-tokens']
+  return {
+    completion: {
+      content: completion,
+      role: 'assistant',
+      model
+    },
+    usage: parseInt(totalOutputTokens, 10) + parseInt(totalInputTokens, 10),
+    price: 0,
+    inputTokens: parseInt(totalInputTokens, 10),
+    outputTokens: parseInt(totalOutputTokens, 10)
+  }
+}
diff --git a/src/modules/llms/api/llmApi.ts b/src/modules/llms/api/llmApi.ts
index 85aa70f2..0169495e 100644
--- a/src/modules/llms/api/llmApi.ts
+++ b/src/modules/llms/api/llmApi.ts
@@ -2,7 +2,8 @@ import axios from 'axios'
 import config from '../../../config'
 import { type ChatConversation } from '../../types'
 import pino from 'pino'
-import { LlmsModelsEnum } from '../types'
+import { LlmsModels, LlmsModelsEnum } from '../types'
+import { type ChatModel } from '../../open-ai/types'
 
 const API_ENDPOINT = config.llms.apiEndpoint // config.llms.apiEndpoint // 'http://localhost:8080' // http://127.0.0.1:5000' // config.llms.apiEndpoint
 
@@ -18,6 +19,8 @@ export interface LlmCompletion {
   completion: ChatConversation | undefined
   usage: number
   price: number
+  inputTokens?: number
+  outputTokens?: number
 }
 
 interface LlmAddUrlDocument {
@@ -33,6 +36,10 @@ interface QueryUrlDocument {
   conversation?: ChatConversation[]
 }
 
+export const getChatModel = (modelName: string): ChatModel => {
+  return LlmsModels[modelName]
+}
+
 export const llmAddUrlDocument = async (args: LlmAddUrlDocument): Promise<string> => {
   const data = { ...args }
   const endpointUrl = `${API_ENDPOINT}/collections/document`
diff --git a/src/modules/llms/helpers.ts b/src/modules/llms/helpers.ts
index c1be8b9e..9d82dd72 100644
--- a/src/modules/llms/helpers.ts
+++ b/src/modules/llms/helpers.ts
@@ -8,7 +8,9 @@ import {
 import { type ParseMode } from 'grammy/types'
 import { LlmsModelsEnum } from './types'
 import { type Message } from 'grammy/out/types'
-import { llmAddUrlDocument } from './api/llmApi'
+import { type LlmCompletion, getChatModel, llmAddUrlDocument } from './api/llmApi'
+import { getChatModelPrice } from '../open-ai/api/openAi'
+import config from '../../config'
 
 export enum SupportedCommands {
   bardF = 'bard',
@@ -213,11 +215,18 @@ export const hasPrefix = (prompt: string): string => {
   )
 }
 
-export const getPromptPrice = (completion: string, data: ChatPayload): { price: number, promptTokens: number, completionTokens: number } => {
+export const getPromptPrice = (completion: LlmCompletion, data: ChatPayload): { price: number, promptTokens: number, completionTokens: number } => {
+  const { ctx, model } = data
+  const modelPrice = getChatModel(model)
+  const price =
+    getChatModelPrice(modelPrice, true, completion.inputTokens ?? 0, completion.outputTokens ?? 0) *
+    config.openAi.chatGpt.priceAdjustment
+  ctx.session.llms.usage += completion.outputTokens ?? 0
+  ctx.session.llms.price += price
   return {
-    price: 0,
-    promptTokens: 10,
-    completionTokens: 60
+    price,
+    promptTokens: completion.inputTokens ?? 0,
+    completionTokens: completion.outputTokens ?? 0
   }
 }
diff --git a/src/modules/llms/index.ts b/src/modules/llms/index.ts
index 8c5a9a58..c6dd8bc0 100644
--- a/src/modules/llms/index.ts
+++ b/src/modules/llms/index.ts
@@ -19,6 +19,7 @@ import { sleep } from '../sd-images/utils'
 import {
   addDocToCollection,
   addUrlToCollection,
+  getPromptPrice,
   hasBardPrefix,
   hasClaudeOpusPrefix,
   hasLlamaPrefix,
@@ -38,7 +39,7 @@ import * as Sentry from '@sentry/node'
 import { now } from '../../utils/perf'
 import { AxiosError } from 'axios'
 import OpenAI from 'openai'
-import { anthropicCompletion } from './api/athropic'
+import { anthropicCompletion, anthropicStreamCompletion } from './api/athropic'
 export class LlmsBot implements PayableBot {
   public readonly module = 'LlmsBot'
   private readonly logger: Logger
@@ -548,6 +549,72 @@ export class LlmsBot implements PayableBot {
     ctx.transient.analytics.actualResponseTime = now()
   }
 
+  private async completionGen (data: ChatPayload, msgId?: number, outputFormat = 'text'): Promise< { price: number, chat: ChatConversation[] }> {
+    const { conversation, ctx, model } = data
+    try {
+      if (!msgId) {
+        ctx.transient.analytics.firstResponseTime = now()
+        msgId = (
+          await ctx.reply('...', {
+            message_thread_id:
+              ctx.message?.message_thread_id ??
+              ctx.message?.reply_to_message?.message_thread_id
+          })
+        ).message_id
+      }
+      if (outputFormat === 'text') {
+        const isTypingEnabled = config.openAi.chatGpt.isTypingEnabled
+        if (isTypingEnabled) {
+          ctx.chatAction = 'typing'
+        }
+        const completion = await anthropicStreamCompletion(
+          conversation,
+          model as LlmsModelsEnum,
+          ctx,
+          msgId,
+          true // Telegram messages have a character limit
+        )
+        if (isTypingEnabled) {
+          ctx.chatAction = null
+        }
+        if (completion) {
+          ctx.transient.analytics.sessionState = RequestState.Success
+          ctx.transient.analytics.actualResponseTime = now()
+          const price = getPromptPrice(completion, data)
+          this.logger.info(
+            `streamChatCompletion result = tokens: ${price.promptTokens + price.completionTokens} | ${model} | price: ${price.price}¢` // }
+          )
+          conversation.push({
+            role: 'assistant',
+            content: completion.completion?.content ?? ''
+          })
+          return {
+            price: price.price,
+            chat: conversation
+          }
+        }
+      } else {
+        const response = await anthropicCompletion(conversation, model as LlmsModelsEnum)
+        conversation.push({
+          role: 'assistant',
+          content: response.completion?.content ?? ''
+        })
+        return {
+          price: response.price,
+          chat: conversation
+        }
+      }
+      return {
+        price: 0,
+        chat: conversation
+      }
+    } catch (e: any) {
+      Sentry.captureException(e)
+      ctx.chatAction = null
+      throw e
+    }
+  }
+
   private async promptGen (data: ChatPayload): Promise<{ price: number, chat: ChatConversation[] }> {
     const { conversation, ctx, model } = data
     if (!ctx.chat?.id) {
@@ -686,7 +753,12 @@ export class LlmsBot implements PayableBot {
       model: model ?? config.llms.model,
       ctx
     }
-    const result = await this.promptGen(payload)
+    let result: { price: number, chat: ChatConversation[] } = { price: 0, chat: [] }
+    if (model === LlmsModelsEnum.CLAUDE_OPUS || model === LlmsModelsEnum.CLAUDE_SONNET) {
+      result = await this.completionGen(payload) // , prompt.msgId, prompt.outputFormat)
+    } else {
+      result = await this.promptGen(payload)
+    }
     ctx.session.llms.chatConversation = [...result.chat]
     if (
       !(await this.payments.pay(ctx as OnMessageContext, result.price))
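
Note on the streaming protocol: anthropicStreamCompletion does not parse Anthropic's SSE events directly; it consumes a plain-text protocol emitted by the bot's own API server at `${API_ENDPOINT}/anthropic/completions`, where each chunk is prefixed with `Input Token: `, `Text: `, or `Output Tokens: `. The sketch below isolates that parsing loop so it can be exercised against a mocked stream; `parseAnthropicStream` is a hypothetical helper, not part of this patch, and the one-logical-message-per-chunk framing is an assumption carried over from the loop above.

// parseAnthropicStream.ts - standalone sketch of the chunk protocol used above
import { Readable } from 'stream'

interface ParsedStream { text: string, inputTokens: number, outputTokens: number }

export const parseAnthropicStream = async (stream: Readable): Promise<ParsedStream> => {
  let text = ''
  let inputTokens = 0
  let outputTokens = 0
  for await (const chunk of stream) {
    const msg: string = chunk.toString()
    if (msg.startsWith('Input Token: ')) {
      // usage header, sent once before the text chunks
      inputTokens = parseInt(msg.split('Input Token: ')[1], 10)
    } else if (msg.startsWith('Text: ')) {
      // incremental completion text, concatenated in arrival order
      text += msg.split('Text: ')[1]
    } else if (msg.startsWith('Output Tokens: ')) {
      // usage trailer, sent once after the last text chunk
      outputTokens = parseInt(msg.split('Output Tokens: ')[1], 10)
    }
  }
  return { text, inputTokens, outputTokens }
}

// Usage against a mocked stream, no network required:
// const mock = Readable.from(['Input Token: 21', 'Text: Hello', 'Text:  world', 'Output Tokens: 2'])
// await parseAnthropicStream(mock) // -> { text: 'Hello world', inputTokens: 21, outputTokens: 2 }

The doubling of wordCountMinimum in the patch (2, 4, 8, ... capped at 64) is a throttle: editMessageText is called less and less often as the reply grows, which keeps the bot under Telegram's edit rate limits. The trailing '...' appended to each partial edit acts as a visual typing indicator and is stripped before the final edit.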