From 4003f18a8528eb6a1a805c5700f4efb6e6efc13a Mon Sep 17 00:00:00 2001 From: Abigail Hartman Date: Wed, 31 Jul 2024 17:21:00 -0700 Subject: [PATCH] Fix unclosed client session (#1032) Signed-off-by: Benjamin <18634262+beplay@users.noreply.github.com> Co-authored-by: Benjamin <18634262+beplay@users.noreply.github.com> --- app.py | 118 +++++++++++++++++++++++++++++++-------------------------- 1 file changed, 64 insertions(+), 54 deletions(-) diff --git a/app.py b/app.py index dba4f1277c..7c3a86c646 100644 --- a/app.py +++ b/app.py @@ -4,6 +4,7 @@ import logging import uuid import httpx +import asyncio from quart import ( Blueprint, Quart, @@ -12,6 +13,7 @@ request, send_from_directory, render_template, + current_app, ) from openai import AsyncAzureOpenAI @@ -36,11 +38,24 @@ bp = Blueprint("routes", __name__, static_folder="static", template_folder="static") +cosmos_db_ready = asyncio.Event() + def create_app(): app = Quart(__name__) app.register_blueprint(bp) app.config["TEMPLATES_AUTO_RELOAD"] = True + + @app.before_serving + async def init(): + try: + app.cosmos_conversation_client = await init_cosmosdb_client() + cosmos_db_ready.set() + except Exception as e: + logging.exception("Failed to initialize CosmosDB client") + app.cosmos_conversation_client = None + raise e + return app @@ -96,8 +111,9 @@ async def assets(path): # Initialize Azure OpenAI Client -def init_openai_client(): +async def init_openai_client(): azure_openai_client = None + try: # API version check if ( @@ -128,9 +144,11 @@ def init_openai_client(): ad_token_provider = None if not aoai_api_key: logging.debug("No AZURE_OPENAI_KEY found, using Azure Entra ID auth") - ad_token_provider = get_bearer_token_provider( - DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default" - ) + async with DefaultAzureCredential() as credential: + ad_token_provider = get_bearer_token_provider( + credential, + "https://cognitiveservices.azure.com/.default" + ) # Deployment deployment = app_settings.azure_openai.model @@ -155,7 +173,7 @@ def init_openai_client(): raise e -def init_cosmosdb_client(): +async def init_cosmosdb_client(): cosmos_conversation_client = None if app_settings.chat_history: try: @@ -164,7 +182,9 @@ def init_cosmosdb_client(): ) if not app_settings.chat_history.account_key: - credential = DefaultAzureCredential() + async with DefaultAzureCredential() as cred: + credential = cred + else: credential = app_settings.chat_history.account_key @@ -314,7 +334,7 @@ async def send_chat_request(request_body, request_headers): model_args = prepare_model_args(request_body, request_headers) try: - azure_openai_client = init_openai_client() + azure_openai_client = await init_openai_client() raw_response = await azure_openai_client.chat.completions.with_raw_response.create(**model_args) response = raw_response.parse() apim_request_id = raw_response.headers.get("apim-request-id") @@ -393,6 +413,7 @@ def get_frontend_settings(): ## Conversation History API ## @bp.route("/history/generate", methods=["POST"]) async def add_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -402,15 +423,14 @@ async def add_conversation(): try: # make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") # check for the conversation_id, if the conversation is not set, we will create a new one history_metadata = {} if not conversation_id: title = await generate_title(request_json["messages"]) - conversation_dict = await cosmos_conversation_client.create_conversation( + conversation_dict = await current_app.cosmos_conversation_client.create_conversation( user_id=user_id, title=title ) conversation_id = conversation_dict["id"] @@ -421,7 +441,7 @@ async def add_conversation(): ## then write it to the conversation history in cosmos messages = request_json["messages"] if len(messages) > 0 and messages[-1]["role"] == "user": - createdMessageValue = await cosmos_conversation_client.create_message( + createdMessageValue = await current_app.cosmos_conversation_client.create_message( uuid=str(uuid.uuid4()), conversation_id=conversation_id, user_id=user_id, @@ -436,8 +456,6 @@ async def add_conversation(): else: raise Exception("No user message found") - await cosmos_conversation_client.cosmosdb_client.close() - # Submit request to Chat Completions for response request_body = await request.get_json() history_metadata["conversation_id"] = conversation_id @@ -451,6 +469,7 @@ async def add_conversation(): @bp.route("/history/update", methods=["POST"]) async def update_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -460,8 +479,7 @@ async def update_conversation(): try: # make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") # check for the conversation_id, if the conversation is not set, we will create a new one @@ -474,14 +492,14 @@ async def update_conversation(): if len(messages) > 0 and messages[-1]["role"] == "assistant": if len(messages) > 1 and messages[-2].get("role", None) == "tool": # write the tool message first - await cosmos_conversation_client.create_message( + await current_app.cosmos_conversation_client.create_message( uuid=str(uuid.uuid4()), conversation_id=conversation_id, user_id=user_id, input_message=messages[-2], ) # write the assistant message - await cosmos_conversation_client.create_message( + await current_app.cosmos_conversation_client.create_message( uuid=messages[-1]["id"], conversation_id=conversation_id, user_id=user_id, @@ -491,7 +509,6 @@ async def update_conversation(): raise Exception("No bot messages found") # Submit request to Chat Completions for response - await cosmos_conversation_client.cosmosdb_client.close() response = {"success": True} return jsonify(response), 200 @@ -502,9 +519,9 @@ async def update_conversation(): @bp.route("/history/message_feedback", methods=["POST"]) async def update_message(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] - cosmos_conversation_client = init_cosmosdb_client() ## check request for message_id request_json = await request.get_json() @@ -518,7 +535,7 @@ async def update_message(): return jsonify({"error": "message_feedback is required"}), 400 ## update the message in cosmos - updated_message = await cosmos_conversation_client.update_message_feedback( + updated_message = await current_app.cosmos_conversation_client.update_message_feedback( user_id, message_id, message_feedback ) if updated_message: @@ -548,6 +565,7 @@ async def update_message(): @bp.route("/history/delete", methods=["DELETE"]) async def delete_conversation(): + await cosmos_db_ready.wait() ## get the user id from the request headers authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -561,22 +579,19 @@ async def delete_conversation(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## delete the conversation messages from cosmos first - deleted_messages = await cosmos_conversation_client.delete_messages( + deleted_messages = await current_app.cosmos_conversation_client.delete_messages( conversation_id, user_id ) ## Now delete the conversation - deleted_conversation = await cosmos_conversation_client.delete_conversation( + deleted_conversation = await current_app.cosmos_conversation_client.delete_conversation( user_id, conversation_id ) - await cosmos_conversation_client.cosmosdb_client.close() - return ( jsonify( { @@ -593,20 +608,19 @@ async def delete_conversation(): @bp.route("/history/list", methods=["GET"]) async def list_conversations(): + await cosmos_db_ready.wait() offset = request.args.get("offset", 0) authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## get the conversations from cosmos - conversations = await cosmos_conversation_client.get_conversations( + conversations = await current_app.cosmos_conversation_client.get_conversations( user_id, offset=offset, limit=25 ) - await cosmos_conversation_client.cosmosdb_client.close() if not isinstance(conversations, list): return jsonify({"error": f"No conversations for {user_id} were found"}), 404 @@ -617,6 +631,7 @@ async def list_conversations(): @bp.route("/history/read", methods=["POST"]) async def get_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -628,12 +643,11 @@ async def get_conversation(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## get the conversation object and the related messages from cosmos - conversation = await cosmos_conversation_client.get_conversation( + conversation = await current_app.cosmos_conversation_client.get_conversation( user_id, conversation_id ) ## return the conversation id and the messages in the bot frontend format @@ -648,7 +662,7 @@ async def get_conversation(): ) # get the messages for the conversation from cosmos - conversation_messages = await cosmos_conversation_client.get_messages( + conversation_messages = await current_app.cosmos_conversation_client.get_messages( user_id, conversation_id ) @@ -664,12 +678,12 @@ async def get_conversation(): for msg in conversation_messages ] - await cosmos_conversation_client.cosmosdb_client.close() return jsonify({"conversation_id": conversation_id, "messages": messages}), 200 @bp.route("/history/rename", methods=["POST"]) async def rename_conversation(): + await cosmos_db_ready.wait() authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -681,12 +695,11 @@ async def rename_conversation(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## get the conversation from cosmos - conversation = await cosmos_conversation_client.get_conversation( + conversation = await current_app.cosmos_conversation_client.get_conversation( user_id, conversation_id ) if not conversation: @@ -704,16 +717,16 @@ async def rename_conversation(): if not title: return jsonify({"error": "title is required"}), 400 conversation["title"] = title - updated_conversation = await cosmos_conversation_client.upsert_conversation( + updated_conversation = await current_app.cosmos_conversation_client.upsert_conversation( conversation ) - await cosmos_conversation_client.cosmosdb_client.close() return jsonify(updated_conversation), 200 @bp.route("/history/delete_all", methods=["DELETE"]) async def delete_all_conversations(): + await cosmos_db_ready.wait() ## get the user id from the request headers authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -721,11 +734,10 @@ async def delete_all_conversations(): # get conversations for user try: ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") - conversations = await cosmos_conversation_client.get_conversations( + conversations = await current_app.cosmos_conversation_client.get_conversations( user_id, offset=0, limit=None ) if not conversations: @@ -734,15 +746,14 @@ async def delete_all_conversations(): # delete each conversation for conversation in conversations: ## delete the conversation messages from cosmos first - deleted_messages = await cosmos_conversation_client.delete_messages( + deleted_messages = await current_app.cosmos_conversation_client.delete_messages( conversation["id"], user_id ) ## Now delete the conversation - deleted_conversation = await cosmos_conversation_client.delete_conversation( + deleted_conversation = await current_app.cosmos_conversation_client.delete_conversation( user_id, conversation["id"] ) - await cosmos_conversation_client.cosmosdb_client.close() return ( jsonify( { @@ -759,6 +770,7 @@ async def delete_all_conversations(): @bp.route("/history/clear", methods=["POST"]) async def clear_messages(): + await cosmos_db_ready.wait() ## get the user id from the request headers authenticated_user = get_authenticated_user_details(request_headers=request.headers) user_id = authenticated_user["user_principal_id"] @@ -772,12 +784,11 @@ async def clear_messages(): return jsonify({"error": "conversation_id is required"}), 400 ## make sure cosmos is configured - cosmos_conversation_client = init_cosmosdb_client() - if not cosmos_conversation_client: + if not current_app.cosmos_conversation_client: raise Exception("CosmosDB is not configured or not working") ## delete the conversation messages from cosmos - deleted_messages = await cosmos_conversation_client.delete_messages( + deleted_messages = await current_app.cosmos_conversation_client.delete_messages( conversation_id, user_id ) @@ -797,18 +808,17 @@ async def clear_messages(): @bp.route("/history/ensure", methods=["GET"]) async def ensure_cosmos(): + await cosmos_db_ready.wait() if not app_settings.chat_history: return jsonify({"error": "CosmosDB is not configured"}), 404 try: - cosmos_conversation_client = init_cosmosdb_client() - success, err = await cosmos_conversation_client.ensure() - if not cosmos_conversation_client or not success: + success, err = await current_app.cosmos_conversation_client.ensure() + if not current_app.cosmos_conversation_client or not success: if err: return jsonify({"error": err}), 422 return jsonify({"error": "CosmosDB is not configured or not working"}), 500 - await cosmos_conversation_client.cosmosdb_client.close() return jsonify({"message": "CosmosDB is configured and working"}), 200 except Exception as e: logging.exception("Exception in /history/ensure") @@ -848,7 +858,7 @@ async def generate_title(conversation_messages) -> str: messages.append({"role": "user", "content": title_prompt}) try: - azure_openai_client = init_openai_client() + azure_openai_client = await init_openai_client() response = await azure_openai_client.chat.completions.create( model=app_settings.azure_openai.model, messages=messages, temperature=1, max_tokens=64 )