From fb21c313d8c1c46dd728c909ca8b7fe94036dc41 Mon Sep 17 00:00:00 2001
From: antonkulaga
Date: Wed, 8 Jan 2025 23:21:38 +0200
Subject: [PATCH] editing stream

---
 pyproject.toml                  |   3 +
 web/just_agents/web/rest_api.py | 135 +++++++++++++++++++++++---------
 web/just_agents/web/run.py      |  22 +++---
 3 files changed, 114 insertions(+), 46 deletions(-)

diff --git a/pyproject.toml b/pyproject.toml
index b8f91a3..4368bb4 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,6 +37,9 @@ just-agents-examples = "0.4.6"
 pytest = ">=8.3.4"
 python-dotenv = "*"

+[tool.poetry.scripts]
+run-agent = "just_agents.web.run:app"
+
 [build-system]
 requires = ["poetry-core"]
 build-backend = "poetry.core.masonry.api"
\ No newline at end of file
diff --git a/web/just_agents/web/rest_api.py b/web/just_agents/web/rest_api.py
index 9fac5a5..49ee467 100644
--- a/web/just_agents/web/rest_api.py
+++ b/web/just_agents/web/rest_api.py
@@ -7,11 +7,11 @@
 from starlette.responses import StreamingResponse
 from dotenv import load_dotenv
 from fastapi.middleware.cors import CORSMiddleware
-import loguru
+from pycomfort.logging import log_function
 import yaml
 import os
 from pycomfort.logging import log_function
-from eliot import log_call
+from eliot import log_call, log_message
 import json


@@ -76,7 +76,7 @@ def _agent_related_config(self, agent_config: Path | str, agent_section: Optiona
         if agent_config is None:
             # Load from environment variable or use default
             agent_config = os.getenv('AGENT_CONFIG_PATH', 'agent_profiles.yaml')
-        self.agent = BaseAgent.from_yaml(file_path=agent_config, section_name=agent_section, parent_section=agent_parent_section)
+        self.agent: BaseAgent = BaseAgent.from_yaml(file_path=agent_config, section_name=agent_section, parent_section=agent_parent_section)



@@ -115,44 +115,105 @@ def default(self):
         return f"This is default page for the {self.title}"

     @log_call(action_type="chat_completions", include_result=False)
-    #TODO: I think this is wrong, we should send deltas when required using litellm streaming
     def chat_completions(self, request: dict):
         try:
-            loguru.logger.debug(request)
             agent = self.agent
             self._clean_messages(request)
             self._remove_system_prompt(request)
-            if request["messages"]:
-                if request.get("stream") and str(request.get("stream")).lower() != "false":
-                    return StreamingResponse(
-                        agent.stream(request["messages"]), media_type="text/event-stream"
-                    )
-                resp_content = agent.query(request["messages"])
+
+            if not request["messages"]:
+                log_message(
+                    message_type="validation_error",
+                    error="No messages provided in request"
+                )
+                return {
+                    "error": {
+                        "message": "No messages provided in request",
+                        "type": "invalid_request_error",
+                        "param": "messages",
+                        "code": "invalid_request_error"
+                    }
+                }, 400
+
+            # Validate required fields
+            if "model" not in request:
+                log_message(
+                    message_type="validation_error",
+                    error="model is required"
+                )
+                return {
+                    "error": {
+                        "message": "model is required",
+                        "type": "invalid_request_error",
+                        "param": "model",
+                        "code": "invalid_request_error"
+                    }
+                }, 400
+
+            is_streaming = request.get("stream", False)
+            stream_generator = agent.stream(request["messages"])
+
+            if is_streaming:
+                return StreamingResponse(
+                    stream_generator,
+                    media_type="text/event-stream",
+                    headers={
+                        "Cache-Control": "no-cache",
+                        "Connection": "keep-alive",
+                        "Content-Type": "text/event-stream"
+                    }
+                )
             else:
-                resp_content = "Something goes wrong, request did not contain messages!!!"
-        except Exception as e:
-            loguru.logger.error(str(e))
-            resp_content = str(e)
+                # Collect all chunks into final response
+                response_content = ""
+                for chunk in stream_generator:
+                    if chunk == "[DONE]":
+                        break
+                    try:
+                        # Parse the SSE data
+                        data = json.loads(chunk.decode().split("data: ")[1])
+                        if "choices" in data and len(data["choices"]) > 0:
+                            delta = data["choices"][0].get("delta", {})
+                            if "content" in delta:
+                                response_content += delta["content"]
+                    except Exception:
+                        continue

-        #TODO: I took it from Alex Karmazin implementation in LongevityGPTs but I THINK THIS IS TOTALLY WRONG
-
-        # Updated response format to match OpenAI API v1
-        return {
-            "id": f"chatcmpl-{time.time()}",  # Should be a unique identifier
-            "object": "chat.completion",
-            "created": int(time.time()),
-            "model": request.get("model", "unknown"),
-            "choices": [{
-                "index": 0,
-                "message": {
-                    "role": "assistant",
-                    "content": resp_content
-                },
-                "finish_reason": "stop"  # Added finish_reason
-            }],
-            "usage": {
-                "prompt_tokens": 0,  # Should implement token counting
-                "completion_tokens": 0,  # Should implement token counting
-                "total_tokens": 0  # Should implement token counting
-            }
-        }
\ No newline at end of file
+                return {
+                    "id": f"chatcmpl-{time.time()}",
+                    "object": "chat.completion",
+                    "created": int(time.time()),
+                    "model": request.get("model", "unknown"),
+                    "choices": [{
+                        "index": 0,
+                        "message": {
+                            "role": "assistant",
+                            "content": response_content
+                        },
+                        "finish_reason": "stop"
+                    }],
+                    "usage": {
+                        "prompt_tokens": 0,
+                        "completion_tokens": 0,
+                        "total_tokens": 0
+                    }
+                }
+
+        except Exception as e:
+            log_message(
+                message_type="chat_completion_error",
+                error=str(e),
+                error_type=type(e).__name__,
+                request_details={
+                    "model": request.get("model"),
+                    "message_count": len(request.get("messages", [])),
+                    "streaming": request.get("stream", False)
+                }
+            )
+            return {
+                "error": {
+                    "message": str(e),
+                    "type": "server_error",
+                    "code": "internal_server_error"
+                }
+            }, 500
\ No newline at end of file
diff --git a/web/just_agents/web/run.py b/web/just_agents/web/run.py
index 7d7ed2c..0d2b9c0 100644
--- a/web/just_agents/web/run.py
+++ b/web/just_agents/web/run.py
@@ -3,6 +3,8 @@
 from just_agents.web.rest_api import AgentRestAPI
 import uvicorn
 import typer
+from pycomfort.logging import to_nice_stdout
+from eliot import start_action, start_task

 app = typer.Typer()

@@ -27,6 +29,7 @@ def run_server(
         agent_section: Optional section name in the config file
         agent_parent_section: Optional parent section name in the config file
     """
+    to_nice_stdout()
     api = AgentRestAPI(
         agent_config=config,
         title=title,
@@ -53,15 +56,16 @@ def run_server_command(
     parent_section: Optional[str] = typer.Option(None, help="Optional parent section name in the config file")
 ) -> None:
     """Run the FastAPI server with the given configuration."""
-    run_server(
-        config=config,
-        host=host,
-        port=port,
-        workers=workers,
-        title=title,
-        section=section,
-        parent_section=parent_section
-    )
+    with start_task(action_type="run_server"):
+        run_server(
+            config=config,
+            host=host,
+            port=port,
+            workers=workers,
+            title=title,
+            section=section,
+            parent_section=parent_section
+        )

 if __name__ == "__main__":
     app()
\ No newline at end of file
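
Note (illustrative, not part of the patch): in the rewritten chat_completions(), the non-streaming branch rebuilds a complete OpenAI-style response by concatenating choices[0].delta.content from every SSE chunk that agent.stream() yields. The sketch below reproduces that aggregation loop in isolation so the assumed chunk shape is explicit: it presumes each chunk is a bytes line of the form b'data: {...}' (if the generator yields plain strings, the .decode() call in the patch would fail). The helper name collect_sse_content and the sample chunks are hypothetical, written here only for illustration.

import json

def collect_sse_content(chunks):
    """Aggregate OpenAI-style SSE delta chunks into a single string,
    mirroring the aggregation loop added to chat_completions()."""
    content = ""
    for chunk in chunks:
        if chunk == "[DONE]":  # sentinel that ends the stream
            break
        try:
            # Each chunk is assumed to look like b'data: {...json...}'
            data = json.loads(chunk.decode().split("data: ")[1])
            if data.get("choices"):
                delta = data["choices"][0].get("delta", {})
                content += delta.get("content", "")
        except Exception:
            continue  # skip malformed chunks, as the patch does
    return content

# Hand-written placeholder chunks shaped like the SSE lines the handler parses:
chunks = [
    b'data: {"choices": [{"delta": {"content": "Hello"}}]}',
    b'data: {"choices": [{"delta": {"content": ", world"}}]}',
    "[DONE]",
]
assert collect_sse_content(chunks) == "Hello, world"

When the client sets "stream" to true, the same generator is returned unmodified through StreamingResponse, so those SSE lines reach the client as-is.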
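
Note (illustrative, not part of the patch): run.py now routes startup through eliot structured logging: to_nice_stdout() from pycomfort configures a readable stdout destination, and start_task(action_type="run_server") wraps the launch so anything logged during startup is attached to that task. A minimal sketch of the same pattern, independent of the server code; the action and message type names below are arbitrary examples, not names taken from the patch beyond the two imports it adds.

from eliot import start_task, log_message
from pycomfort.logging import to_nice_stdout

to_nice_stdout()  # pretty-print eliot messages to stdout, as run_server() now does

with start_task(action_type="example_task"):
    # messages emitted inside the block are attributed to this task
    log_message(message_type="example_event", detail="hello from inside the task")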