diff --git a/backend-python/routes/completion.py b/backend-python/routes/completion.py
index 3ffad8b5..5ee7af25 100644
--- a/backend-python/routes/completion.py
+++ b/backend-python/routes/completion.py
@@ -1,15 +1,22 @@
 import asyncio
 import json
 from threading import Lock
-from typing import List, Union
+from typing import List, Union, Literal
 from enum import Enum
 import base64
-import time
+import time, re, random, string
 
 from fastapi import APIRouter, Request, status, HTTPException
+from fastapi.encoders import jsonable_encoder
 from sse_starlette.sse import EventSourceResponse
 from pydantic import BaseModel, Field
 import tiktoken
+
+from routes.schema import (
+    ChatCompletionMessageParam,
+    ChatCompletionToolParam,
+    ChatCompletionNamedToolChoiceParam,
+)
 
 from utils.rwkv import *
 from utils.log import quick_log
 import global_var
@@ -21,12 +28,7 @@ class Role(Enum):
     User = "user"
     Assistant = "assistant"
     System = "system"
-
-
-class Message(BaseModel):
-    role: Role
-    content: str = Field(min_length=0)
-    raw: bool = Field(False, description="Whether to treat content as raw text")
+    Tool = "tool"
 
 
 default_stop = [
@@ -40,14 +42,19 @@ class Message(BaseModel):
     "\n\nA",
     "\n\nBot",
     "\n\nAlice",
+    "\n\nObservation",
 ]
 
 
 class ChatCompletionBody(ModelConfigBody):
-    messages: Union[List[Message], None]
+    messages: Union[List[ChatCompletionMessageParam], None]
     model: Union[str, None] = "rwkv"
     stream: bool = False
     stop: Union[str, List[str], None] = default_stop
+    tools: Union[List[ChatCompletionToolParam], None] = None
+    tool_choice: Union[
+        Literal["none", "auto", "required"], ChatCompletionNamedToolChoiceParam
+    ] = "auto"
     user_name: Union[str, None] = Field(
         None, description="Internal user name", min_length=1
     )
@@ -116,7 +123,7 @@ class CompletionBody(ModelConfigBody):
 async def eval_rwkv(
     model: AbstractRWKV,
     request: Request,
-    body: ModelConfigBody,
+    body: ModelConfigBody | ChatCompletionBody,
     prompt: str,
     stream: bool,
     stop: Union[str, List[str], None],
@@ -236,16 +243,12 @@ async def eval_rwkv(
                 }
             )
             yield "[DONE]"
-    else:
+    else:  # !stream
         yield {
+            "id": "",
             "object": "chat.completion" if chat_mode else "text_completion",
-            # "response": response,
+            "created": int(time.time()),
             "model": model.name,
-            "usage": {
-                "prompt_tokens": prompt_tokens,
-                "completion_tokens": completion_tokens,
-                "total_tokens": prompt_tokens + completion_tokens,
-            },
             "choices": [
                 (
                     {
@@ -264,6 +267,11 @@
                     }
                 )
             ],
+            "usage": {
+                "prompt_tokens": prompt_tokens,
+                "completion_tokens": completion_tokens,
+                "total_tokens": prompt_tokens + completion_tokens,
+            },
         }
 
 
@@ -352,17 +360,34 @@ def chat_template(
     )
     system = "System" if body.system_name is None else body.system_name
+    tool = "Observation"
 
     for message in body.messages:
         append_message: str = ""
-        if message.role == Role.User:
-            append_message = f"{user}{interface} " + message.content
-        elif message.role == Role.Assistant:
-            append_message = f"{bot}{interface} " + message.content
-        elif message.role == Role.System:
-            append_message = f"{system}{interface} " + message.content
+        match message.role:
+            case Role.User.value:
+                append_message = f"{user}{interface} " + message.content
+            case Role.Assistant.value:
+                if message.content is None:
+                    if message.tool_calls and len(message.tool_calls) > 0:
+                        name = message.tool_calls[0].function.name
+                        arguments = json.loads(message.tool_calls[0].function.arguments)
+                        arguments = ", ".join(
+                            [f'"{k}"="{v}"' for k, v in arguments.items()]
+                        )
+                        append_message = (
+                            f"{bot}{interface} "
+                            + f"{name}\n```python\ntool_call({arguments})\n```"
+                        )
+                    else:
+                        continue
+                else:
+                    append_message = f"{bot}{interface} " + message.content
+            case Role.System.value:
+                append_message = f"{system}{interface} " + message.content
+            case Role.Tool.value:
+                append_message = f"{tool}{interface} " + message.content
         completion_text += append_message + "\n\n"
     completion_text += f"{bot}{interface}"
-
     return completion_text
@@ -397,6 +422,307 @@ async def chat_completions(body: ChatCompletionBody, request: Request):
     # if not body.presystem:
     #     body.stop.append("\n\n")
 
+    if body.tool_choice != "none" and body.tools is not None and len(body.tools) > 0:
+        return await chat_with_tools(model, body, request, completion_text)
+    else:
+        return await chat(model, body, request, completion_text)
+
+
+async def chat_with_tools(
+    model: TextRWKV, body: ChatCompletionBody, request: Request, completion_text: str
+):
+    system = "System" if body.system_name is None else body.system_name
+    interface = model.interface
+    tools = [tool.function for tool in body.tools]
+    tools_text = json.dumps(jsonable_encoder(tools), indent=2)
+
+    # Function call prompt
+    tools_text = f"""\
+{system}{interface} You are a helpful assistant with access to the following functions. Use them if required -{tools_text}
+"""
+
+    completion_text = tools_text + "\n" + completion_text
+
+    if body.stream:
+        response = async_generator_stream_response(model, body, request, completion_text)
+        return EventSourceResponse(response)
+    else:
+        response = await chat(model, body, request, completion_text)
+        response = postprocess_response(response)
+        return response
+
+
+async def async_generator_stream_response(
+    model: TextRWKV, body: ChatCompletionBody, request: Request, completion_text: str
+):
+    # NOTE: No failure handling has been implemented yet.
+
+    # Initialization
+    gen = eval_rwkv(
+        model, request, body, completion_text, body.stream, body.stop, True
+    )  # Get an async generator handle
+    content: str = ""
+    function_id: str = "call_" + "".join(
+        random.sample(string.ascii_letters + string.digits, 24)
+    )
+    flag_is_function_call_confirmed = False
+    flag_is_common_confirmed = False
+
+    # Main loop; the only intended exit point is the StopAsyncIteration branch below.
+    done = False
+    stack_keyword_pairs = [["```", "```"], ["(", ")"], ['"', '"'], ["'", "'"]]
+    while True:
+        if done:
+            yield json.dumps(
+                {
+                    "object": "chat.completion.chunk",
+                    "model": model.name,
+                    "choices": [
+                        {"index": 0, "delta": {}, "finish_reason": "tool_calls"}
+                    ],
+                }
+            )
+            yield "[DONE]"
+
+        try:
+            response = await anext(gen)  # Generate a delta response
+            if response == "[DONE]":
+                done = True
+                continue
+        except StopAsyncIteration:
+            # Too few inference results
+            if not flag_is_function_call_confirmed and not flag_is_common_confirmed:
+                response_decoded["choices"][0]["delta"]["content"] = content
+                yield json.dumps(response_decoded)
+            break  # The expected exit point of the loop and the function
+
+        if flag_is_common_confirmed:
+            yield response
+            continue
+
+        # Post-process response
+        response_decoded = json.loads(response)  # Decode string
+        if response_decoded["choices"][0]["delta"] == {}:
+            continue
+        delta_content = response_decoded["choices"][0]["delta"]["content"]
+        content += delta_content
+
+        if flag_is_function_call_confirmed:
+            if "\n\n" in content:
+                done = True
+                continue
+
+            for pair in stack_keyword_pairs:
+                if done:
+                    break
+                for keyword in pair:
+                    if keyword in delta_content:
+                        stack.append(keyword)
+                        if (
+                            pair[0] in stack
+                            and pair[1] in stack
+                            and stack.index(pair[0]) < stack.index(pair[1])
+                        ):
+                            stack.remove(pair[0])
+                            stack.remove(pair[1])
+                        if "(" not in stack and ")" not in stack:
+                            done = True
+                            response_decoded["choices"][0]["delta"] = {
+                                "tool_calls": [
+                                    {
+                                        "index": 0,
+                                        "function": {
+                                            "arguments": (
+                                                '"'
+                                                if delta_content.startswith('"')
+                                                else ""
+                                            )
+                                            + "}",
+                                        },
+                                    }
+                                ]
+                            }
+                            yield json.dumps(response_decoded)
+                            break
+            if done:
+                continue
+
+            delta_content = delta_content.replace("=", ":")
+            # content = content.replace(r'"', r"\"")  # XXX: Check whether to keep this.
+            response_decoded["choices"][0]["delta"]["content"] = None
+            response_decoded["choices"][0]["delta"] = {
+                "tool_calls": [
+                    {
+                        "index": 0,
+                        "function": {
+                            "arguments": delta_content,
+                        },
+                    }
+                ]
+            }
+            yield json.dumps(response_decoded)
+            continue
+
+        if not flag_is_function_call_confirmed and not flag_is_common_confirmed:
+            """
+            # Unconfirmed response; classify the buffered content as follows:
+            #   4 or more line feeds: common response.
+            #   More than 60 characters: common response.
+            #   More than 44 characters with no markdown code fence opened: common response.
+            #   Pattern "```FunctionName\ntool_call(...)```" detected: function call response.
+            #     - Two responses will be generated.
+            #   Default: unsure response.
+            #     - Recheck after the next delta content field is appended.
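+            #   Example: a buffer beginning with
+            #   'get_current_weather\n```python\ntool_call(' would be classified
+            #   as a function call response by the head regex below.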
+ """ + # Constant + LIMIT_LINE_FEEDS = 4 + LIMIT_CHARACTERS = 60 + LIMIT_FUNCTION_NAME_CHARACTERS = 44 + REGEX_BLOCKS_HEADERS = r"([\w]+)[\s]*```[\w\s]*tool_call\(" + + # Regex + regex_match_function_call_head: re.Match | None = re.search( + REGEX_BLOCKS_HEADERS, content + ) + + # Confirm Common Response + if regex_match_function_call_head is None and ( + content.count("\n") >= LIMIT_LINE_FEEDS + or len(content) > LIMIT_CHARACTERS + or ( + len(content) > LIMIT_FUNCTION_NAME_CHARACTERS + and "```" not in content + ) + ): + flag_is_common_confirmed = True + response_decoded["choices"][0]["delta"]["content"] = content + yield json.dumps(response_decoded) + del response_decoded + del content + continue + + # Confirm Function call Response + if regex_match_function_call_head is not None: + flag_is_function_call_confirmed = True + stack = ["```", "("] + + # Generate a blank content response + response_decoded["choices"][0]["delta"]["role"] = "assistant" + response_decoded["choices"][0]["delta"]["content"] = None + yield json.dumps(response_decoded) + + # Generate a function call details response + name = regex_match_function_call_head.group(1) + del response_decoded["choices"][0]["delta"]["role"] + del response_decoded["choices"][0]["delta"]["content"] + response_decoded["choices"][0]["delta"] = { + "tool_calls": [ + { + "index": 0, + "id": function_id, + "type": "function", + "function": { + "name": name, + "arguments": "", + }, + } + ] + } + yield json.dumps(response_decoded) + response_decoded["choices"][0]["delta"] = { + "tool_calls": [ + { + "index": 0, + "function": { + "arguments": "{" + + ('"' if delta_content.endswith('"') else ""), + }, + } + ] + } + yield json.dumps(response_decoded) + + # Reset content buffer + # content = feild_function_call_block.group(2) + continue + + # Default: Unsure Response + continue + # End of loop body + + +def postprocess_response(response: dict): + # NOTE: There is none of existing failure analysis. 
+ REGEX_BLOCKS = r"([\w]+)[\s]*```[\w\s]*tool_call(.*?)\n*```" + REGEX_ARGS = r'[\'"]([^\'"]+)[\'"]\s*=\s*[\'"]([^\'"]+)[\'"]' + + regex_match = re.search( + REGEX_BLOCKS, response["choices"][0]["message"]["content"], re.DOTALL + ) + if regex_match is None: + return response + + name = regex_match.group(1) + function = regex_match.group(2).strip() + arguments = json.dumps(dict(re.findall(REGEX_ARGS, function))) + + tool_calls = [ + { + "id": "call_" + + "".join(random.sample(string.ascii_letters + string.digits, 24)), + "type": "function", + "function": { + "name": name, + "arguments": arguments, + }, + } + ] + + response["choices"][0]["message"]["tool_calls"] = tool_calls + response["choices"][0]["message"]["content"] = None + response["choices"][0]["logprobs"] = None + response["choices"][0]["finish_reason"] = "tool_calls" + + return response + + +# ----------------------------------- +# @Description: (reserved) post process multi-function-call responses +# ----------------------------------- +# def postprocess_response(response: dict): +# REGEX_BLOCKS = r'```[\w]*(.*?)```' +# REGEX_FUNCTIONS = r'(\w+)*\(' +# REGEX_ARGS = r'"([^"]+)"\s*=\s*"([^"]+)"' + +# tool_calls = [] +# blocks = re.findall(REGEX_BLOCKS, response["choices"][0]["message"]["content"], re.DOTALL) +# for block in blocks: +# functions = block.strip().split('\n') +# for function in functions: +# name = re.search(REGEX_FUNCTIONS, function).group(1) +# arguments = json.dumps(dict(re.findall(REGEX_ARGS, function))) +# tool_calls.append( +# { +# "id": "call_" + "".join(random.sample(string.ascii_letters + string.digits, 24)), +# "type": "function", +# "function": { +# "name": name, +# "arguments": arguments, +# } +# } +# ) + +# response["choices"][0]["message"]["tool_calls"] = tool_calls +# response["choices"][0]["message"]["content"] = None +# response["choices"][0]["logprobs"] = None +# response["choices"][0]["finish_reason"] = "tool_calls" + +# return response + + +async def chat( + model: TextRWKV, body: ChatCompletionBody, request: Request, completion_text: str +): if body.stream: return EventSourceResponse( eval_rwkv( diff --git a/backend-python/routes/schema.py b/backend-python/routes/schema.py new file mode 100644 index 00000000..d03e06f7 --- /dev/null +++ b/backend-python/routes/schema.py @@ -0,0 +1,97 @@ +from pydantic import BaseModel, Field +from typing import Literal, Optional, Union, List, Dict + + +class ChatCompletionMessageToolCallParamFunction(BaseModel): + arguments: str = Field( + description=""" + The arguments to call the function with, as generated by the model in JSON + format. Note that the model does not always generate valid JSON, and may + hallucinate parameters not defined by your function schema. Validate the + arguments in your code before calling your function. + """ + ) + name: str = Field(description="The name of the function to call.") + + +class ChatCompletionMessageToolCallParam(BaseModel): + id: str = Field(description="The ID of the tool call.") + function: ChatCompletionMessageToolCallParamFunction = Field( + description="The function that the model called." 
+    )
+    type: Literal["function"]
+
+
+class ChatCompletionSystemMessageParam(BaseModel):
+    content: str
+    role: Literal["system"]
+    name: Optional[str] = None
+    raw: bool = Field(False, description="Whether to treat content as raw text")
+
+
+class ChatCompletionUserMessageParam(BaseModel):
+    content: str
+    role: Literal["user"]
+    name: Optional[str] = None
+    raw: bool = Field(False, description="Whether to treat content as raw text")
+
+
+class ChatCompletionAssistantMessageParam(BaseModel):
+    content: Optional[str] = None
+    role: Literal["assistant"]
+    name: Optional[str] = None
+    tool_calls: Optional[List[ChatCompletionMessageToolCallParam]] = Field(
+        None,
+        description="The tool calls generated by the model, such as function calls.",
+    )
+    raw: bool = Field(False, description="Whether to treat content as raw text")
+
+
+class ChatCompletionToolMessageParam(BaseModel):
+    content: str
+    role: Literal["tool"]
+    name: Optional[str] = None
+    tool_call_id: str = Field(
+        description="Tool call that this message is responding to."
+    )
+
+
+ChatCompletionMessageParam = Union[
+    ChatCompletionSystemMessageParam,
+    ChatCompletionUserMessageParam,
+    ChatCompletionAssistantMessageParam,
+    ChatCompletionToolMessageParam,
+]
+
+
+class ResponseFormat(BaseModel):
+    type: Literal["text", "json_object"]
+
+
+class ChatCompletionNamedToolChoiceParamFunction(BaseModel):
+    name: str
+
+
+class ChatCompletionNamedToolChoiceParam(BaseModel):
+    function: ChatCompletionNamedToolChoiceParamFunction
+    type: Literal["function"] = "function"
+
+
+FunctionParameters = Dict[str, object]
+
+
+class FunctionDefinition(BaseModel):
+    name: str = Field(
+        min_length=1,
+        max_length=64,
+        description="The name of the function to be called. Must be a-z, A-Z, 0-9, or contain underscores and dashes, with a maximum length of 64.",
+    )
+    description: Optional[str] = Field(
+        None,
+        description="A description of what the function does, used by the model to choose when and how to call the function.",
+    )
+    parameters: Union[FunctionParameters, None] = None
+
+
+class ChatCompletionToolParam(BaseModel):
+    function: FunctionDefinition
+    type: Literal["function"] = "function"
diff --git a/backend-python/tests/function_call.py b/backend-python/tests/function_call.py
new file mode 100644
index 00000000..ed6a0b11
--- /dev/null
+++ b/backend-python/tests/function_call.py
@@ -0,0 +1,96 @@
+# https://platform.openai.com/docs/guides/function-calling
+
+from openai import OpenAI
+import json
+
+client = OpenAI(
+    base_url="http://127.0.0.1:8000",
+    api_key="test",
+)
+
+
+# Example dummy function hard coded to return the same weather
+# In production, this could be your backend API or an external API
+def get_current_weather(location, unit="fahrenheit"):
+    """Get the current weather in a given location"""
+    if "tokyo" in location.lower():
+        return json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit})
+    elif "san francisco" in location.lower():
+        return json.dumps(
+            {"location": "San Francisco", "temperature": "72", "unit": unit}
+        )
+    elif "paris" in location.lower():
+        return json.dumps({"location": "Paris", "temperature": "22", "unit": unit})
+    else:
+        return json.dumps({"location": location, "temperature": "unknown"})
+
+
+def run_conversation():
+    # Step 1: send the conversation and available functions to the model
+    messages = [
+        {
+            "role": "user",
+            "content": "What's the weather like in Paris?",
+        }
+    ]
+    tools = [
+        {
+            "type": "function",
+            "function": {
+                "name": "get_current_weather",
+                "description": "Get the current weather
in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + }, + "required": ["location"], + }, + }, + } + ] + response = client.chat.completions.create( + model="gpt-4o", + messages=messages, + tools=tools, + tool_choice="auto", # auto is default, but we'll be explicit + ) + response_message = response.choices[0].message + tool_calls = response_message.tool_calls + # Step 2: check if the model wanted to call a function + if tool_calls: + # Step 3: call the function + # Note: the JSON response may not always be valid; be sure to handle errors + available_functions = { + "get_current_weather": get_current_weather, + } # only one function in this example, but you can have multiple + messages.append(response_message) # extend conversation with assistant's reply + # Step 4: send the info for each function call and function response to the model + for tool_call in tool_calls: + function_name = tool_call.function.name + function_to_call = available_functions[function_name] + function_args = json.loads(tool_call.function.arguments) + function_response = function_to_call( + location=function_args.get("location"), + unit=function_args.get("unit"), + ) + messages.append( + { + "tool_call_id": tool_call.id, + "role": "tool", + "name": function_name, + "content": function_response, + } + ) # extend conversation with function response + second_response = client.chat.completions.create( + model="gpt-4o", + messages=messages, + ) # get a new response from the model where it can see the function response + return second_response.choices[0].message.content + + +print(run_conversation()) diff --git a/backend-python/tests/function_call_stream.py b/backend-python/tests/function_call_stream.py new file mode 100644 index 00000000..1dcef2fd --- /dev/null +++ b/backend-python/tests/function_call_stream.py @@ -0,0 +1,150 @@ +# Example of an OpenAI ChatCompletion request with stream=True +# https://platform.openai.com/docs/guides/chat +import time +import json +from openai import OpenAI +from collections import defaultdict + +# record the time before the request is sent +start_time = time.time() + + +# Example dummy function hard coded to return the same weather +# In production, this could be your backend API or an external API +def get_current_weather(location, unit="fahrenheit"): + """Get the current weather in a given location""" + if "tokyo" in location.lower(): + return json.dumps({"location": "Tokyo", "temperature": "10", "unit": unit}) + elif "san francisco" in location.lower(): + return json.dumps( + {"location": "San Francisco", "temperature": "72", "unit": unit} + ) + elif "paris" in location.lower(): + return json.dumps({"location": "Paris", "temperature": "22", "unit": unit}) + else: + return json.dumps({"location": location, "temperature": "unknown"}) + + +client = OpenAI( + base_url="http://127.0.0.1:8000", + api_key="test", +) + +messages = [ + { + "role": "user", + "content": "What's the weather like in Paris?", + } +] + +tools = [ + { + "type": "function", + "function": { + "name": "get_current_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. 
San Francisco, CA", + }, + "unit": {"type": "string", "enum": ["celsius", "fahrenheit"]}, + }, + "required": ["location"], + }, + }, + } +] +response = client.chat.completions.create( + model="gpt-4o", + messages=messages, + tools=tools, + tool_choice="auto", # auto is default, but we'll be explicit + stream=True, +) + +# https://community.openai.com/t/has-anyone-managed-to-get-a-tool-call-working-when-stream-true/498867/11 +tool_calls = [] +index = 0 +start = True +for chunk in response: + print(chunk) + chunk_time = time.time() - start_time + + delta = chunk.choices[0].delta + if not delta: + break + if not delta.function_call and not delta.tool_calls: + if start: + continue + else: + break + start = False + if delta.function_call: + if index == len(tool_calls): + tool_calls.append(defaultdict(str)) + if delta.function_call.name: + tool_calls[index]["name"] = delta.function_call.name + if delta.function_call.arguments: + tool_calls[index]["arguments"] += delta.function_call.arguments + elif delta.tool_calls: + tool_call = delta.tool_calls[0] + index = tool_call.index + if index == len(tool_calls): + tool_calls.append(defaultdict(str)) + if tool_call.id: + tool_calls[index]["id"] = tool_call.id + if tool_call.function: + if tool_call.function.name: + tool_calls[index]["name"] = tool_call.function.name + if tool_call.function.arguments: + tool_calls[index]["arguments"] += tool_call.function.arguments + +print() +print(tool_calls) +print(f"Full response received {chunk_time:.2f} seconds after request") + +if tool_calls: + # Step 3: call the function + # Note: the JSON response may not always be valid; be sure to handle errors + available_functions = { + "get_current_weather": get_current_weather, + } # only one function in this example, but you can have multiple + # Step 4: send the info for each function call and function response to the model + for tool_call in tool_calls: + function_name = tool_call["name"] + function_to_call = available_functions[function_name] + function_args = json.loads(tool_call["arguments"]) + function_response = function_to_call( + location=function_args.get("location"), + unit=function_args.get("unit"), + ) + messages.append( + { + "role": "assistant", + "tool_calls": [ + { + "id": tool_call["id"], + "type": "function", + "function": { + "name": function_name, + "arguments": tool_call["arguments"], + }, + } + ], + } + ) # extend conversation with assistant's reply + messages.append( + { + "tool_call_id": tool_call["id"], + "role": "tool", + "content": function_response, + } + ) # extend conversation with function response + second_response = client.chat.completions.create( + model="gpt-4o", + messages=messages, + ) # get a new response from the model where it can see the function response + print(second_response.choices[0].message.content) diff --git a/backend-python/tests/postprocess_response.py b/backend-python/tests/postprocess_response.py new file mode 100644 index 00000000..8f612d3a --- /dev/null +++ b/backend-python/tests/postprocess_response.py @@ -0,0 +1,72 @@ +import re + + +def postprocess_response(s): + REGEX_BLOCKS = r"([\w]+)[\s]*```[\w]*(.*?)```" + REGEX_ARGS = r'"([^"]+)"\s*=\s*"([^"]+)"' + + name = re.search(REGEX_BLOCKS, s, re.DOTALL).group(1) + function = re.search(REGEX_BLOCKS, s, re.DOTALL).group(2).strip() + arguments = dict(re.findall(REGEX_ARGS, function)) + + print(f"Name:\n{name}") + print(f"Function:\n{function}") + print(f"arguments:\n{arguments}") + print() + + return + + +def postprocess_response_reserved(s): + REGEX_BLOCKS = 
r"```[\w]*(.*?)```" + REGEX_FUNCTIONS = r"(\w+)*\(" + REGEX_ARGS = r'"([^"]+)"\s*=\s*"([^"]+)"' + + blocks = re.findall(REGEX_BLOCKS, s, re.DOTALL) + print(f"Blocks:\n{blocks}") + for block in blocks: + functions = block.strip().split("\n") + print(f"Functions:\n{functions}") + print() + for function in functions: + name = re.search(REGEX_FUNCTIONS, function).group(1) + arguments = f"{dict(re.findall(REGEX_ARGS, function))}" + + print(function) + print(name) + print(arguments) + print() + + return + + +if __name__ == "__main__": + str = """ + some texts + some texts + some texts + some texts + + ```python\n + get_current_wether("location"= "Tokyo", "unit" ="None")\n + ``` + + some texts + some texts + some texts + some texts + """ + postprocess_response(str) + + str = """ get_exchange_rate +```python +tool_call("base_currency"= "func_as_param('Hello World!')", "target_currency"= "CNY") +```""" + postprocess_response(str) + + str = """\ +get_current_weather +```python\n +tool_call("location"= "Tokyo", "unit"= "None")\n +```""" + postprocess_response(str)