add stream completion + fix conversation history
fegloff committed Mar 18, 2024
1 parent 08d51f2 commit 1950121
Showing 4 changed files with 192 additions and 103 deletions.
191 changes: 96 additions & 95 deletions src/modules/llms/api/athropic.ts
@@ -1,12 +1,12 @@
-import axios from 'axios'
-// import { type Readable } from 'stream'
+import axios, { type AxiosResponse } from 'axios'
+import { type Readable } from 'stream'
+import { GrammyError } from 'grammy'
+import { pino } from 'pino'
 
 import config from '../../../config'
-import { type ChatConversation } from '../../types' // , type OnCallBackQueryData, type OnMessageContext,
+import { type OnCallBackQueryData, type OnMessageContext, type ChatConversation } from '../../types'
 import { type LlmCompletion } from './llmApi'
 import { LlmsModelsEnum } from '../types'
-// import { GrammyError } from 'grammy'
-import { pino } from 'pino'
 
 const logger = pino({
   name: 'anthropic - llmsBot',
@@ -55,94 +55,95 @@ export const anthropicCompletion = async (
     price: 0
   }
 }
 
-// export const anthropicCompletion = async (
-//   conversation: ChatConversation[],
-//   model = LlmsModelsEnum.CLAUDE_OPUS,
-//   ctx: OnMessageContext | OnCallBackQueryData,
-//   msgId: number
-// ): Promise<LlmCompletion> => {
-//   const data = {
-//     model,
-//     stream: true, // Set stream to true to receive the completion as a stream
-//     system: config.openAi.chatGpt.chatCompletionContext,
-//     max_tokens: +config.openAi.chatGpt.maxTokens,
-//     messages: conversation
-//   }
-//   let wordCount = 0
-//   let wordCountMinimum = 2
-//   const url = `${API_ENDPOINT}/anthropic/completions`
-//   if (!ctx.chat?.id) {
-//     throw new Error('Context chat id should not be empty after openAI streaming')
-//   }
-//   const response = await axios.post(url, data, { responseType: 'stream' })
-
-//   // Create a Readable stream from the response
-//   const completionStream: Readable = response.data
-
-//   // Read and process the stream
-//   let completion = ''
-//   let outputTokens = ''
-//   let inputTokens = ''
-//   completionStream.on('data', (chunk: any) => {
-//     const sendMessage = async (completion: string): Promise<void> => {
-//       await ctx.api
-//         .editMessageText(ctx.chat?.id, msgId, completion)
-//         .catch(async (e: any) => {
-//           if (e instanceof GrammyError) {
-//             if (e.error_code !== 400) {
-//               throw e
-//             } else {
-//               logger.error(e)
-//             }
-//           } else {
-//             throw e
-//           }
-//         })
-//     }
-//     const msg = chunk.toString()
-//     if (msg) {
-//       if (msg.startsWith('Input Token')) {
-//         inputTokens = msg.split('Input Token: ')[1]
-//       } else if (msg.startsWith('Text')) {
-//         wordCount++
-//         completion += msg.split('Text: ')[1]
-//         if (wordCount > wordCountMinimum) { // if (chunck === '.' && wordCount > wordCountMinimum) {
-//           if (wordCountMinimum < 64) {
-//             wordCountMinimum *= 2
-//           }
-//           completion = completion.replaceAll('...', '')
-//           completion += '...'
-//           wordCount = 0
-//           if (ctx.chat?.id) {
-//             await sendMessage(completion)
-//           }
-//         }
-//       } else if (msg.startsWith('Output Tokens')) {
-//         outputTokens = msg.split('Output Tokens: ')[1]
-//       }
-//     }
-//   })
-
-//   completionStream.on('end', () => {
-//     const totalOutputTokens = outputTokens // response.headers['x-openai-output-tokens']
-//     const totalInputTokens = inputTokens // response.headers['x-openai-input-tokens']
-//     console.log('FCO stream', completion)
-//     // You can also process the completion content here
-
-//     return {
-//       completion: {
-//         content: completion,
-//         role: 'assistant',
-//         model
-//       },
-//       usage: parseInt(totalOutputTokens, 10) + parseInt(totalInputTokens, 10),
-//       price: 0
-//     }
-//   })
-
-//   return {
-//     completion: undefined,
-//     usage: 0,
-//     price: 0
-//   }
-// }
+export const anthropicStreamCompletion = async (
+  conversation: ChatConversation[],
+  model = LlmsModelsEnum.CLAUDE_OPUS,
+  ctx: OnMessageContext | OnCallBackQueryData,
+  msgId: number,
+  limitTokens = true
+): Promise<LlmCompletion> => {
+  const data = {
+    model,
+    stream: true, // Set stream to true to receive the completion as a stream
+    system: config.openAi.chatGpt.chatCompletionContext,
+    max_tokens: limitTokens ? +config.openAi.chatGpt.maxTokens : undefined,
+    messages: conversation.map(m => { return { content: m.content, role: m.role } })
+  }
+  let wordCount = 0
+  let wordCountMinimum = 2
+  const url = `${API_ENDPOINT}/anthropic/completions`
+  if (!ctx.chat?.id) {
+    throw new Error('Context chat id should not be empty after openAI streaming')
+  }
+  const response: AxiosResponse = await axios.post(url, data, { responseType: 'stream' })
+  // Create a Readable stream from the response
+  const completionStream: Readable = response.data
+  // Read and process the stream
+  let completion = ''
+  let outputTokens = ''
+  let inputTokens = ''
+  for await (const chunk of completionStream) {
+    const msg = chunk.toString()
+    if (msg) {
+      if (msg.startsWith('Input Token')) {
+        inputTokens = msg.split('Input Token: ')[1]
+      } else if (msg.startsWith('Text')) {
+        wordCount++
+        completion += msg.split('Text: ')[1]
+        if (wordCount > wordCountMinimum) { // if (chunck === '.' && wordCount > wordCountMinimum) {
+          if (wordCountMinimum < 64) {
+            wordCountMinimum *= 2
+          }
+          completion = completion.replaceAll('...', '')
+          completion += '...'
+          wordCount = 0
+          if (ctx.chat?.id) {
+            await ctx.api
+              .editMessageText(ctx.chat?.id, msgId, completion)
+              .catch(async (e: any) => {
+                if (e instanceof GrammyError) {
+                  if (e.error_code !== 400) {
+                    throw e
+                  } else {
+                    logger.error(e)
+                  }
+                } else {
+                  throw e
+                }
+              })
+          }
+        }
+      } else if (msg.startsWith('Output Tokens')) {
+        outputTokens = msg.split('Output Tokens: ')[1]
+      }
+    }
+  }
+  completion = completion.replaceAll('...', '')
+  await ctx.api
+    .editMessageText(ctx.chat?.id, msgId, completion)
+    .catch((e: any) => {
+      if (e instanceof GrammyError) {
+        if (e.error_code !== 400) {
+          throw e
+        } else {
+          logger.error(e)
+        }
+      } else {
+        throw e
+      }
+    })
+  const totalOutputTokens = outputTokens // response.headers['x-openai-output-tokens']
+  const totalInputTokens = inputTokens // response.headers['x-openai-input-tokens']
+  return {
+    completion: {
+      content: completion,
+      role: 'assistant',
+      model
+    },
+    usage: parseInt(totalOutputTokens, 10) + parseInt(totalInputTokens, 10),
+    price: 0,
+    inputTokens: parseInt(totalInputTokens, 10),
+    outputTokens: parseInt(totalOutputTokens, 10)
+  }
+}
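
Note on the streaming pattern above: the backend emits plain-text chunks prefixed with 'Input Token: ', 'Text: ' and 'Output Tokens: ', and the loop dispatches on those prefixes. The doubling wordCountMinimum is a rate-limit throttle — the Telegram message is edited after 2 chunks, then 4, 8, and so on up to a cap of 64, so edits grow sparser as the reply grows and long streams settle at roughly one edit per 64 chunks. A minimal, self-contained sketch of just that throttle (the throttledEdits helper is hypothetical, not part of this commit):

async function throttledEdits (
  chunks: AsyncIterable<string>,
  editMessage: (text: string) => Promise<void>
): Promise<string> {
  let completion = ''
  let chunkCount = 0
  let chunkMinimum = 2 // mirrors wordCountMinimum above
  for await (const chunk of chunks) {
    completion += chunk
    chunkCount++
    if (chunkCount > chunkMinimum) {
      if (chunkMinimum < 64) chunkMinimum *= 2 // back off: edit after 2, 4, 8, ... 64 chunks
      chunkCount = 0
      await editMessage(completion + '...') // trailing '...' marks the reply as still streaming
    }
  }
  return completion // caller sends the final text once, without the trailing '...'
}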
9 changes: 8 additions & 1 deletion src/modules/llms/api/llmApi.ts
@@ -2,7 +2,8 @@ import axios from 'axios'
 import config from '../../../config'
 import { type ChatConversation } from '../../types'
 import pino from 'pino'
-import { LlmsModelsEnum } from '../types'
+import { LlmsModels, LlmsModelsEnum } from '../types'
+import { type ChatModel } from '../../open-ai/types'
 
 const API_ENDPOINT = config.llms.apiEndpoint // config.llms.apiEndpoint // 'http://localhost:8080' // http://127.0.0.1:5000' // config.llms.apiEndpoint
 
@@ -18,6 +19,8 @@ export interface LlmCompletion {
   completion: ChatConversation | undefined
   usage: number
   price: number
+  inputTokens?: number
+  outputTokens?: number
 }
 
 interface LlmAddUrlDocument {
@@ -33,6 +36,10 @@ interface QueryUrlDocument {
   conversation?: ChatConversation[]
 }
 
+export const getChatModel = (modelName: string): ChatModel => {
+  return LlmsModels[modelName]
+}
+
 export const llmAddUrlDocument = async (args: LlmAddUrlDocument): Promise<string> => {
   const data = { ...args }
   const endpointUrl = `${API_ENDPOINT}/collections/document`
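
The ChatModel type and the LlmsModels record used by the new getChatModel helper live in files outside this diff. A rough, illustrative stand-in for what the lookup does (field names and prices below are assumptions, not taken from the repository):

interface ChatModelSketch {
  name: string
  inputPrice: number // assumed unit: USD per 1K input tokens
  outputPrice: number // assumed unit: USD per 1K output tokens
}

const llmsModelsSketch: Record<string, ChatModelSketch> = {
  'claude-3-opus-20240229': { name: 'claude-3-opus-20240229', inputPrice: 0.015, outputPrice: 0.075 }
}

// getChatModel then reduces to a keyed record lookup:
const getChatModelSketch = (modelName: string): ChatModelSketch => llmsModelsSketch[modelName]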
19 changes: 14 additions & 5 deletions src/modules/llms/helpers.ts
@@ -8,7 +8,9 @@ import {
 import { type ParseMode } from 'grammy/types'
 import { LlmsModelsEnum } from './types'
 import { type Message } from 'grammy/out/types'
-import { llmAddUrlDocument } from './api/llmApi'
+import { type LlmCompletion, getChatModel, llmAddUrlDocument } from './api/llmApi'
+import { getChatModelPrice } from '../open-ai/api/openAi'
+import config from '../../config'
 
 export enum SupportedCommands {
   bardF = 'bard',
@@ -213,11 +215,18 @@ export const hasPrefix = (prompt: string): string => {
   )
 }
 
-export const getPromptPrice = (completion: string, data: ChatPayload): { price: number, promptTokens: number, completionTokens: number } => {
+export const getPromptPrice = (completion: LlmCompletion, data: ChatPayload): { price: number, promptTokens: number, completionTokens: number } => {
   const { ctx, model } = data
+  const modelPrice = getChatModel(model)
+  const price =
+    getChatModelPrice(modelPrice, true, completion.inputTokens ?? 0, completion.outputTokens ?? 0) *
+    config.openAi.chatGpt.priceAdjustment
+  ctx.session.llms.usage += completion.outputTokens ?? 0
+  ctx.session.llms.price += price
   return {
-    price: 0,
-    promptTokens: 10,
-    completionTokens: 60
+    price,
+    promptTokens: completion.inputTokens ?? 0,
+    completionTokens: completion.outputTokens ?? 0
   }
 }

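
The rewritten getPromptPrice replaces the hard-coded placeholders (price 0, 10/60 tokens) with real accounting: model rates applied to the completion's inputTokens/outputTokens, scaled by config.openAi.chatGpt.priceAdjustment, then accumulated onto the session. A worked sketch of the arithmetic, assuming the `true` second argument of getChatModelPrice selects a result in cents:

function promptPriceSketch (
  inputTokens: number,
  outputTokens: number,
  centsPer1kInput: number,
  centsPer1kOutput: number,
  priceAdjustment: number
): number {
  const base = (inputTokens / 1000) * centsPer1kInput + (outputTokens / 1000) * centsPer1kOutput
  return base * priceAdjustment
}

// e.g. 1200 input + 400 output tokens at hypothetical rates of 1.5¢/7.5¢ per 1K tokens,
// with priceAdjustment = 2: (1.8 + 3.0) * 2 = 9.6¢
console.log(promptPriceSketch(1200, 400, 1.5, 7.5, 2)) // 9.6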
76 changes: 74 additions & 2 deletions src/modules/llms/index.ts
@@ -19,6 +19,7 @@ import { sleep } from '../sd-images/utils'
 import {
   addDocToCollection,
   addUrlToCollection,
+  getPromptPrice,
   hasBardPrefix,
   hasClaudeOpusPrefix,
   hasLlamaPrefix,
@@ -38,7 +39,7 @@ import * as Sentry from '@sentry/node'
 import { now } from '../../utils/perf'
 import { AxiosError } from 'axios'
 import OpenAI from 'openai'
-import { anthropicCompletion } from './api/athropic'
+import { anthropicCompletion, anthropicStreamCompletion } from './api/athropic'
 export class LlmsBot implements PayableBot {
   public readonly module = 'LlmsBot'
   private readonly logger: Logger
@@ -548,6 +549,72 @@
     ctx.transient.analytics.actualResponseTime = now()
   }
 
+  private async completionGen (data: ChatPayload, msgId?: number, outputFormat = 'text'): Promise< { price: number, chat: ChatConversation[] }> {
+    const { conversation, ctx, model } = data
+    try {
+      if (!msgId) {
+        ctx.transient.analytics.firstResponseTime = now()
+        msgId = (
+          await ctx.reply('...', {
+            message_thread_id:
+              ctx.message?.message_thread_id ??
+              ctx.message?.reply_to_message?.message_thread_id
+          })
+        ).message_id
+      }
+      if (outputFormat === 'text') {
+        const isTypingEnabled = config.openAi.chatGpt.isTypingEnabled
+        if (isTypingEnabled) {
+          ctx.chatAction = 'typing'
+        }
+        const completion = await anthropicStreamCompletion(
+          conversation,
+          model as LlmsModelsEnum,
+          ctx,
+          msgId,
+          true // telegram messages has a character limit
+        )
+        if (isTypingEnabled) {
+          ctx.chatAction = null
+        }
+        if (completion) {
+          ctx.transient.analytics.sessionState = RequestState.Success
+          ctx.transient.analytics.actualResponseTime = now()
+          const price = getPromptPrice(completion, data)
+          this.logger.info(
+            `streamChatCompletion result = tokens: ${price.promptTokens + price.completionTokens} | ${model} | price: ${price.price}¢` // }
+          )
+          conversation.push({
+            role: 'assistant',
+            content: completion.completion?.content ?? ''
+          })
+          return {
+            price: price.price,
+            chat: conversation
+          }
+        }
+      } else {
+        const response = await anthropicCompletion(conversation, model as LlmsModelsEnum)
+        conversation.push({
+          role: 'assistant',
+          content: response.completion?.content ?? ''
+        })
+        return {
+          price: response.price,
+          chat: conversation
+        }
+      }
+      return {
+        price: 0,
+        chat: conversation
+      }
+    } catch (e: any) {
+      Sentry.captureException(e)
+      ctx.chatAction = null
+      throw e
+    }
+  }
+
   private async promptGen (data: ChatPayload): Promise<{ price: number, chat: ChatConversation[] }> {
     const { conversation, ctx, model } = data
     if (!ctx.chat?.id) {
@@ -686,7 +753,12 @@
       model: model ?? config.llms.model,
       ctx
     }
-    const result = await this.promptGen(payload)
+    let result: { price: number, chat: ChatConversation[] } = { price: 0, chat: [] }
+    if (model === LlmsModelsEnum.CLAUDE_OPUS || model === LlmsModelsEnum.CLAUDE_SONNET) {
+      result = await this.completionGen(payload) // , prompt.msgId, prompt.outputFormat)
+    } else {
+      result = await this.promptGen(payload)
+    }
     ctx.session.llms.chatConversation = [...result.chat]
     if (
       !(await this.payments.pay(ctx as OnMessageContext, result.price))
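
Net effect of the routing above: Claude Opus and Claude Sonnet requests now go through completionGen, which replies at once with a '...' placeholder and streams edits into that message, while every other model keeps the existing promptGen request/response path. A condensed sketch of the streaming flow (names taken from the diff; error handling and typing indicators omitted):

// inside completionGen, once a Claude model is routed here
const placeholder = await ctx.reply('...') // placeholder message the stream edits in place
const completion = await anthropicStreamCompletion(
  conversation,
  model as LlmsModelsEnum,
  ctx,
  placeholder.message_id,
  true // limitTokens: a single Telegram message is capped at 4096 characters
)
// pushing the assistant reply keeps the conversation history intact for the next turn
conversation.push({ role: 'assistant', content: completion.completion?.content ?? '' })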
